Refactor storage operations into separate Backend classes (#348)

Following the discussion in #253 and #325 I've created a first iteration on what a `Backend` interface could look like and how the current file storage operations may be refactored into this interface. It goes from the following principles

* `app.py` talks only to `core.py` with regards to package operations
* at configuration time, a `Backend` implementation is chosen and created for the lifetime of the configured app
* `core.py` proxies requests for packages to this `Backend()`
* The `Backend` interface/api is defined through three things
  * methods that an implementation must implement
  * methods that an implementation may override if it knows better than the defaults
  * the `PkgFIle` class that is (should be) the main carrier of data
* where possible, implementation details must be hidden from concrete `Backend`s to promote extensibility

Other things I've done in this PR:
* I've tried to talk about packages and projects, rather than files and prefixes, since these are the domain terms PEP503 uses, and imho it's also more clear what it means
* Better testability of the `CacheManager` (no more race conditions when `watchdog` is installed during testing)
* Cleanup some more Python 2 code
* Started moving away from  `os.path` and `py.path` in favour of `pathlib`

Furthermore I've created a `plugin.py` with a sample of how I think plugin system could look like. This sampIe assumes we use `argparse`  and allows for the extension of cli arguments that a plugin may need. I think the actual implementation of such a plugin system is beyond the scope of this PR, but I've used it as a target for the Backend refactoring. If requested, I'll remove it from this PR.

The following things still need to be done / discussed. These can be part of this PR or moved into their own, separate PRs
- [ ] Simplify the `PgkFile` class. It currently consists of a number of attributes that don't necessarily belong with it, and not all attributes are aptly named (imho). I would like to minimalize the scope of `PkgFile` so that its only concern is being a data carrier between the app and the backends, and make its use more clear.
- [ ] Add a `PkgFile.metadata` that backend implementations may use to store custom data for packages. For example the current `PkgFile.root` attribute is an implementation detail of the filestorage backends, and other Backend implementations should not be bothered by it.
- [ ] Use `pathlib` wherever possible. This may also result in less attributes for `PkgFile`, since some things may be just contained in a single `Path` object, instead of multtiple strings.
- [ ] Improve testing of the `CacheManager`.

----
* move some functions around in preparation for backend module

* rename pkg_utils to pkg_helpers to prevent confusion with stdlib pkgutil

* further implement the current filestorage as simple file backend

* rename prefix to project, since that's more descriptive

* add digester func as attribute to pkgfile

* WIP caching backend

* WIP make cache better testable

* better testability of cache

* WIP file backends as plugin

* fix typos, run black

* Apply suggestions from code review

Co-authored-by: Matthew Planchard <mplanchard@users.noreply.github.com>

* add more type hints to pass mypy, fix tox.ini

* add package count method to backend

* add package count method to backend

* minor changes

* bugfix when checking invalid whl file

* check for existing package recursively, bugfix, some more pathlib

* fix unittest

* rm dead code

* exclude bottle.py from coverage

* fix merge mistakes

* fix tab indentation

* backend as a cli argument

* fix cli, add tests

* fix mypy

* fix more silly mistakes

* process feedback

* remove dead code

Co-authored-by: Matthew Planchard <mplanchard@users.noreply.github.com>
This commit is contained in:
PelleK 2021-02-02 18:44:29 +01:00 committed by GitHub
parent 7688e1b2bd
commit cf424c982d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 957 additions and 519 deletions

2
.coveragerc Normal file
View File

@ -0,0 +1,2 @@
[run]
omit = pypiserver/bottle.py

1
.gitignore vendored
View File

@ -29,6 +29,7 @@ __pycache__/
**/*.egg-info/
/.standalone
/.coverage*
!/.coveragerc
/htmlcov/
/.installed.cfg
/develop-eggs/

View File

@ -1,10 +1,13 @@
from collections import namedtuple
import logging
import mimetypes
import os
import re
import zipfile
import xml.dom.minidom
import xmlrpc.client as xmlrpclib
import zipfile
from collections import namedtuple
from io import BytesIO
from urllib.parse import urljoin, urlparse
from pypiserver.config import RunConfig
from . import __version__
@ -18,26 +21,10 @@ from .bottle import (
Bottle,
template,
)
try:
import xmlrpc.client as xmlrpclib # py3
except ImportError:
import xmlrpclib # py2
try:
from io import BytesIO
except ImportError:
from StringIO import StringIO as BytesIO
try: # PY3
from urllib.parse import urljoin, urlparse
except ImportError: # PY2
from urlparse import urljoin, urlparse
from .pkg_helpers import guess_pkgname_and_version, normalize_pkgname_for_url
log = logging.getLogger(__name__)
config: RunConfig
app = Bottle()
@ -103,19 +90,13 @@ def favicon():
def root():
fp = request.custom_fullpath
try:
numpkgs = len(list(config.iter_packages()))
except Exception as exc:
log.error(f"Could not list packages: {exc}")
numpkgs = 0
# Ensure template() does not consider `msg` as filename!
msg = config.welcome_msg + "\n"
return template(
msg,
URL=request.url.rstrip("/") + "/",
VERSION=__version__,
NUMPKGS=numpkgs,
NUMPKGS=config.backend.package_count(),
PACKAGES=fp.rstrip("/") + "/packages/",
SIMPLE=fp.rstrip("/") + "/simple/",
)
@ -148,16 +129,12 @@ def remove_pkg():
if not name or not version:
msg = f"Missing 'name'/'version' fields: name={name}, version={version}"
raise HTTPError(400, msg)
pkgs = list(
filter(
lambda pkg: pkg.pkgname == name and pkg.version == version,
core.find_packages(config.iter_packages()),
)
)
if len(pkgs) == 0:
pkgs = list(config.backend.find_version(name, version))
if not pkgs:
raise HTTPError(404, f"{name} ({version}) not found")
for pkg in pkgs:
os.unlink(pkg.fn)
config.backend.remove_package(pkg)
Upload = namedtuple("Upload", "pkg sig")
@ -183,13 +160,11 @@ def file_upload():
continue
if (
not is_valid_pkg_filename(uf.raw_filename)
or core.guess_pkgname_and_version(uf.raw_filename) is None
or guess_pkgname_and_version(uf.raw_filename) is None
):
raise HTTPError(400, f"Bad filename: {uf.raw_filename}")
if not config.overwrite and core.exists(
config.package_root, uf.raw_filename
):
if not config.overwrite and config.backend.exists(uf.raw_filename):
log.warning(
f"Cannot upload {uf.raw_filename!r} since it already exists! \n"
" You may start server with `--overwrite` option. "
@ -200,7 +175,7 @@ def file_upload():
" You may start server with `--overwrite` option.",
)
core.store(config.package_root, uf.raw_filename, uf.save)
config.backend.add_package(uf.raw_filename, uf.file)
if request.auth:
user = request.auth[0]
else:
@ -231,10 +206,10 @@ def update():
@app.route("/simple")
@app.route("/simple/:prefix")
@app.route("/simple/:project")
@app.route("/packages")
@auth("list")
def pep_503_redirects(prefix=None):
def pep_503_redirects(project=None):
return redirect(request.custom_fullpath + "/", 301)
@ -257,7 +232,7 @@ def handle_rpc():
)
response = []
ordering = 0
for p in config.iter_packages():
for p in config.backend.get_all_packages():
if p.pkgname.count(value) > 0:
# We do not presently have any description/summary, returning
# version instead
@ -278,7 +253,7 @@ def handle_rpc():
@app.route("/simple/")
@auth("list")
def simpleindex():
links = sorted(core.get_prefixes(config.iter_packages()))
links = sorted(config.backend.get_projects())
tmpl = """\
<html>
<head>
@ -295,59 +270,62 @@ def simpleindex():
return template(tmpl, links=links)
@app.route("/simple/:prefix/")
@app.route("/simple/:project/")
@auth("list")
def simple(prefix=""):
# PEP 503: require normalized prefix
normalized = core.normalize_pkgname_for_url(prefix)
if prefix != normalized:
return redirect("/simple/{0}/".format(normalized), 301)
def simple(project):
# PEP 503: require normalized project
normalized = normalize_pkgname_for_url(project)
if project != normalized:
return redirect(f"/simple/{normalized}/", 301)
files = sorted(
core.find_packages(config.iter_packages(), prefix=prefix),
packages = sorted(
config.backend.find_project_packages(project),
key=lambda x: (x.parsed_version, x.relfn),
)
if not files:
if not packages:
if not config.disable_fallback:
return redirect(f"{config.fallback_url.rstrip('/')}/{prefix}/")
return redirect(f"{config.fallback_url.rstrip('/')}/{project}/")
return HTTPError(404, f"Not Found ({normalized} does not exist)\n\n")
fp = request.custom_fullpath
links = [
current_uri = request.custom_fullpath
links = (
(
os.path.basename(f.relfn),
urljoin(fp, f"../../packages/{f.fname_and_hash(config.hash_algo)}"),
os.path.basename(pkg.relfn),
urljoin(current_uri, f"../../packages/{pkg.fname_and_hash}"),
)
for f in files
]
for pkg in packages
)
tmpl = """\
<html>
<head>
<title>Links for {{prefix}}</title>
<title>Links for {{project}}</title>
</head>
<body>
<h1>Links for {{prefix}}</h1>
<h1>Links for {{project}}</h1>
% for file, href in links:
<a href="{{href}}">{{file}}</a><br>
% end
</body>
</html>
"""
return template(tmpl, prefix=prefix, links=links)
return template(tmpl, project=project, links=links)
@app.route("/packages/")
@auth("list")
def list_packages():
fp = request.custom_fullpath
files = sorted(
core.find_packages(config.iter_packages()),
packages = sorted(
config.backend.get_all_packages(),
key=lambda x: (os.path.dirname(x.relfn), x.pkgname, x.parsed_version),
)
links = [
(f.relfn_unix, urljoin(fp, f.fname_and_hash(config.hash_algo)))
for f in files
]
links = (
(pkg.relfn_unix, urljoin(fp, pkg.fname_and_hash)) for pkg in packages
)
tmpl = """\
<html>
<head>
@ -367,7 +345,7 @@ def list_packages():
@app.route("/packages/:filename#.*#")
@auth("download")
def server_static(filename):
entries = core.find_packages(config.iter_packages())
entries = config.backend.get_all_packages()
for x in entries:
f = x.relfn_unix
if f == filename:
@ -385,8 +363,8 @@ def server_static(filename):
return HTTPError(404, f"Not Found ({filename} does not exist)\n\n")
@app.route("/:prefix")
@app.route("/:prefix/")
def bad_url(prefix):
@app.route("/:project")
@app.route("/:project/")
def bad_url(project):
"""Redirect unknown root URLs to /simple/."""
return redirect(core.get_bad_url_redirect_path(request, prefix))
return redirect(core.get_bad_url_redirect_path(request, project))

305
pypiserver/backend.py Normal file
View File

@ -0,0 +1,305 @@
import abc
import functools
import hashlib
import itertools
import os
import typing as t
from pathlib import Path
from .cache import CacheManager, ENABLE_CACHING
from .core import PkgFile
from .pkg_helpers import (
normalize_pkgname,
is_listed_path,
guess_pkgname_and_version,
)
if t.TYPE_CHECKING:
from .config import _ConfigCommon as Configuration
PathLike = t.Union[str, os.PathLike]
class IBackend(abc.ABC):
@abc.abstractmethod
def get_all_packages(self) -> t.Iterable[PkgFile]:
pass
@abc.abstractmethod
def find_project_packages(self, project: str) -> t.Iterable[PkgFile]:
pass
@abc.abstractmethod
def find_version(self, name: str, version: str) -> t.Iterable[PkgFile]:
pass
@abc.abstractmethod
def get_projects(self) -> t.Iterable[str]:
pass
@abc.abstractmethod
def exists(self, filename: str) -> bool:
pass
@abc.abstractmethod
def digest(self, pkg: PkgFile) -> t.Optional[str]:
pass
@abc.abstractmethod
def package_count(self) -> int:
pass
@abc.abstractmethod
def add_package(self, filename: str, stream: t.BinaryIO) -> None:
pass
@abc.abstractmethod
def remove_package(self, pkg: PkgFile) -> None:
pass
class Backend(IBackend, abc.ABC):
def __init__(self, config: "Configuration"):
self.hash_algo = config.hash_algo
@abc.abstractmethod
def get_all_packages(self) -> t.Iterable[PkgFile]:
"""Implement this method to return an Iterable of all packages (as
PkgFile objects) that are available in the Backend.
"""
pass
@abc.abstractmethod
def add_package(self, filename: str, stream: t.BinaryIO) -> None:
"""Add a package to the Backend. `filename` is the package's filename
(without any directory parts). It is just a name, there is no file by
that name (yet). `stream` is an open file-like object that can be used
to read the file's content. To convert the package into an actual file
on disk, run `write_file(filename, stream)`.
"""
pass
@abc.abstractmethod
def remove_package(self, pkg: PkgFile) -> None:
"""Remove a package from the Backend"""
pass
@abc.abstractmethod
def exists(self, filename: str) -> bool:
"""Does a package by the given name exist?"""
pass
def digest(self, pkg: PkgFile) -> t.Optional[str]:
if self.hash_algo is None or pkg.fn is None:
return None
return digest_file(pkg.fn, self.hash_algo)
def package_count(self) -> int:
"""Return a count of all available packages. When implementing a Backend
class, either use this method as is, or override it with a more
performant version.
"""
return sum(1 for _ in self.get_all_packages())
def get_projects(self) -> t.Iterable[str]:
"""Return an iterable of all (unique) projects available in the store
in their PEP503 normalized form. When implementing a Backend class,
either use this method as is, or override it with a more performant
version.
"""
return set(package.pkgname_norm for package in self.get_all_packages())
def find_project_packages(self, project: str) -> t.Iterable[PkgFile]:
"""Find all packages from a given project. The project may be given
as either the normalized or canonical name. When implementing a
Backend class, either use this method as is, or override it with a
more performant version.
"""
return (
x
for x in self.get_all_packages()
if normalize_pkgname(project) == x.pkgname_norm
)
def find_version(self, name: str, version: str) -> t.Iterable[PkgFile]:
"""Return all packages that match PkgFile.pkgname == name and
PkgFile.version == version` When implementing a Backend class,
either use this method as is, or override it with a more performant
version.
"""
return filter(
lambda pkg: pkg.pkgname == name and pkg.version == version,
self.get_all_packages(),
)
class SimpleFileBackend(Backend):
def __init__(self, config: "Configuration"):
super().__init__(config)
self.roots = [Path(root).resolve() for root in config.roots]
def get_all_packages(self) -> t.Iterable[PkgFile]:
return itertools.chain.from_iterable(listdir(r) for r in self.roots)
def add_package(self, filename: str, stream: t.BinaryIO) -> None:
write_file(stream, self.roots[0].joinpath(filename))
def remove_package(self, pkg: PkgFile) -> None:
if pkg.fn is not None:
os.remove(pkg.fn)
def exists(self, filename: str) -> bool:
return any(
filename == existing_file.name
for root in self.roots
for existing_file in all_listed_files(root)
)
class CachingFileBackend(SimpleFileBackend):
def __init__(
self,
config: "Configuration",
cache_manager: t.Optional[CacheManager] = None,
):
super().__init__(config)
self.cache_manager = cache_manager or CacheManager() # type: ignore
def get_all_packages(self) -> t.Iterable[PkgFile]:
return itertools.chain.from_iterable(
self.cache_manager.listdir(r, listdir) for r in self.roots
)
def digest(self, pkg: PkgFile) -> t.Optional[str]:
if self.hash_algo is None or pkg.fn is None:
return None
return self.cache_manager.digest_file(
pkg.fn, self.hash_algo, digest_file
)
def write_file(fh: t.BinaryIO, destination: PathLike) -> None:
"""write a byte stream into a destination file. Writes are chunked to reduce
the memory footprint
"""
chunk_size = 2 ** 20 # 1 MB
offset = fh.tell()
try:
with open(destination, "wb") as dest:
for chunk in iter(lambda: fh.read(chunk_size), b""):
dest.write(chunk)
finally:
fh.seek(offset)
def listdir(root: Path) -> t.Iterator[PkgFile]:
root = root.resolve()
files = all_listed_files(root)
yield from valid_packages(root, files)
def all_listed_files(root: Path) -> t.Iterator[Path]:
for dirpath, dirnames, filenames in os.walk(root):
dirnames[:] = (
dirname for dirname in dirnames if is_listed_path(Path(dirname))
)
for filename in filenames:
if not is_listed_path(Path(filename)):
continue
filepath = root / dirpath / filename
if Path(filepath).is_file():
yield filepath
def valid_packages(root: Path, files: t.Iterable[Path]) -> t.Iterator[PkgFile]:
for file in files:
res = guess_pkgname_and_version(str(file.name))
if res is not None:
pkgname, version = res
fn = str(file)
root_name = str(root)
yield PkgFile(
pkgname=pkgname,
version=version,
fn=fn,
root=root_name,
relfn=fn[len(root_name) + 1 :],
)
def digest_file(file_path: PathLike, hash_algo: str) -> str:
"""
Reads and digests a file according to specified hashing-algorith.
:param file_path: path to a file on disk
:param hash_algo: any algo contained in :mod:`hashlib`
:return: <hash_algo>=<hex_digest>
From http://stackoverflow.com/a/21565932/548792
"""
blocksize = 2 ** 16
digester = hashlib.new(hash_algo)
with open(file_path, "rb") as f:
for block in iter(lambda: f.read(blocksize), b""):
digester.update(block)
return f"{hash_algo}={digester.hexdigest()}"
def get_file_backend(config: "Configuration") -> Backend:
if ENABLE_CACHING:
return CachingFileBackend(config)
return SimpleFileBackend(config)
PkgFunc = t.TypeVar("PkgFunc", bound=t.Callable[..., t.Iterable[PkgFile]])
def with_digester(func: PkgFunc) -> PkgFunc:
@functools.wraps(func)
def add_digester_method(
self: "BackendProxy", *args: t.Any, **kwargs: t.Any
) -> t.Iterable[PkgFile]:
packages = func(self, *args, **kwargs)
for package in packages:
package.digester = self.backend.digest
yield package
return t.cast(PkgFunc, add_digester_method)
class BackendProxy(IBackend):
def __init__(self, wraps: Backend):
self.backend = wraps
@with_digester
def get_all_packages(self) -> t.Iterable[PkgFile]:
return self.backend.get_all_packages()
@with_digester
def find_project_packages(self, project: str) -> t.Iterable[PkgFile]:
return self.backend.find_project_packages(project)
def find_version(self, name: str, version: str) -> t.Iterable[PkgFile]:
return self.backend.find_version(name, version)
def get_projects(self) -> t.Iterable[str]:
return self.backend.get_projects()
def exists(self, filename: str) -> bool:
assert "/" not in filename
return self.backend.exists(filename)
def package_count(self) -> int:
return self.backend.package_count()
def add_package(self, filename: str, fh: t.BinaryIO) -> None:
assert "/" not in filename
return self.backend.add_package(filename, fh)
def remove_package(self, pkg: PkgFile) -> None:
return self.backend.remove_package(pkg)
def digest(self, pkg: PkgFile) -> t.Optional[str]:
return self.backend.digest(pkg)

View File

@ -4,10 +4,24 @@
#
from os.path import dirname
from watchdog.observers import Observer
from pathlib import Path
import typing as t
import threading
try:
from watchdog.observers import Observer
ENABLE_CACHING = True
except ImportError:
Observer = None
ENABLE_CACHING = False
if t.TYPE_CHECKING:
from pypiserver.core import PkgFile
class CacheManager:
"""
@ -26,6 +40,11 @@ class CacheManager:
"""
def __init__(self):
if not ENABLE_CACHING:
raise RuntimeError(
"Please install the extra cache requirements by running 'pip "
"install pypiserver[cache]' to use the CachingFileBackend"
)
# Cache for listdir output
self.listdir_cache = {}
@ -46,7 +65,12 @@ class CacheManager:
self.digest_lock = threading.Lock()
self.listdir_lock = threading.Lock()
def listdir(self, root, impl_fn):
def listdir(
self,
root: t.Union[Path, str],
impl_fn: t.Callable[[Path], t.Iterable["PkgFile"]],
) -> t.Iterable["PkgFile"]:
root = str(root)
with self.listdir_lock:
try:
return self.listdir_cache[root]
@ -56,11 +80,13 @@ class CacheManager:
if root not in self.watched:
self._watch(root)
v = list(impl_fn(root))
v = list(impl_fn(Path(root)))
self.listdir_cache[root] = v
return v
def digest_file(self, fpath, hash_algo, impl_fn):
def digest_file(
self, fpath: str, hash_algo: str, impl_fn: t.Callable[[str, str], str]
) -> str:
with self.digest_lock:
try:
cache = self.digest_cache[hash_algo]
@ -82,13 +108,17 @@ class CacheManager:
cache[fpath] = v
return v
def _watch(self, root):
def _watch(self, root: str):
self.watched.add(root)
self.observer.schedule(_EventHandler(self, root), root, recursive=True)
def invalidate_root_cache(self, root: t.Union[Path, str]):
with self.listdir_lock:
self.listdir_cache.pop(str(root), None)
class _EventHandler:
def __init__(self, cache, root):
def __init__(self, cache: CacheManager, root: str):
self.cache = cache
self.root = root
@ -101,8 +131,7 @@ class _EventHandler:
return
# Lazy: just invalidate the whole cache
with cache.listdir_lock:
cache.listdir_cache.pop(self.root, None)
cache.invalidate_root_cache(self.root)
# Digests are more expensive: invalidate specific paths
paths = []
@ -117,6 +146,3 @@ class _EventHandler:
for _, subcache in cache.digest_cache.items():
for path in paths:
subcache.pop(path, None)
cache_manager = CacheManager()

View File

@ -37,25 +37,31 @@ import argparse
import contextlib
import hashlib
import io
import itertools
import logging
import pathlib
import pkg_resources
import re
import sys
import textwrap
import typing as t
from distutils.util import strtobool as strtoint
# The `passlib` requirement is optional, so we need to verify its import here.
import pkg_resources
from pypiserver.backend import (
SimpleFileBackend,
CachingFileBackend,
Backend,
IBackend,
get_file_backend,
BackendProxy,
)
# The `passlib` requirement is optional, so we need to verify its import here.
try:
from passlib.apache import HtpasswdFile
except ImportError:
HtpasswdFile = None
from pypiserver import core
# The "strtobool" function in distutils does a nice job at parsing strings,
# but returns an integer. This just wraps it in a boolean call so that we
@ -80,6 +86,7 @@ class DEFAULTS:
PACKAGE_DIRECTORIES = [pathlib.Path("~/packages").expanduser().resolve()]
PORT = 8080
SERVER_METHOD = "auto"
BACKEND = "auto"
def auth_arg(arg: str) -> t.List[str]:
@ -236,6 +243,28 @@ def add_common_args(parser: argparse.ArgumentParser) -> None:
"standard python library)"
),
)
parser.add_argument(
"--hash-algo",
default=DEFAULTS.HASH_ALGO,
type=hash_algo_arg,
help=(
"Any `hashlib` available algorithm to use for generating fragments "
"on package links. Can be disabled with one of (0, no, off, false)."
),
)
parser.add_argument(
"--backend",
default=DEFAULTS.BACKEND,
choices=("auto", "simple-dir", "cached-dir"),
dest="backend_arg",
help=(
"A backend implementation. Keep the default 'auto' to automatically"
" determine whether to activate caching or not"
),
)
parser.add_argument(
"--version",
action="version",
@ -254,7 +283,6 @@ def get_parser() -> argparse.ArgumentParser:
"directories starting with a dot. Multiple package directories "
"may be specified."
),
# formatter_class=argparse.RawTextHelpFormatter,
formatter_class=PreserveWhitespaceRawTextHelpFormatter,
epilog=(
"Visit https://github.com/pypiserver/pypiserver "
@ -381,15 +409,6 @@ def get_parser() -> argparse.ArgumentParser:
action="store_true",
help="Allow overwriting existing package files during upload.",
)
run_parser.add_argument(
"--hash-algo",
default=DEFAULTS.HASH_ALGO,
type=hash_algo_arg,
help=(
"Any `hashlib` available algorithm to use for generating fragments "
"on package links. Can be disabled with one of (0, no, off, false)."
),
)
run_parser.add_argument(
"--welcome",
metavar="HTML_FILE",
@ -504,9 +523,12 @@ def get_parser() -> argparse.ArgumentParser:
TConf = t.TypeVar("TConf", bound="_ConfigCommon")
BackendFactory = t.Callable[["_ConfigCommon"], Backend]
class _ConfigCommon:
hash_algo: t.Optional[str] = None
def __init__(
self,
roots: t.List[pathlib.Path],
@ -514,6 +536,8 @@ class _ConfigCommon:
log_frmt: str,
log_file: t.Optional[str],
log_stream: t.Optional[t.IO],
hash_algo: t.Optional[str],
backend_arg: str,
) -> None:
"""Construct a RuntimeConfig."""
# Global arguments
@ -521,18 +545,24 @@ class _ConfigCommon:
self.log_file = log_file
self.log_stream = log_stream
self.log_frmt = log_frmt
self.roots = roots
self.hash_algo = hash_algo
self.backend_arg = backend_arg
# Derived properties are directly based on other properties and are not
# included in equality checks.
self._derived_properties: t.Tuple[str, ...] = (
"iter_packages",
"package_root",
"backend",
)
# The first package directory is considered the root. This is used
# for uploads.
self.package_root = self.roots[0]
self.backend = self.get_backend(backend_arg)
@classmethod
def from_namespace(
cls: t.Type[TConf], namespace: argparse.Namespace
@ -551,6 +581,8 @@ class _ConfigCommon:
log_stream=namespace.log_stream,
log_frmt=namespace.log_frmt,
roots=namespace.package_directory,
hash_algo=namespace.hash_algo,
backend_arg=namespace.backend_arg,
)
@property
@ -565,13 +597,17 @@ class _ConfigCommon:
# If we've specified 3 or more levels of verbosity, just return not set.
return levels.get(self.verbosity, logging.NOTSET)
def iter_packages(self) -> t.Iterator[core.PkgFile]:
"""Iterate over packages in root directories."""
yield from (
itertools.chain.from_iterable(
core.listdir(str(r)) for r in self.roots
)
)
def get_backend(self, arg: str) -> IBackend:
available_backends: t.Dict[str, BackendFactory] = {
"auto": get_file_backend,
"simple-dir": SimpleFileBackend,
"cached-dir": CachingFileBackend,
}
backend = available_backends[arg]
return BackendProxy(backend(self))
def with_updates(self: TConf, **kwargs: t.Any) -> TConf:
"""Create a new config with the specified updates.
@ -624,7 +660,6 @@ class RunConfig(_ConfigCommon):
fallback_url: str,
server_method: str,
overwrite: bool,
hash_algo: t.Optional[str],
welcome_msg: str,
cache_control: t.Optional[int],
log_req_frmt: str,
@ -643,13 +678,11 @@ class RunConfig(_ConfigCommon):
self.fallback_url = fallback_url
self.server_method = server_method
self.overwrite = overwrite
self.hash_algo = hash_algo
self.welcome_msg = welcome_msg
self.cache_control = cache_control
self.log_req_frmt = log_req_frmt
self.log_res_frmt = log_res_frmt
self.log_err_frmt = log_err_frmt
# Derived properties
self._derived_properties = self._derived_properties + ("auther",)
self.auther = self.get_auther(auther)
@ -669,7 +702,6 @@ class RunConfig(_ConfigCommon):
"fallback_url": namespace.fallback_url,
"server_method": namespace.server,
"overwrite": namespace.overwrite,
"hash_algo": namespace.hash_algo,
"welcome_msg": namespace.welcome,
"cache_control": namespace.cache_control,
"log_req_frmt": namespace.log_req_frmt,
@ -752,6 +784,9 @@ class UpdateConfig(_ConfigCommon):
}
Configuration = t.Union[RunConfig, UpdateConfig]
class Config:
"""Config constructor for building a config from args."""
@ -767,9 +802,7 @@ class Config:
return default_config.with_updates(**overrides)
@classmethod
def from_args(
cls, args: t.Sequence[str] = None
) -> t.Union[RunConfig, UpdateConfig]:
def from_args(cls, args: t.Sequence[str] = None) -> Configuration:
"""Construct a Config from the passed args or sys.argv."""
# If pulling args from sys.argv (commandline arguments), argv[0] will
# be the program name, (i.e. pypi-server), so we don't need to

View File

@ -1,154 +1,69 @@
#! /usr/bin/env python
#! /usr/bin/env python3
"""minimal PyPI like server for use with pip/easy_install"""
import hashlib
import logging
import mimetypes
import os
import re
import typing as t
from urllib.parse import quote
log = logging.getLogger(__name__)
from pypiserver.pkg_helpers import normalize_pkgname, parse_version
mimetypes.add_type("application/octet-stream", ".egg")
mimetypes.add_type("application/octet-stream", ".whl")
mimetypes.add_type("text/plain", ".asc")
# ### Next 2 functions adapted from :mod:`distribute.pkg_resources`.
#
component_re = re.compile(r"(\d+ | [a-z]+ | \.| -)", re.I | re.VERBOSE)
replace = {"pre": "c", "preview": "c", "-": "final-", "rc": "c", "dev": "@"}.get
def _parse_version_parts(s):
for part in component_re.split(s):
part = replace(part, part)
if part in ["", "."]:
continue
if part[:1] in "0123456789":
yield part.zfill(8) # pad for numeric comparison
else:
yield "*" + part
yield "*final" # ensure that alpha/beta/candidate are before final
def parse_version(s):
parts = []
for part in _parse_version_parts(s.lower()):
if part.startswith("*"):
# remove trailing zeros from each series of numeric parts
while parts and parts[-1] == "00000000":
parts.pop()
parts.append(part)
return tuple(parts)
#
#### -- End of distribute's code.
_archive_suffix_rx = re.compile(
r"(\.zip|\.tar\.gz|\.tgz|\.tar\.bz2|-py[23]\.\d-.*|"
r"\.win-amd64-py[23]\.\d\..*|\.win32-py[23]\.\d\..*|\.egg)$",
re.I,
)
wheel_file_re = re.compile(
r"""^(?P<namever>(?P<name>.+?)-(?P<ver>\d.*?))
((-(?P<build>\d.*?))?-(?P<pyver>.+?)-(?P<abi>.+?)-(?P<plat>.+?)
\.whl|\.dist-info)$""",
re.VERBOSE,
)
_pkgname_re = re.compile(r"-\d+[a-z_.!+]", re.I)
_pkgname_parts_re = re.compile(
r"[\.\-](?=cp\d|py\d|macosx|linux|sunos|solaris|irix|aix|cygwin|win)", re.I
)
def _guess_pkgname_and_version_wheel(basename):
m = wheel_file_re.match(basename)
if not m:
return None, None
name = m.group("name")
ver = m.group("ver")
build = m.group("build")
if build:
return name, ver + "-" + build
else:
return name, ver
def guess_pkgname_and_version(path):
path = os.path.basename(path)
if path.endswith(".asc"):
path = path.rstrip(".asc")
if path.endswith(".whl"):
return _guess_pkgname_and_version_wheel(path)
if not _archive_suffix_rx.search(path):
return
path = _archive_suffix_rx.sub("", path)
if "-" not in path:
pkgname, version = path, ""
elif path.count("-") == 1:
pkgname, version = path.split("-", 1)
elif "." not in path:
pkgname, version = path.rsplit("-", 1)
else:
pkgname = _pkgname_re.split(path)[0]
ver_spec = path[len(pkgname) + 1 :]
parts = _pkgname_parts_re.split(ver_spec)
version = parts[0]
return pkgname, version
def normalize_pkgname(name):
"""Perform PEP 503 normalization"""
return re.sub(r"[-_.]+", "-", name).lower()
def normalize_pkgname_for_url(name):
"""Perform PEP 503 normalization and ensure the value is safe for URLs."""
return quote(re.sub(r"[-_.]+", "-", name).lower())
def is_allowed_path(path_part):
p = path_part.replace("\\", "/")
return not (p.startswith(".") or "/." in p)
def get_bad_url_redirect_path(request, project):
"""Get the path for a bad root url."""
uri = request.custom_fullpath
if uri.endswith("/"):
uri = uri[:-1]
uri = uri.rsplit("/", 1)[0]
project = quote(project)
uri += f"/simple/{project}/"
return uri
class PkgFile:
__slots__ = [
"fn",
"root",
"_fname_and_hash",
"relfn",
"relfn_unix",
"pkgname_norm",
"pkgname",
"version",
"parsed_version",
"replaces",
"pkgname", # The projects/package name with possible capitalization
"version", # The package version as a string
"fn", # The full file path
"root", # An optional root directory of the file
"relfn", # The file path relative to the root
"replaces", # The previous version of the package (used by manage.py)
"pkgname_norm", # The PEP503 normalized project name
"digest", # The file digest in the form of <algo>=<hash>
"relfn_unix", # The relative file path in unix notation
"parsed_version", # The package version as a tuple of parts
"digester", # a function that calculates the digest for the package
]
digest: t.Optional[str]
digester: t.Optional[t.Callable[["PkgFile"], t.Optional[str]]]
parsed_version: tuple
relfn_unix: t.Optional[str]
def __init__(
self, pkgname, version, fn=None, root=None, relfn=None, replaces=None
self,
pkgname: str,
version: str,
fn: t.Optional[str] = None,
root: t.Optional[str] = None,
relfn: t.Optional[str] = None,
replaces: t.Optional["PkgFile"] = None,
):
self.pkgname = pkgname
self.pkgname_norm = normalize_pkgname(pkgname)
self.version = version
self.parsed_version = parse_version(version)
self.parsed_version: tuple = parse_version(version)
self.fn = fn
self.root = root
self.relfn = relfn
self.relfn_unix = None if relfn is None else relfn.replace("\\", "/")
self.replaces = replaces
self.digest = None
self.digester = None
def __repr__(self):
def __repr__(self) -> str:
return "{}({})".format(
self.__class__.__name__,
", ".join(
@ -159,109 +74,9 @@ class PkgFile:
),
)
def fname_and_hash(self, hash_algo):
if not hasattr(self, "_fname_and_hash"):
if hash_algo:
self._fname_and_hash = (
f"{self.relfn_unix}#{hash_algo}="
f"{digest_file(self.fn, hash_algo)}"
)
else:
self._fname_and_hash = self.relfn_unix
return self._fname_and_hash
def _listdir(root: str) -> t.Iterable[PkgFile]:
root = os.path.abspath(root)
for dirpath, dirnames, filenames in os.walk(root):
dirnames[:] = [x for x in dirnames if is_allowed_path(x)]
for x in filenames:
fn = os.path.join(root, dirpath, x)
if not is_allowed_path(x) or not os.path.isfile(fn):
continue
res = guess_pkgname_and_version(x)
if not res:
# #Seems the current file isn't a proper package
continue
pkgname, version = res
if pkgname:
yield PkgFile(
pkgname=pkgname,
version=version,
fn=fn,
root=root,
relfn=fn[len(root) + 1 :],
)
def find_packages(pkgs, prefix=""):
prefix = normalize_pkgname(prefix)
for x in pkgs:
if prefix and x.pkgname_norm != prefix:
continue
yield x
def get_prefixes(pkgs):
normalized_pkgnames = set()
for x in pkgs:
if x.pkgname:
normalized_pkgnames.add(x.pkgname_norm)
return normalized_pkgnames
def exists(root, filename):
assert "/" not in filename
dest_fn = os.path.join(root, filename)
return os.path.exists(dest_fn)
def store(root, filename, save_method):
assert "/" not in filename
dest_fn = os.path.join(root, filename)
save_method(dest_fn, overwrite=True) # Overwite check earlier.
def get_bad_url_redirect_path(request, prefix):
"""Get the path for a bad root url."""
p = request.custom_fullpath
if p.endswith("/"):
p = p[:-1]
p = p.rsplit("/", 1)[0]
prefix = quote(prefix)
p += "/simple/{}/".format(prefix)
return p
def _digest_file(fpath, hash_algo):
"""
Reads and digests a file according to specified hashing-algorith.
:param str sha256: any algo contained in :mod:`hashlib`
:return: <hash_algo>=<hex_digest>
From http://stackoverflow.com/a/21565932/548792
"""
blocksize = 2 ** 16
digester = hashlib.new(hash_algo)
with open(fpath, "rb") as f:
for block in iter(lambda: f.read(blocksize), b""):
digester.update(block)
return digester.hexdigest()
try:
from .cache import cache_manager
def listdir(root: str) -> t.Iterable[PkgFile]:
# root must be absolute path
return cache_manager.listdir(root, _listdir)
def digest_file(fpath, hash_algo):
# fpath must be absolute path
return cache_manager.digest_file(fpath, hash_algo, _digest_file)
except ImportError:
listdir = _listdir
digest_file = _digest_file
@property
def fname_and_hash(self) -> str:
if self.digest is None and self.digester is not None:
self.digest = self.digester(self)
hashpart = f"#{self.digest}" if self.digest else ""
return self.relfn_unix + hashpart # type: ignore

View File

@ -6,13 +6,15 @@ import itertools
import os
import sys
from distutils.version import LooseVersion
from pathlib import Path
from subprocess import call
from xmlrpc.client import Server
import pip
from . import core
from xmlrpc.client import Server
from .backend import listdir
from .core import PkgFile
from .pkg_helpers import normalize_pkgname, parse_version
def make_pypi_client(url):
@ -41,7 +43,7 @@ def filter_latest_pkgs(pkgs):
pkgname2latest = {}
for x in pkgs:
pkgname = core.normalize_pkgname(x.pkgname)
pkgname = normalize_pkgname(x.pkgname)
if pkgname not in pkgname2latest:
pkgname2latest[pkgname] = x
@ -53,9 +55,9 @@ def filter_latest_pkgs(pkgs):
def build_releases(pkg, versions):
for x in versions:
parsed_version = core.parse_version(x)
parsed_version = parse_version(x)
if parsed_version > pkg.parsed_version:
yield core.PkgFile(pkgname=pkg.pkgname, version=x, replaces=pkg)
yield PkgFile(pkgname=pkg.pkgname, version=x, replaces=pkg)
def find_updates(pkgset, stable_only=True):
@ -98,7 +100,8 @@ def find_updates(pkgset, stable_only=True):
if no_releases:
sys.stdout.write(
f"no releases found on pypi for {', '.join(sorted(no_releases))}\n\n"
f"no releases found on pypi for"
f" {', '.join(sorted(no_releases))}\n\n"
)
return need_update
@ -135,8 +138,7 @@ class PipCmd:
def update_package(pkg, destdir, dry_run=False):
"""Print and optionally execute a package update."""
print(
"# update {0.pkgname} from {0.replaces.version} to "
"{0.version}".format(pkg)
f"# update {pkg.pkgname} from {pkg.replaces.version} to {pkg.version}"
)
cmd = tuple(
@ -148,7 +150,7 @@ def update_package(pkg, destdir, dry_run=False):
)
)
print("{}\n".format(" ".join(cmd)))
print(" ".join(cmd), end="\n\n")
if not dry_run:
call(cmd)
@ -171,7 +173,9 @@ def update(pkgset, destdir=None, dry_run=False, stable_only=True):
def update_all_packages(
roots, destdir=None, dry_run=False, stable_only=True, ignorelist=None
):
all_packages = itertools.chain(*[core.listdir(r) for r in roots])
all_packages = itertools.chain.from_iterable(
listdir(Path(r)) for r in roots
)
skip_packages = set(ignorelist or ())

112
pypiserver/pkg_helpers.py Normal file
View File

@ -0,0 +1,112 @@
import os
import re
import typing as t
from pathlib import PurePath, Path
from urllib.parse import quote
def normalize_pkgname(name: str) -> str:
"""Perform PEP 503 normalization"""
return re.sub(r"[-_.]+", "-", name).lower()
def normalize_pkgname_for_url(name: str) -> str:
"""Perform PEP 503 normalization and ensure the value is safe for URLs."""
return quote(normalize_pkgname(name))
# ### Next 2 functions adapted from :mod:`distribute.pkg_resources`.
#
component_re = re.compile(r"(\d+ | [a-z]+ | \.| -)", re.I | re.VERBOSE)
replace = {"pre": "c", "preview": "c", "-": "final-", "rc": "c", "dev": "@"}.get
def _parse_version_parts(s):
for part in component_re.split(s):
part = replace(part, part)
if part in ["", "."]:
continue
if part[:1] in "0123456789":
yield part.zfill(8) # pad for numeric comparison
else:
yield "*" + part
yield "*final" # ensure that alpha/beta/candidate are before final
def parse_version(s: str) -> tuple:
parts = []
for part in _parse_version_parts(s.lower()):
if part.startswith("*"):
# remove trailing zeros from each series of numeric parts
while parts and parts[-1] == "00000000":
parts.pop()
parts.append(part)
return tuple(parts)
#
# ### -- End of distribute's code.
def is_listed_path(path_part: t.Union[PurePath, str]) -> bool:
if isinstance(path_part, str):
path_part = PurePath(path_part)
return not any(part.startswith(".") for part in path_part.parts)
_archive_suffix_rx = re.compile(
r"(\.zip|\.tar\.gz|\.tgz|\.tar\.bz2|-py[23]\.\d-.*|"
r"\.win-amd64-py[23]\.\d\..*|\.win32-py[23]\.\d\..*|\.egg)$",
re.I,
)
wheel_file_re = re.compile(
r"""^(?P<namever>(?P<name>.+?)-(?P<ver>\d.*?))
((-(?P<build>\d.*?))?-(?P<pyver>.+?)-(?P<abi>.+?)-(?P<plat>.+?)
\.whl|\.dist-info)$""",
re.VERBOSE,
)
_pkgname_re = re.compile(r"-\d+[a-z_.!+]", re.I)
_pkgname_parts_re = re.compile(
r"[\.\-](?=cp\d|py\d|macosx|linux|sunos|solaris|irix|aix|cygwin|win)", re.I
)
def _guess_pkgname_and_version_wheel(
basename: str,
) -> t.Optional[t.Tuple[str, str]]:
m = wheel_file_re.match(basename)
if not m:
return None
name = m.group("name")
ver = m.group("ver")
build = m.group("build")
if build:
return name, ver + "-" + build
else:
return name, ver
def guess_pkgname_and_version(path: str) -> t.Optional[t.Tuple[str, str]]:
path = os.path.basename(path)
if path.endswith(".asc"):
path = path.rstrip(".asc")
if path.endswith(".whl"):
return _guess_pkgname_and_version_wheel(path)
if not _archive_suffix_rx.search(path):
return None
path = _archive_suffix_rx.sub("", path)
if "-" not in path:
pkgname, version = path, ""
elif path.count("-") == 1:
pkgname, version = path.split("-", 1)
elif "." not in path:
pkgname, version = path.rsplit("-", 1)
else:
pkgname = _pkgname_re.split(path)[0]
ver_spec = path[len(pkgname) + 1 :]
parts = _pkgname_parts_re.split(ver_spec)
version = parts[0]
return pkgname, version

41
pypiserver/plugin.py Normal file
View File

@ -0,0 +1,41 @@
""" NOT YET IMPLEMENTED
Plugins are callable setuptools entrypoints that are invoked at startup that
a developer may use to extend the behaviour of pypiserver. A plugin for example
may add an additional Backend to the system. A plugin will be called
with the following keyword arguments
* app: the Bottle App object
* add_argument: A callable for registering command line arguments for your
plugin using the argparse cli library
* backends: A Dict[str, callable] object that you may register a backend to.
The key is the identifier for the backend in the `--backend` command line
argument.
The callable must take a single argument `config` as a Configuration object
and return a Backend instance. It may be the class constructor or a factory
function to construct a Backend object
In the future, the plugin callable may be called with additional keyword
arguments, so a plugin should accept a **kwargs variadic keyword argument.
"""
from pypiserver.backend import SimpleFileBackend, CachingFileBackend
from pypiserver import get_file_backend
DEFAULT_PACKAGE_DIRECTORIES = ["~/packages"]
# register this as a setuptools entrypoint under the 'pypiserver.plugin' key
def my_plugin(add_argument, backends, **_):
add_argument(
"package_directory",
default=DEFAULT_PACKAGE_DIRECTORIES,
nargs="*",
help="The directory from which to serve packages.",
)
backends.update(
{
"auto": get_file_backend,
"simple-dir": SimpleFileBackend,
"cached-dir": CachingFileBackend,
}
)

View File

@ -4,34 +4,21 @@
import logging
import os
import pathlib
try: # python 3
from html.parser import HTMLParser
from html import unescape
except ImportError:
from HTMLParser import HTMLParser
unescape = HTMLParser().unescape
try:
import xmlrpc.client as xmlrpclib
except ImportError:
import xmlrpclib # legacy Python
from html import unescape
# Third party imports
import pytest
import webtest
# Local Imports
from pypiserver import __main__, bottle
import tests.test_core as test_core
from tests.test_pkg_helpers import files, invalid_files
from pypiserver import __main__, bottle, core, Bottle
from pypiserver.backend import CachingFileBackend, SimpleFileBackend
# Enable logging to detect any problems with it
##
__main__.init_logging()
@ -43,12 +30,14 @@ def app(tmpdir):
roots=[pathlib.Path(tmpdir.strpath)],
authenticate=[],
password_file=".",
backend_arg="simple-dir",
)
@pytest.fixture
def testapp(app):
"""Return a webtest TestApp initiated with pypiserver app"""
bottle.debug(True)
return webtest.TestApp(app)
@ -111,7 +100,18 @@ def welcome_file_all_vars(request, root):
return wfile
def test_root_count(root, testapp):
@pytest.fixture
def add_file_to_root(app):
def file_adder(root, filename, content=""):
root.join(filename).write(content)
backend = app.config.backend
if isinstance(backend, CachingFileBackend):
backend.cache_manager.invalidate_root_cache(root)
return file_adder
def test_root_count(root, testapp, add_file_to_root):
"""Test that the welcome page count updates with added packages
:param root: root temporary directory fixture
@ -119,7 +119,7 @@ def test_root_count(root, testapp):
"""
resp = testapp.get("/")
resp.mustcontain("PyPI compatible package index serving 0 packages")
root.join("Twisted-11.0.0.tar.bz2").write("")
add_file_to_root(root, "Twisted-11.0.0.tar.bz2")
resp = testapp.get("/")
resp.mustcontain("PyPI compatible package index serving 1 packages")
@ -330,16 +330,19 @@ def test_nonroot_root_with_x_forwarded_host_without_trailing_slash(testapp):
resp.mustcontain("""<a href="/priv/packages/">here</a>""")
def test_nonroot_simple_index(root, testpriv):
root.join("foobar-1.0.zip").write("")
def test_nonroot_simple_index(root, testpriv, add_file_to_root):
add_file_to_root(root, "foobar-1.0.zip", "123")
resp = testpriv.get("/priv/simple/foobar/")
links = resp.html("a")
assert len(links) == 1
assert links[0]["href"].startswith("/priv/packages/foobar-1.0.zip#")
def test_nonroot_simple_index_with_x_forwarded_host(root, testapp):
root.join("foobar-1.0.zip").write("")
def test_nonroot_simple_index_with_x_forwarded_host(
root, testapp, add_file_to_root
):
add_file_to_root(root, "foobar-1.0.zip", "123")
resp = testapp.get(
"/simple/foobar/", headers={"X-Forwarded-Host": "forwarded.ed/priv/"}
)
@ -348,22 +351,25 @@ def test_nonroot_simple_index_with_x_forwarded_host(root, testapp):
assert links[0]["href"].startswith("/priv/packages/foobar-1.0.zip#")
def test_nonroot_simple_packages(root, testpriv):
root.join("foobar-1.0.zip").write("123")
def test_nonroot_simple_packages(root, testpriv, add_file_to_root):
add_file_to_root(root, "foobar-1.0.zip", "123")
resp = testpriv.get("/priv/packages/")
links = resp.html("a")
assert len(links) == 1
assert links[0]["href"].startswith("/priv/packages/foobar-1.0.zip#")
assert "/priv/packages/foobar-1.0.zip#" in links[0]["href"]
def test_nonroot_simple_packages_with_x_forwarded_host(root, testapp):
root.join("foobar-1.0.zip").write("123")
def test_nonroot_simple_packages_with_x_forwarded_host(
root, testapp, add_file_to_root
):
add_file_to_root(root, "foobar-1.0.zip", "123")
resp = testapp.get(
"/packages/", headers={"X-Forwarded-Host": "forwarded/priv/"}
)
links = resp.html("a")
assert len(links) == 1
assert links[0]["href"].startswith("/priv/packages/foobar-1.0.zip#")
assert "/priv/packages/foobar-1.0.zip#" in links[0]["href"]
def test_root_no_relative_paths(testpriv):
@ -444,7 +450,7 @@ def test_upload_badAction(testapp):
@pytest.mark.parametrize(
"package", [f[0] for f in test_core.files if f[1] and "/" not in f[0]]
"package", [f[0] for f in files if f[1] and "/" not in f[0]]
)
def test_upload(package, root, testapp):
resp = testapp.post(
@ -458,8 +464,23 @@ def test_upload(package, root, testapp):
assert uploaded_pkgs[0].lower() == package.lower()
def test_upload_conflict_on_existing(root, testapp):
package = "foo_bar-1.0.tar.gz"
root.join("foo_bar-1.0.tar.gz").write("")
resp = testapp.post(
"/",
params={":action": "file_upload"},
upload_files=[("content", package, b"")],
status=409,
)
assert resp.status_int == 409
assert "Package 'foo_bar-1.0.tar.gz' already exists!" in unescape(resp.text)
@pytest.mark.parametrize(
"package", [f[0] for f in test_core.files if f[1] and "/" not in f[0]]
"package", [f[0] for f in files if f[1] and "/" not in f[0]]
)
def test_upload_with_signature(package, root, testapp):
resp = testapp.post(
@ -477,9 +498,7 @@ def test_upload_with_signature(package, root, testapp):
assert f"{package.lower()}.asc" in uploaded_pkgs
@pytest.mark.parametrize(
"package", [f[0] for f in test_core.files if f[1] is None]
)
@pytest.mark.parametrize("package", invalid_files)
def test_upload_badFilename(package, root, testapp):
resp = testapp.post(
"/",

42
tests/test_backend.py Normal file
View File

@ -0,0 +1,42 @@
from pathlib import Path
import pytest
from pypiserver.backend import listdir
def create_path(root: Path, path: Path):
if path.is_absolute():
raise ValueError(
"Only test using relative paths"
" to prevent leaking outside test environment"
)
fullpath = root / path
if not fullpath.parent.exists():
fullpath.parent.mkdir(parents=True)
fullpath.touch()
valid_paths = ["direct-in-root.zip", "some/nested/pkg.zip"]
@pytest.mark.parametrize("path_name", valid_paths)
def test_listdir_generates_pkgfile_for_valid_package(tmp_path, path_name):
path = Path(path_name)
create_path(tmp_path, path)
assert len(list(listdir(tmp_path))) == 1
invalid_paths = [
".hidden-pkg.zip",
".hidden/dir/pkg.zip",
"in/between/.hidden/pkg.zip",
"invalid-wheel.whl",
]
@pytest.mark.parametrize("path_name", invalid_paths)
def test_listdir_doesnt_generate_pkgfile_for_invalid_file(tmp_path, path_name):
path = Path(path_name)
create_path(tmp_path, path)
assert not list(listdir(tmp_path))

View File

@ -8,6 +8,7 @@ import sys
import pytest
from pypiserver.backend import SimpleFileBackend, BackendProxy
from pypiserver.config import DEFAULTS, Config, RunConfig, UpdateConfig
FILE_DIR = pathlib.Path(__file__).parent.resolve()
@ -530,6 +531,35 @@ _CONFIG_TEST_PARAMS: t.Tuple[ConfigTestCase, ...] = (
exp_config_type=RunConfig,
exp_config_values={"log_err_frmt": "foo"},
),
# backend
ConfigTestCase(
"Run: backend unspecified",
args=["run"],
legacy_args=[],
exp_config_type=RunConfig,
exp_config_values={
"backend_arg": "auto",
"_test": (
lambda conf: (
isinstance(conf.backend, BackendProxy)
and isinstance(conf.backend.backend, SimpleFileBackend)
)
),
},
),
ConfigTestCase(
"Run: simple backend specified",
args=["run", "--backend", "simple-dir"],
legacy_args=["--backend", "simple-dir"],
exp_config_type=RunConfig,
exp_config_values={
"_test": (
lambda conf: (
isinstance(conf.backend.backend, SimpleFileBackend)
)
),
},
),
# ******************************************************************
# Update subcommand args
# ******************************************************************

View File

@ -6,7 +6,10 @@ import os
import pytest
from pypiserver import __main__, core
from pypiserver import __main__, core, backend
from pypiserver.pkg_helpers import (
normalize_pkgname_for_url,
)
from tests.doubles import Namespace
@ -15,98 +18,9 @@ from tests.doubles import Namespace
__main__.init_logging()
files = [
("pytz-2012b.tar.bz2", "pytz", "2012b"),
("pytz-2012b.tgz", "pytz", "2012b"),
("pytz-2012b.ZIP", "pytz", "2012b"),
("pytz-2012a.zip", "pytz", "2012a"),
("gevent-1.0b1.win32-py2.6.exe", "gevent", "1.0b1"),
("gevent-1.0b1.win32-py2.7.msi", "gevent", "1.0b1"),
("greenlet-0.3.4-py3.1-win-amd64.egg", "greenlet", "0.3.4"),
("greenlet-0.3.4.win-amd64-py3.2.exe", "greenlet", "0.3.4"),
("greenlet-0.3.4-py3.2-win32.egg", "greenlet", "0.3.4"),
("greenlet-0.3.4-py2.7-linux-x86_64.egg", "greenlet", "0.3.4"),
("pep8-0.6.0.zip", "pep8", "0.6.0"),
("ABC12-34_V1X-1.2.3.zip", "ABC12", "34_V1X-1.2.3"),
("A100-200-XYZ-1.2.3.zip", "A100-200-XYZ", "1.2.3"),
("flup-1.0.3.dev-20110405.tar.gz", "flup", "1.0.3.dev-20110405"),
("package-1.0.0-alpha.1.zip", "package", "1.0.0-alpha.1"),
("package-1.3.7+build.11.e0f985a.zip", "package", "1.3.7+build.11.e0f985a"),
("package-v1-8.1.301.ga0df26f.zip", "package-v1", "8.1.301.ga0df26f"),
("package-v1.1-8.1.301.ga0df26f.zip", "package-v1.1", "8.1.301.ga0df26f"),
("package-2013.02.17.dev123.zip", "package", "2013.02.17.dev123"),
("package-20000101.zip", "package", "20000101"),
("flup-123-1.0.3.dev-20110405.tar.gz", "flup-123", "1.0.3.dev-20110405"),
("package-123-1.0.0-alpha.1.zip", "package-123", "1.0.0-alpha.1"),
(
"package-123-1.3.7+build.11.e0f985a.zip",
"package-123",
"1.3.7+build.11.e0f985a",
),
("package-123-v1.1_3-8.1.zip", "package-123-v1.1_3", "8.1"),
("package-123-2013.02.17.dev123.zip", "package-123", "2013.02.17.dev123"),
("package-123-20000101.zip", "package-123", "20000101"),
(
"pyelasticsearch-0.5-brainbot-1-20130712.zip",
"pyelasticsearch",
"0.5-brainbot-1-20130712",
),
("pywin32-217-cp27-none-win32.whl", "pywin32", "217"),
("pywin32-217-55-cp27-none-win32.whl", "pywin32", "217-55"),
("pywin32-217.1-cp27-none-win32.whl", "pywin32", "217.1"),
("package.zip", "package", ""),
(
"package-name-0.0.1.dev0.linux-x86_64.tar.gz",
"package-name",
"0.0.1.dev0",
),
(
"package-name-0.0.1.dev0.macosx-10.10-intel.tar.gz",
"package-name",
"0.0.1.dev0",
),
(
"package-name-0.0.1.alpha.1.win-amd64-py3.2.exe",
"package-name",
"0.0.1.alpha.1",
),
("pkg-3!1.0-0.1.tgz", "pkg", "3!1.0-0.1"), # TO BE FIXED
("pkg-3!1+.0-0.1.tgz", "pkg", "3!1+.0-0.1"), # TO BE FIXED
("pkg.zip", "pkg", ""),
("foo/pkg.zip", "pkg", ""),
("foo/pkg-1b.zip", "pkg", "1b"),
(
"package-name-0.0.1.alpha.1.win-amd64-py3.2.exe",
"package-name",
"0.0.1.alpha.1",
),
]
def _capitalize_ext(fpath):
f, e = os.path.splitext(fpath)
if e != ".whl":
e = e.upper()
return f + e
@pytest.mark.parametrize(("filename", "pkgname", "version"), files)
def test_guess_pkgname_and_version(filename, pkgname, version):
exp = (pkgname, version)
assert core.guess_pkgname_and_version(filename) == exp
assert core.guess_pkgname_and_version(_capitalize_ext(filename)) == exp
@pytest.mark.parametrize(("filename", "pkgname", "version"), files)
def test_guess_pkgname_and_version_asc(filename, pkgname, version):
exp = (pkgname, version)
filename = f"{filename}.asc"
assert core.guess_pkgname_and_version(filename) == exp
def test_listdir_bad_name(tmpdir):
tmpdir.join("foo.whl").ensure()
res = list(core.listdir(tmpdir.strpath))
def test_listdir_bad_name(tmp_path):
tmp_path.joinpath("foo.whl").touch()
res = list(backend.listdir(tmp_path))
assert res == []
@ -122,33 +36,37 @@ hashes = (
@pytest.mark.parametrize(("algo", "digest"), hashes)
def test_hashfile(tmpdir, algo, digest):
f = tmpdir.join("empty")
f.ensure()
assert core.digest_file(f.strpath, algo) == digest
def test_hashfile(tmp_path, algo, digest):
f = tmp_path.joinpath("empty")
f.touch()
assert backend.digest_file(str(f), algo) == f"{algo}={digest}"
@pytest.mark.parametrize("hash_algo", ("md5", "sha256", "sha512"))
def test_fname_and_hash(tmpdir, hash_algo):
def test_fname_and_hash(tmp_path, hash_algo):
"""Ensure we are returning the expected hashes for files."""
f = tmpdir.join("tmpfile")
f.ensure()
pkgfile = core.PkgFile("tmp", "1.0.0", f.strpath, f.dirname, f.basename)
assert pkgfile.fname_and_hash(hash_algo) == "{}#{}={}".format(
f.basename, hash_algo, str(f.computehash(hashtype=hash_algo))
)
def digester(pkg):
digest = backend.digest_file(pkg.fn, hash_algo)
pkg.digest = digest
return digest
f = tmp_path.joinpath("tmpfile")
f.touch()
pkgfile = core.PkgFile("tmp", "1.0.0", str(f), f.parent, f.name)
pkgfile.digester = digester
assert pkgfile.fname_and_hash == f"{f.name}#{digester(pkgfile)}"
def test_redirect_prefix_encodes_newlines():
def test_redirect_project_encodes_newlines():
"""Ensure raw newlines are url encoded in the generated redirect."""
request = Namespace(custom_fullpath="/\nSet-Cookie:malicious=1;")
prefix = "\nSet-Cookie:malicious=1;"
newpath = core.get_bad_url_redirect_path(request, prefix)
project = "\nSet-Cookie:malicious=1;"
newpath = core.get_bad_url_redirect_path(request, project)
assert "\n" not in newpath
def test_normalize_pkgname_for_url_encodes_newlines():
"""Ensure newlines are url encoded in package names for urls."""
assert "\n" not in core.normalize_pkgname_for_url(
"/\nSet-Cookie:malicious=1;"
)
assert "\n" not in normalize_pkgname_for_url("/\nSet-Cookie:malicious=1;")

View File

@ -1,6 +1,7 @@
import logging
import os
import pathlib
from pathlib import Path
import sys
import typing as t
from unittest import mock

View File

@ -3,20 +3,15 @@
from __future__ import absolute_import, print_function, unicode_literals
try:
from pathlib import Path
from unittest.mock import Mock
except ImportError:
from mock import Mock
import py
import pytest
from pypiserver import manage
from pypiserver.core import (
PkgFile,
guess_pkgname_and_version,
parse_version,
)
from pypiserver.core import PkgFile
from pypiserver.pkg_helpers import guess_pkgname_and_version, parse_version
from pypiserver.manage import (
PipCmd,
build_releases,
@ -210,17 +205,17 @@ def test_update_all_packages(monkeypatch):
private_pkg_2 = PkgFile("my_other_private_pkg", "1.0")
roots_mock = {
"/opt/pypi": [
Path("/opt/pypi"): [
public_pkg_1,
private_pkg_1,
],
"/data/pypi": [public_pkg_2, private_pkg_2],
Path("/data/pypi"): [public_pkg_2, private_pkg_2],
}
def core_listdir_mock(directory):
return roots_mock.get(directory, [])
def core_listdir_mock(path: Path):
return roots_mock.get(path, [])
monkeypatch.setattr(manage.core, "listdir", core_listdir_mock)
monkeypatch.setattr(manage, "listdir", core_listdir_mock)
monkeypatch.setattr(manage, "update", Mock(return_value=None))
destdir = None
@ -243,7 +238,7 @@ def test_update_all_packages(monkeypatch):
)
def test_update_all_packages_with_ignorelist(monkeypatch):
def test_update_all_packages_with_blacklist(monkeypatch):
"""Test calling update_all_packages()"""
public_pkg_1 = PkgFile("Flask", "1.0")
public_pkg_2 = PkgFile("requests", "1.0")
@ -251,17 +246,17 @@ def test_update_all_packages_with_ignorelist(monkeypatch):
private_pkg_2 = PkgFile("my_other_private_pkg", "1.0")
roots_mock = {
"/opt/pypi": [
Path("/opt/pypi"): [
public_pkg_1,
private_pkg_1,
],
"/data/pypi": [public_pkg_2, private_pkg_2],
Path("/data/pypi"): [public_pkg_2, private_pkg_2],
}
def core_listdir_mock(directory):
return roots_mock.get(directory, [])
def core_listdir_mock(path: Path):
return roots_mock.get(path, [])
monkeypatch.setattr(manage.core, "listdir", core_listdir_mock)
monkeypatch.setattr(manage, "listdir", core_listdir_mock)
monkeypatch.setattr(manage, "update", Mock(return_value=None))
destdir = None

116
tests/test_pkg_helpers.py Normal file
View File

@ -0,0 +1,116 @@
import os
from pathlib import WindowsPath, PureWindowsPath
import pytest
from pypiserver.pkg_helpers import guess_pkgname_and_version, is_listed_path
files = [
("pytz-2012b.tar.bz2", "pytz", "2012b"),
("pytz-2012b.tgz", "pytz", "2012b"),
("pytz-2012b.ZIP", "pytz", "2012b"),
("pytz-2012a.zip", "pytz", "2012a"),
("gevent-1.0b1.win32-py2.6.exe", "gevent", "1.0b1"),
("gevent-1.0b1.win32-py2.7.msi", "gevent", "1.0b1"),
("greenlet-0.3.4-py3.1-win-amd64.egg", "greenlet", "0.3.4"),
("greenlet-0.3.4.win-amd64-py3.2.exe", "greenlet", "0.3.4"),
("greenlet-0.3.4-py3.2-win32.egg", "greenlet", "0.3.4"),
("greenlet-0.3.4-py2.7-linux-x86_64.egg", "greenlet", "0.3.4"),
("pep8-0.6.0.zip", "pep8", "0.6.0"),
("ABC12-34_V1X-1.2.3.zip", "ABC12", "34_V1X-1.2.3"),
("A100-200-XYZ-1.2.3.zip", "A100-200-XYZ", "1.2.3"),
("flup-1.0.3.dev-20110405.tar.gz", "flup", "1.0.3.dev-20110405"),
("package-1.0.0-alpha.1.zip", "package", "1.0.0-alpha.1"),
("package-1.3.7+build.11.e0f985a.zip", "package", "1.3.7+build.11.e0f985a"),
("package-v1-8.1.301.ga0df26f.zip", "package-v1", "8.1.301.ga0df26f"),
("package-v1.1-8.1.301.ga0df26f.zip", "package-v1.1", "8.1.301.ga0df26f"),
("package-2013.02.17.dev123.zip", "package", "2013.02.17.dev123"),
("package-20000101.zip", "package", "20000101"),
("flup-123-1.0.3.dev-20110405.tar.gz", "flup-123", "1.0.3.dev-20110405"),
("package-123-1.0.0-alpha.1.zip", "package-123", "1.0.0-alpha.1"),
(
"package-123-1.3.7+build.11.e0f985a.zip",
"package-123",
"1.3.7+build.11.e0f985a",
),
("package-123-v1.1_3-8.1.zip", "package-123-v1.1_3", "8.1"),
("package-123-2013.02.17.dev123.zip", "package-123", "2013.02.17.dev123"),
("package-123-20000101.zip", "package-123", "20000101"),
(
"pyelasticsearch-0.5-brainbot-1-20130712.zip",
"pyelasticsearch",
"0.5-brainbot-1-20130712",
),
("pywin32-217-cp27-none-win32.whl", "pywin32", "217"),
("pywin32-217-55-cp27-none-win32.whl", "pywin32", "217-55"),
("pywin32-217.1-cp27-none-win32.whl", "pywin32", "217.1"),
("package.zip", "package", ""),
(
"package-name-0.0.1.dev0.linux-x86_64.tar.gz",
"package-name",
"0.0.1.dev0",
),
(
"package-name-0.0.1.dev0.macosx-10.10-intel.tar.gz",
"package-name",
"0.0.1.dev0",
),
(
"package-name-0.0.1.alpha.1.win-amd64-py3.2.exe",
"package-name",
"0.0.1.alpha.1",
),
("pkg-3!1.0-0.1.tgz", "pkg", "3!1.0-0.1"), # TO BE FIXED
("pkg-3!1+.0-0.1.tgz", "pkg", "3!1+.0-0.1"), # TO BE FIXED
("pkg.zip", "pkg", ""),
("foo/pkg.zip", "pkg", ""),
("foo/pkg-1b.zip", "pkg", "1b"),
("foo/pywin32-217.1-cp27-none-win32.whl", "pywin32", "217.1"),
(
"package-name-0.0.1.alpha.1.win-amd64-py3.2.exe",
"package-name",
"0.0.1.alpha.1",
),
]
def _capitalize_ext(fpath):
f, e = os.path.splitext(fpath)
if e != ".whl":
e = e.upper()
return f + e
@pytest.mark.parametrize(("filename", "pkgname", "version"), files)
def test_guess_pkgname_and_version(filename, pkgname, version):
exp = (pkgname, version)
assert guess_pkgname_and_version(filename) == exp
assert guess_pkgname_and_version(_capitalize_ext(filename)) == exp
@pytest.mark.parametrize(("filename", "pkgname", "version"), files)
def test_guess_pkgname_and_version_asc(filename, pkgname, version):
exp = (pkgname, version)
filename = f"{filename}.asc"
assert guess_pkgname_and_version(filename) == exp
invalid_files = ["some_file", "some_file.ext", "some_wheel.whl"]
@pytest.mark.parametrize("filename", invalid_files)
def test_guess_pkgname_and_version_invalid_files(filename):
assert guess_pkgname_and_version(filename) is None
paths = [
("/some/path", True),
(PureWindowsPath(r"c:\some\windows\path"), True),
("/.hidden", False),
(PureWindowsPath(r"c:\.hidden\windows\path"), False),
]
@pytest.mark.parametrize(("pathname", "allowed"), paths)
def test_allowed_path_check(pathname, allowed):
assert is_listed_path(pathname) == allowed