add digester func as attribute to pkgfile

This commit is contained in:
Pelle Koster 2020-10-11 22:06:37 +02:00
parent 3e99067ec5
commit c959372a43
5 changed files with 112 additions and 89 deletions

@ -284,23 +284,25 @@ def simple(project):
if project != normalized:
return redirect(f"/simple/{normalized}/", 301)
files = sorted(
packages = sorted(
core.find_project_packages(project),
key=lambda x: (x.parsed_version, x.relfn),
)
if not files:
if not packages:
if config.redirect_to_fallback:
return redirect(f"{config.fallback_url.rstrip('/')}/{project}/")
return HTTPError(404, f"Not Found ({normalized} does not exist)\n\n")
fp = request.custom_fullpath
current_uri = request.custom_fullpath
links = [
(
os.path.basename(f.relfn),
urljoin(fp, f"../../packages/{f.fname_and_hash(config.hash_algo)}"),
os.path.basename(pkg.relfn),
urljoin(current_uri, f"../../packages/{pkg.fname_and_hash}"),
)
for f in files
for pkg in packages
]
tmpl = """\
<html>
<head>
@ -321,14 +323,15 @@ def simple(project):
@auth("list")
def list_packages():
fp = request.custom_fullpath
files = sorted(
packages = sorted(
core.get_all_packages(),
key=lambda x: (os.path.dirname(x.relfn), x.pkgname, x.parsed_version),
)
links = [
(f.relfn_unix, urljoin(fp, f.fname_and_hash(config.hash_algo)))
for f in files
(pkg.relfn_unix, urljoin(fp, pkg.fname_and_hash)) for pkg in packages
]
tmpl = """\
<html>
<head>

@ -15,18 +15,18 @@ PathLike = t.Union[str, bytes, Path, PurePath]
class PkgFile:
__slots__ = [
"fn",
"root",
"_fname_and_hash",
"relfn",
"relfn_unix",
"pkgname_norm",
"pkgname",
"version",
"parsed_version",
"replaces",
"pkgname", # The project/package name with possible capitalization
"version", # The package version as a string
"fn", # The full file path
"root", # An optional root directory of the file
"relfn", # The file path relative to the root
"replaces", # The previous version of the package (used by manage.py)
"pkgname_norm", # The PEP503 normalized project name
"digest", # The file digest in the form of <algo>=<hash>
"relfn_unix", # The relative file path in unix notation
"parsed_version", # The package version as a tuple of parts
"digester", # A function that calculates the digest for the package
]
def __init__(
@ -41,6 +41,7 @@ class PkgFile:
self.relfn = relfn
self.relfn_unix = None if relfn is None else relfn.replace("\\", "/")
self.replaces = replaces
self.digest = None
def __repr__(self):
return "{}({})".format(
@ -53,19 +54,18 @@ class PkgFile:
),
)
def fname_and_hash(self, hash_algo):
if not hasattr(self, "_fname_and_hash"):
if hash_algo:
self._fname_and_hash = (
f"{self.relfn_unix}#{hash_algo}="
f"{digest_file(self.fn, hash_algo)}"
)
else:
self._fname_and_hash = self.relfn_unix
return self._fname_and_hash
@property
def fname_and_hash(self):
    """Return the unix-style relative file name, followed by ``#<digest>``
    when a digest is available.

    The digest is obtained lazily: while ``self.digest`` is ``None`` the
    ``digester`` callable attached to this package (if there is one) is
    called with the package itself and is expected to store its result in
    ``self.digest``. Without a digester, or when it yields no digest, only
    the file name is returned.
    """
    if self.digest is None:
        # ``digester`` is attached from outside the class; reading a slot
        # that was never assigned raises AttributeError, so look it up
        # defensively and simply skip hashing when it is absent.
        digester = getattr(self, "digester", None)
        if digester is not None:
            digester(self)
    hashpart = f"#{self.digest}" if self.digest else ""
    return self.relfn_unix + hashpart
class Backend:
def __init__(self, hash_algo: t.Optional[str] = None):
self.hash_algo = hash_algo
def get_all_packages(self) -> t.Iterable[PkgFile]:
"""Implement this method to return an Iterable of all packages (as
PkgFile objects) that are available in the Backend.
@ -86,9 +86,12 @@ class Backend:
"""Remove a package from the Backend"""
raise NotImplementedError
def digest(self, pkg: PkgFile, hash_algo):
"""Calculate a package's digest"""
raise NotImplementedError
def digest(self, pkg: PkgFile):
    """Calculate a package's digest and store it on the package.

    :param pkg: the package whose file (``pkg.fn``) is digested
    :return: the value produced by ``_digest_file`` for ``pkg.fn``, or
        ``None`` when no hash algorithm is configured (in which case
        ``pkg.digest`` is left untouched)
    """
    # Any falsy algorithm (None, "") means hashing is disabled; the
    # implementation this replaced used the same truthiness check.
    if not self.hash_algo:
        return None
    digest = _digest_file(pkg.fn, self.hash_algo)
    pkg.digest = digest
    return digest
def exists(self, filename) -> bool:
"""Does a package by the given name exist?"""
@ -131,20 +134,24 @@ class Backend:
def as_file(fh: t.BinaryIO, destination: PathLike, chunk_size: int = 2 ** 20):
    """Write a byte stream into a destination file.

    The stream is copied in chunks to reduce the memory footprint. The
    stream position found on entry is restored afterwards, also when the
    copy fails, so the caller can read ``fh`` again.

    :param fh: binary stream supporting ``read``, ``tell`` and ``seek``;
        it is copied from its current position up to the end
    :param destination: path of the file that is (over)written
    :param chunk_size: maximum number of bytes read per iteration
        (1 MB by default)
    """
    offset = fh.tell()
    try:
        with open(destination, "wb") as dest:
            # iter(callable, sentinel) stops at the first empty read (EOF)
            for chunk in iter(lambda: fh.read(chunk_size), b""):
                dest.write(chunk)
    finally:
        fh.seek(offset)
class SimpleFileBackend(Backend):
def __init__(self, roots: t.List[PathLike] = None):
def __init__(
self, roots: t.List[PathLike], hash_algo: t.Optional[str] = None
):
super().__init__(hash_algo)
self.roots = [Path(root).resolve() for root in roots]
def add_package(self, filename: str, fh: t.BinaryIO):
@ -183,11 +190,11 @@ def _listdir(root: PathLike) -> t.Iterable[PkgFile]:
)
def _digest_file(fpath, hash_algo):
def _digest_file(fpath, hash_algo: str):
"""
Reads and digests a file according to specified hashing-algorith.
:param str sha256: any algo contained in :mod:`hashlib`
:param hash_algo: any algo contained in :mod:`hashlib`
:return: <hash_algo>=<hex_digest>
From http://stackoverflow.com/a/21565932/548792
@ -197,7 +204,7 @@ def _digest_file(fpath, hash_algo):
with open(fpath, "rb") as f:
for block in iter(lambda: f.read(blocksize), b""):
digester.update(block)
return digester.hexdigest()
return f"{hash_algo}={digester.hexdigest()}"
try:

@ -42,7 +42,7 @@ def configure(**kwds):
err = sys.exc_info()[1]
sys.exit(f"Error: while trying to list root({r}): {err}")
backend = SimpleFileBackend(roots)
backend = SimpleFileBackend(roots, c.hash_algo)
if not c.authenticated:
c.authenticated = []
@ -112,23 +112,36 @@ def get_bad_url_redirect_path(request, project):
return uri
def get_all_packages():
def with_digester(func: t.Callable[..., t.Iterable[PkgFile]]):
    """Decorate a package-listing function so that each package it produces
    gets ``backend.digest`` assigned as its ``digester`` attribute.

    The decorated function is a generator: packages are tagged one at a
    time, as the caller iterates over the result.
    """

    @functools.wraps(func)
    def add_digester_method(*args, **kwargs):
        for pkg in func(*args, **kwargs):
            # resolved at the moment each package is handed out
            pkg.digester = backend.digest
            yield pkg

    return add_digester_method
@with_digester
def get_all_packages() -> t.Iterable[PkgFile]:
return backend.get_all_packages()
def find_project_packages(project):
@with_digester
def find_project_packages(project) -> t.Iterable[PkgFile]:
return backend.find_project_packages(project)
def find_version(name: str, version: str):
def find_version(name: str, version: str) -> t.Iterable[PkgFile]:
return backend.find_version(name, version)
def get_projects():
def get_projects() -> t.Iterable[str]:
return backend.get_projects()
def exists(filename: str):
def exists(filename: str) -> bool:
assert "/" not in filename
return backend.exists(filename)
@ -140,3 +153,7 @@ def add_package(filename, fh: t.BinaryIO):
def remove_package(pkg: PkgFile):
return backend.remove_package(pkg)
def digest(pkg: PkgFile) -> t.Optional[str]:
    """Calculate the digest of ``pkg`` by delegating to the configured backend.

    :param pkg: the package to digest
    :return: whatever ``backend.digest`` returns for the package; this can
        be ``None`` when the backend has no hash algorithm configured
    """
    return backend.digest(pkg)

@ -1,16 +1,16 @@
#! /usr/bin/env py.test
# -*- coding: utf-8 -*-
import logging
import os
import pytest
from pypiserver import __main__, core, backend, manage
from pypiserver.pkg_helpers import guess_pkgname_and_version, normalize_pkgname_for_url
from pypiserver.pkg_helpers import (
guess_pkgname_and_version,
normalize_pkgname_for_url,
)
from tests.doubles import Namespace
## Enable logging to detect any problems with it
##
__main__.init_logging()
@ -105,13 +105,13 @@ def test_guess_pkgname_and_version_asc(filename, pkgname, version):
assert guess_pkgname_and_version(filename) == exp
def test_listdir_bad_name(tmpdir):
tmpdir.join("foo.whl").ensure()
res = list(backend.listdir(tmpdir.strpath))
def test_listdir_bad_name(tmp_path):
tmp_path.joinpath("foo.whl").touch()
res = list(backend.listdir(str(tmp_path)))
assert res == []
def test_read_lines(tmpdir):
def test_read_lines(tmp_path):
filename = "pkg_blacklist"
file_contents = (
"# Names of private packages that we don't want to upgrade\n"
@ -121,10 +121,11 @@ def test_read_lines(tmpdir):
" my_other_private_pkg"
)
f = tmpdir.join(filename).ensure()
f.write(file_contents)
f = tmp_path.joinpath(filename)
f.touch()
f.write_text(file_contents)
assert manage.read_lines(f.strpath) == [
assert manage.read_lines(str(f)) == [
"my_private_pkg",
"my_other_private_pkg",
]
@ -142,21 +143,27 @@ hashes = (
@pytest.mark.parametrize(("algo", "digest"), hashes)
def test_hashfile(tmpdir, algo, digest):
f = tmpdir.join("empty")
f.ensure()
assert backend.digest_file(f.strpath, algo) == digest
def test_hashfile(tmp_path, algo, digest):
f = tmp_path.joinpath("empty")
f.touch()
assert backend.digest_file(str(f), algo) == f"{algo}={digest}"
@pytest.mark.parametrize("hash_algo", ("md5", "sha256", "sha512"))
def test_fname_and_hash(tmpdir, hash_algo):
def test_fname_and_hash(tmp_path, hash_algo):
"""Ensure we are returning the expected hashes for files."""
f = tmpdir.join("tmpfile")
f.ensure()
pkgfile = backend.PkgFile("tmp", "1.0.0", f.strpath, f.dirname, f.basename)
assert pkgfile.fname_and_hash(hash_algo) == "{}#{}={}".format(
f.basename, hash_algo, str(f.computehash(hashtype=hash_algo))
)
def digester(pkg):
digest = backend._digest_file(pkg.fn, hash_algo)
pkg.digest = digest
return digest
f = tmp_path.joinpath("tmpfile")
f.touch()
pkgfile = backend.PkgFile("tmp", "1.0.0", str(f), f.parent, f.name)
pkgfile.digester = digester
assert pkgfile.fname_and_hash == f"{f.name}#{digester(pkgfile)}"
def test_redirect_project_encodes_newlines():
@ -169,6 +176,4 @@ def test_redirect_project_encodes_newlines():
def test_normalize_pkgname_for_url_encodes_newlines():
"""Ensure newlines are url encoded in package names for urls."""
assert "\n" not in normalize_pkgname_for_url(
"/\nSet-Cookie:malicious=1;"
)
assert "\n" not in normalize_pkgname_for_url("/\nSet-Cookie:malicious=1;")

@ -1,12 +1,11 @@
#!/usr/bin/env py.test
"""Tests for manage.py."""
from pathlib import Path
import pypiserver.manage
from unittest.mock import Mock
import py
import pytest
from pypiserver import manage
@ -23,20 +22,12 @@ from pypiserver.manage import (
)
def touch_files(root, files):
root = py.path.local(root) # pylint: disable=no-member
for f in files:
root.join(f).ensure()
def pkgfile_from_path(fn):
pkgname, version = guess_pkgname_and_version(fn)
return PkgFile(
pkgname=pkgname,
version=version,
root=py.path.local(fn)
.parts()[1]
.strpath, # noqa pylint: disable=no-member
root=str(Path(fn).parent),
fn=fn,
)