mirror of
https://github.com/pypiserver/pypiserver
synced 2024-12-20 13:55:49 +01:00
parent
03e30846d8
commit
91a5ebd8b1
@ -65,7 +65,7 @@ class CacheManager(object):
|
||||
cache = self.digest_cache[hash_algo]
|
||||
except KeyError:
|
||||
cache = self.digest_cache.setdefault(hash_algo, {})
|
||||
|
||||
|
||||
try:
|
||||
return cache[fpath]
|
||||
except KeyError:
|
||||
|
@ -228,8 +228,9 @@ class PkgFile(object):
|
||||
def fname_and_hash(self, hash_algo):
|
||||
if not hasattr(self, '_fname_and_hash'):
|
||||
if hash_algo:
|
||||
self._fname_and_hash = '%s#%s=%.32s' % (self.relfn_unix, hash_algo,
|
||||
digest_file(self.fn, hash_algo))
|
||||
self._fname_and_hash = '%s#%s=%s' % (
|
||||
self.relfn_unix, hash_algo, digest_file(self.fn, hash_algo)
|
||||
)
|
||||
else:
|
||||
self._fname_and_hash = self.relfn_unix
|
||||
return self._fname_and_hash
|
||||
@ -308,7 +309,7 @@ def _digest_file(fpath, hash_algo):
|
||||
with open(fpath, 'rb') as f:
|
||||
for block in iter(lambda: f.read(blocksize), b''):
|
||||
digester.update(block)
|
||||
return digester.hexdigest()[:32]
|
||||
return digester.hexdigest()
|
||||
|
||||
|
||||
try:
|
||||
|
@ -3,10 +3,13 @@
|
||||
# Builtin imports
|
||||
import logging
|
||||
|
||||
try:
|
||||
|
||||
try: # python 3
|
||||
from html.parser import HTMLParser
|
||||
from html import unescape
|
||||
except ImportError:
|
||||
from HTMLParser import HTMLParser
|
||||
unescape = HTMLParser().unescape
|
||||
|
||||
try:
|
||||
import xmlrpc.client as xmlrpclib
|
||||
@ -27,7 +30,6 @@ import tests.test_core as test_core
|
||||
# Enable logging to detect any problems with it
|
||||
##
|
||||
__main__.init_logging(level=logging.NOTSET)
|
||||
hp = HTMLParser()
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
@ -400,13 +402,13 @@ def test_cache_control_set(root):
|
||||
def test_upload_noAction(root, testapp):
|
||||
resp = testapp.post("/", expect_errors=1)
|
||||
assert resp.status == '400 Bad Request'
|
||||
assert "Missing ':action' field!" in hp.unescape(resp.text)
|
||||
assert "Missing ':action' field!" in unescape(resp.text)
|
||||
|
||||
|
||||
def test_upload_badAction(root, testapp):
|
||||
resp = testapp.post("/", params={':action': 'BAD'}, expect_errors=1)
|
||||
assert resp.status == '400 Bad Request'
|
||||
assert "Unsupported ':action' field: BAD" in hp.unescape(resp.text)
|
||||
assert "Unsupported ':action' field: BAD" in unescape(resp.text)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("package", [f[0]
|
||||
@ -463,7 +465,7 @@ def test_remove_pkg_missingNaveVersion(name, version, root, testapp):
|
||||
resp = testapp.post("/", expect_errors=1, params=params)
|
||||
|
||||
assert resp.status == '400 Bad Request'
|
||||
assert msg % (name, version) in hp.unescape(resp.text)
|
||||
assert msg % (name, version) in unescape(resp.text)
|
||||
|
||||
|
||||
def test_remove_pkg_notFound(root, testapp):
|
||||
@ -474,7 +476,7 @@ def test_remove_pkg_notFound(root, testapp):
|
||||
'version': '123',
|
||||
})
|
||||
assert resp.status == '404 Not Found'
|
||||
assert "foo (123) not found" in hp.unescape(resp.text)
|
||||
assert "foo (123) not found" in unescape(resp.text)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('pkgs,matches', [
|
||||
|
@ -82,10 +82,12 @@ def test_listdir_bad_name(tmpdir):
|
||||
res = list(core.listdir(tmpdir.strpath))
|
||||
assert res == []
|
||||
|
||||
hashes = [
|
||||
('sha256', 'e3b0c44298fc1c149afbf4c8996fb924'), # empty-sha256
|
||||
('md5', 'd41d8cd98f00b204e9800998ecf8427e'), # empty-md5
|
||||
]
|
||||
hashes = (
|
||||
# empty-sha256
|
||||
('sha256', 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'),
|
||||
# empty-md5
|
||||
('md5', 'd41d8cd98f00b204e9800998ecf8427e'),
|
||||
)
|
||||
@pytest.mark.parametrize(("algo", "digest"), hashes)
|
||||
def test_hashfile(tmpdir, algo, digest):
|
||||
f = tmpdir.join("empty")
|
||||
@ -93,6 +95,19 @@ def test_hashfile(tmpdir, algo, digest):
|
||||
assert core.digest_file(f.strpath, algo) == digest
|
||||
|
||||
|
||||
@pytest.mark.parametrize("hash_algo", ("md5", "sha256", "sha512"))
|
||||
def test_fname_and_hash(tmpdir, hash_algo):
|
||||
"""Ensure we are returning the expected hashes for files."""
|
||||
f = tmpdir.join("tmpfile")
|
||||
f.ensure()
|
||||
pkgfile = core.PkgFile(
|
||||
"tmp", "1.0.0", f.strpath, f.dirname, f.basename
|
||||
)
|
||||
assert pkgfile.fname_and_hash(hash_algo) == "{}#{}={}".format(
|
||||
f.basename, hash_algo, str(f.computehash(hashtype=hash_algo))
|
||||
)
|
||||
|
||||
|
||||
def test_redirect_prefix_encodes_newlines():
|
||||
"""Ensure raw newlines are url encoded in the generated redirect."""
|
||||
request = Namespace(
|
||||
|
@ -183,7 +183,8 @@ def _build_url(port, user='', pswd=''):
|
||||
|
||||
def _run_pip(cmd):
|
||||
ncmd = (
|
||||
"pip --disable-pip-version-check --retries 0 --timeout 5 --no-input %s"
|
||||
"pip --no-cache-dir --disable-pip-version-check "
|
||||
"--retries 0 --timeout 5 --no-input %s"
|
||||
) % cmd
|
||||
print('PIP: %s' % ncmd)
|
||||
proc = Popen(split(ncmd))
|
||||
@ -390,9 +391,24 @@ def test_twine_upload_open(empty_packdir, port, package):
|
||||
with pypirc_tmpfile(port, user, pswd) as rcfile:
|
||||
twine_upload([package.strpath], repository='test', conf=rcfile)
|
||||
time.sleep(SLEEP_AFTER_SRV)
|
||||
|
||||
assert len(empty_packdir.listdir()) == 1
|
||||
|
||||
|
||||
@pytest.mark.parametrize("hash_algo", ("md5", "sha256", "sha512"))
|
||||
def test_hash_algos(empty_packdir, port, package, pipdir, hash_algo):
|
||||
"""Test twine upload with no authentication"""
|
||||
user, pswd = 'foo', 'bar'
|
||||
with new_server(
|
||||
empty_packdir, port, other_cli="--hash-algo {}".format(hash_algo)
|
||||
):
|
||||
with pypirc_tmpfile(port, user, pswd) as rcfile:
|
||||
twine_upload([package.strpath], repository='test', conf=rcfile)
|
||||
time.sleep(SLEEP_AFTER_SRV)
|
||||
|
||||
assert _run_pip_install("centodeps", port, pipdir) == 0
|
||||
|
||||
|
||||
def test_twine_upload_authed(empty_packdir, port, package):
|
||||
"""Test authenticated twine upload"""
|
||||
user, pswd = 'a', 'a'
|
||||
|
Loading…
Reference in New Issue
Block a user