1
0
mirror of https://github.com/pypiserver/pypiserver synced 2024-11-09 16:45:51 +01:00

Use argparse config throughout app (#349)

This PR is a pretty substantial refactor of the entrypoints of pypiserver (`__main__` and `__init__`) to use the argparse-based config added in #339.

- Updated `RunConfig` and `UpdateConfig` classes to have exclusive init kwargs, instead of taking an namespace. This turned out to be much easier when working with the library-style app initialization in `__init__`, both for direct instantiation and via paste config
- Added an `iter_packages()` method to the `RunConfig` to iterate over packages specified by the configuration (note @elfjes, I think that replacing this with e.g. a `backend` reference will be a nice way to tie in #348)
- Added a general-purpose method to map legacy keyword arguments to the `app()` and `paste_app_factory()` functions to updated forms
- Refactored the `paste_app_factory()` to not mutate the incoming dictionary
- Removed all argument-parsing and config-related code from `__main__` and `core`
- Moved `_logwrite` from `__init__` to `__main__`, since that was the only place it was being used after the updates to `core`
- Updated `digest_file` to use `hashlib.new(algo)` instead of `getattr(hashlib, algo)`, because the former supports more algorithms
- Updated `setup.py` to, instead of calling `eval()` on the entirety of `__init__`, to instead just evaluate the line that defines the version
- Assigned the config to a `._pypiserver_config` attribute on the `Bottle` instance to reduce hacky test workarounds
- Fixed the tox config, which I broke in #339 

* Config: add auth & absolute path resolution

* Config: check pkg dirs on config creation

* Instantiate config with kwargs, not namespace

* WIP: still pulling the threads

* Init seems to be working

* tests passing locally, still need to update cache

* Fix tox command

* unused import

* Fix typing

* Be more selective in exec() in setup.py

* Require accurate casing for hash algos

* Remove old comment

* Comments, minor updates and simplifications

* move _logwrite to a more reasonable place

* Update config to work with cache

* Type cachemanager listdir in core

* Update config module docstring, rename method

* Add more comments re: paste config

* Add comments to main, remove unneded check

* Remove commented code

* Use {posargs} instead of [] for clarity in tox

* Add dupe check for kwarg updater

* Remove unused references on app instance

* Fix typo

* Remove redundancy in log level parsing
This commit is contained in:
Matthew Planchard 2020-10-25 18:48:28 -05:00 committed by GitHub
parent 47d6efe196
commit c668b1814a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 795 additions and 832 deletions

@ -1,6 +1,11 @@
import os
import functools
import pathlib
import re as _re
import sys
import typing as t
from pypiserver.bottle import Bottle
from pypiserver.config import Config, RunConfig, strtobool
version = __version__ = "2.0.0dev1"
__version_info__ = tuple(_re.split("[.-]", __version__))
@ -11,203 +16,191 @@ __summary__ = "A minimal PyPI server for use with pip/easy_install."
__uri__ = "https://github.com/pypiserver/pypiserver"
class Configuration:
"""
.. see:: config-options: :func:`pypiserver.configure()`
identity = lambda x: x
def backwards_compat_kwargs(kwargs: dict, warn: bool = True) -> dict:
"""Return a dict with deprecated kwargs converted to new kwargs.
:param kwargs: the incoming kwargs to convert
:param warn: whether to output a warning to stderr if there are deprecated
kwargs found in the incoming kwargs
"""
# A mapping of deprecated kwargs to a 2-tuple of their corresponding updated
# kwarg and a function to convert the value of the deprecated kwarg to a
# value for the new kwarg. `identity` is just a function that returns
# whatever it is passed and is used in cases where the only change from
# a legacy kwarg is its name.
backwards_compat = {
"authenticated": ("authenticate", identity),
"passwords": ("password_file", identity),
# `root` could be a string or an array of strings. Handle both cases,
# converting strings to Path instances.
"root": (
"roots",
lambda root: [
# Convert strings to absolute Path instances
pathlib.Path(r).expanduser().resolve()
for r in ([root] if isinstance(root, str) else root)
],
),
# `redirect_to_fallback` was changed to `disable_fallback` for clearer
# use as a flag to disable the default behavior. Since its behavior
# is the opposite, we negate it.
"redirect_to_fallback": (
"disable_fallback",
lambda redirect: not redirect,
),
"server": ("server_method", identity),
# `welcome_msg` now is just provided as text, so that anyone using
# pypiserver as a library doesn't need to worry about distributing
# files if they don't need to. If we're still passed an old-style
# `welcome_file` argument, we go ahead and resolve it to an absolute
# path and read the text.
"welcome_file": (
"welcome_msg",
lambda p: pathlib.Path(p).expanduser().resolve().read_text(),
),
}
# Warn the user if they're using any deprecated arguments
if warn and any(k in backwards_compat for k in kwargs):
# Make nice instructions like `Please replace the following:
# 'authenticated' with 'authenticate'` and print to stderr.
replacement_strs = (
f"'{k}' with '{backwards_compat[k][0]}'"
for k in filter(lambda k: k in kwargs, backwards_compat)
)
warn_str = (
"You are using deprecated arguments. Please replace the following: \n"
f" {', '.join(replacement_strs)}"
)
print(warn_str, file=sys.stderr)
def __init__(self, **kwds):
vars(self).update(kwds)
# Create an iterable of 2-tuple to collect into the updated dictionary. Each
# item will either be the existing key-value pair from kwargs, or, if the
# keyword is a legacy keyword, the new key and potentially adjusted value
# for that keyword. Note that depending on the order the argument are
# specified, this _could_ mean an updated legacy keyword could override
# a new argument if that argument is also specified. However, in that
# case, our updated kwargs dictionary would have a different number of
# keys compared to our incoming dictionary, so we check for that case
# below.
rv_iter = (
(
(k, v)
if k not in backwards_compat
else (backwards_compat[k][0], backwards_compat[k][1](v))
)
for k, v in kwargs.items()
)
updated_kwargs = dict(rv_iter)
def __repr__(self, *args, **kwargs):
return f"Configuration(**{vars(self)})"
def __str__(self, *args, **kwargs):
return "Configuration:\n" + "\n".join(
f"{k:>20} = {v}" for k, v in sorted(vars(self).items())
# If our dictionaries have different lengths, we must have gotten duplicate
# legacy/modern keys. Figure out which keys were dupes and throw an error.
if len(updated_kwargs) != len(kwargs):
legacy_to_modern = {k: v[0] for k, v in backwards_compat.items()}
dupes = [
(k, v)
for k, v in legacy_to_modern.items()
if k in kwargs and v in kwargs
]
raise ValueError(
"Keyword arguments for pypiserver app() constructor contained "
"duplicate legacy and modern keys. Duplicates are shown below, in "
"the form (legacy_key, modern_key):\n"
f"{dupes}"
)
def update(self, props):
d = props if isinstance(props, dict) else vars(props)
vars(self).update(d)
return updated_kwargs
DEFAULT_SERVER = "auto"
def app(**kwargs: t.Any) -> Bottle:
"""Construct a bottle app running pypiserver.
def default_config(
root=None,
host="0.0.0.0",
port=8080,
server=DEFAULT_SERVER,
redirect_to_fallback=True,
fallback_url=None,
authenticated=["update"],
password_file=None,
overwrite=False,
hash_algo="md5",
verbosity=1,
log_file=None,
log_stream="stderr",
log_frmt="%(asctime)s|%(name)s|%(levelname)s|%(thread)d|%(message)s",
log_req_frmt="%(bottle.request)s",
log_res_frmt="%(status)s",
log_err_frmt="%(body)s: %(exception)s \n%(traceback)s",
welcome_file=None,
cache_control=None,
auther=None,
VERSION=__version__,
):
:param kwds: Any overrides for defaults. Any property of RunConfig
(or its base), defined in `pypiserver.config`, may be overridden.
"""
Fetch default-opts with overridden kwds, capable of starting-up pypiserver.
Does not validate overridden options.
Example usage::
kwds = pypiserver.default_config(<override_kwds> ...)
## More modifications on kwds.
pypiserver.app(**kwds)``.
Kwds correspond to same-named cmd-line opts, with '-' --> '_' substitution.
Non standard args are described below:
:param return_defaults_only:
When `True`, returns defaults, otherwise,
configures "runtime" attributes and returns also the "packages"
found in the roots.
:param root:
A list of paths, derived from the packages specified on cmd-line.
If `None`, defaults to '~/packages'.
:param redirect_to_fallback:
see :option:`--disable-fallback`
:param authenticated:
see :option:`--authenticate`
:param password_file:
see :option:`--passwords`
:param log_file:
see :option:`--log-file`
Not used, passed here for logging it.
:param log_frmt:
see :option:`--log-frmt`
Not used, passed here for logging it.
:param callable auther:
An API-only options that if it evaluates to a callable,
it is invoked to allow access to protected operations
(instead of htpaswd mechanism) like that::
auther(username, password): bool
When defined, `password_file` is ignored.
:param host:
see :option:`--interface`
Not used, passed here for logging it.
:param port:
see :option:`--port`
Not used, passed here for logging it.
:param server:
see :option:`--server`
Not used, passed here for logging it.
:param verbosity:
see :option:`-v`
Not used, passed here for logging it.
:param VERSION:
Not used, passed here for logging it.
:return: a dict of defaults
"""
return locals()
config = Config.default_with_overrides(**backwards_compat_kwargs(kwargs))
return app_from_config(config)
def app(**kwds):
"""
:param dict kwds: Any overrides for defaults, as fetched by
:func:`default_config()`. Check the docstring of this function
for supported kwds.
"""
from . import core
def app_from_config(config: RunConfig) -> Bottle:
"""Construct a bottle app from the provided RunConfig."""
# The _app module instantiates a Bottle instance directly when it is
# imported. That is `_app.app`. We directly mutate some global variables
# on the imported `_app` module so that its endpoints will behave as
# we expect.
_app = __import__("_app", globals(), locals(), ["."], 1)
# Because we're about to mutate our import, we pop it out of the imported
# modules map, so that any future imports do not receive our mutated version
sys.modules.pop("pypiserver._app", None)
kwds = default_config(**kwds)
config, packages = core.configure(**kwds)
_app.config = config
_app.packages = packages
_app.app.module = _app # HACK for testing.
# Add a reference to our config on the Bottle app for easy access in testing
# and other contexts.
_app.app._pypiserver_config = config
return _app.app
def str2bool(s, default):
if s is not None and s != "":
return s.lower() not in ("no", "off", "0", "false")
return default
T = t.TypeVar("T")
def _str_strip(string):
"""Provide a generic strip method to pass as a callback."""
return string.strip()
def paste_app_factory(_global_config, **local_conf):
"""Parse a paste config and return an app.
The paste config is entirely strings, so we need to parse those
strings into values usable for the config, if they're present.
"""
def paste_app_factory(global_config, **local_conf):
"""Parse a paste config and return an app."""
def to_bool(val: t.Optional[str]) -> t.Optional[bool]:
"""Convert a string value, if provided, to a bool."""
return val if val is None else strtobool(val)
def upd_conf_with_bool_item(conf, attr, sdict):
conf[attr] = str2bool(sdict.pop(attr, None), conf[attr])
def to_int(val: t.Optional[str]) -> t.Optional[int]:
"""Convert a string value, if provided, to an int."""
return val if val is None else int(val)
def upd_conf_with_str_item(conf, attr, sdict):
value = sdict.pop(attr, None)
if value is not None:
conf[attr] = value
def to_list(
val: t.Optional[str],
sep: str = " ",
transform: t.Callable[[str], T] = str.strip,
) -> t.Optional[t.List[T]]:
"""Convert a string value, if provided, to a list.
def upd_conf_with_int_item(conf, attr, sdict):
value = sdict.pop(attr, None)
if value is not None:
conf[attr] = int(value)
:param sep: the separator between items in the string representation
of the list
:param transform: an optional function to call on each string item of
the list
"""
if val is None:
return val
return list(filter(None, map(transform, val.split(sep))))
def upd_conf_with_list_item(conf, attr, sdict, sep=" ", parse=_str_strip):
values = sdict.pop(attr, None)
if values:
conf[attr] = list(filter(None, map(parse, values.split(sep))))
def _make_root(root: str) -> pathlib.Path:
"""Convert a specified string root into an absolute Path instance."""
return pathlib.Path(root.strip()).expanduser().resolve()
def _make_root(root):
root = root.strip()
if root.startswith("~"):
return os.path.expanduser(root)
return root
# A map of config keys we expect in the paste config to the appropriate
# function to parse the string config value. This map includes both
# current and legacy keys.
maps = {
"cache_control": to_int,
"roots": functools.partial(to_list, sep="\n", transform=_make_root),
# root is a deprecated argument for roots
"root": functools.partial(to_list, sep="\n", transform=_make_root),
"disable_fallback": to_bool,
# redirect_to_fallback is a deprecated argument for disable_fallback
"redirect_to_fallback": to_bool,
"overwrite": to_bool,
"authenticate": functools.partial(to_list, sep=" "),
# authenticated is a deprecated argument for authenticate
"authenticated": functools.partial(to_list, sep=" "),
"verbosity": to_int,
}
c = default_config()
# First, convert values from strings to whatever types we need, or leave
# them as strings if there's no mapping function available for them.
mapped_conf = {k: maps.get(k, identity)(v) for k, v in local_conf.items()}
# Convert any legacy key/value pairs into their modern form.
updated_conf = backwards_compat_kwargs(mapped_conf)
upd_conf_with_bool_item(c, "overwrite", local_conf)
upd_conf_with_bool_item(c, "redirect_to_fallback", local_conf)
upd_conf_with_list_item(c, "authenticated", local_conf, sep=" ")
upd_conf_with_list_item(c, "root", local_conf, sep="\n", parse=_make_root)
upd_conf_with_int_item(c, "verbosity", local_conf)
str_items = [
"fallback_url",
"hash_algo",
"log_err_frmt",
"log_file",
"log_frmt",
"log_req_frmt",
"log_res_frmt",
"password_file",
"welcome_file",
]
for str_item in str_items:
upd_conf_with_str_item(c, str_item, local_conf)
# cache_control is undocumented; don't know what type is expected:
# upd_conf_with_str_item(c, 'cache_control', local_conf)
return app(**c)
def _logwrite(logger, level, msg):
if msg:
line_endings = ["\r\n", "\n\r", "\n"]
for le in line_endings:
if msg.endswith(le):
msg = msg[: -len(le)]
if msg:
logger.log(level, msg)
return app(**updated_conf)

@ -3,14 +3,12 @@
from __future__ import print_function
import getopt
import logging
import os
import re
import sys
import textwrap
import typing as t
import functools as ft
from pypiserver.config import Config, UpdateConfig
log = logging.getLogger("pypiserver.main")
@ -20,7 +18,7 @@ def init_logging(
level=logging.NOTSET,
frmt=None,
filename=None,
stream=sys.stderr,
stream: t.Optional[t.IO] = sys.stderr,
logger=None,
):
logger = logger or logging.getLogger()
@ -38,332 +36,77 @@ def init_logging(
logger.addHandler(handler)
def usage():
return textwrap.dedent(
"""\
pypi-server [OPTIONS] [PACKAGES_DIRECTORY...]
start PyPI compatible package server serving packages from
PACKAGES_DIRECTORY. If PACKAGES_DIRECTORY is not given on the
command line, it uses the default ~/packages. pypiserver scans this
directory recursively for packages. It skips packages and
directories starting with a dot. Multiple package directories can be
specified.
pypi-server understands the following options:
-p, --port PORT
Listen on port PORT (default: 8080).
-i, --interface INTERFACE
Listen on interface INTERFACE (default: 0.0.0.0, any interface).
-a, --authenticate (update|download|list), ...
Comma-separated list of (case-insensitive) actions to authenticate.
Requires to have set the password (-P option).
To password-protect package downloads (in addition to uploads) while
leaving listings public, use:
-P foo/htpasswd.txt -a update,download
To allow unauthorized access, use:
-P . -a .
Note that when uploads are not protected, the `register` command
is not necessary, but `~/.pypirc` still need username and password fields,
even if bogus.
By default, only 'update' is password-protected.
-P, --passwords PASSWORD_FILE
Use apache htpasswd file PASSWORD_FILE to set usernames & passwords when
authenticating certain actions (see -a option).
To allow unauthorized access, use:
-P . -a .
--disable-fallback
Disable redirect to real PyPI index for packages not found in the
local index.
--fallback-url FALLBACK_URL
For packages not found in the local index, this URL will be used to
redirect to (default: https://pypi.org/simple/).
--server METHOD
Use METHOD to run the server. Valid values include paste,
cherrypy, twisted, gunicorn, gevent, wsgiref, auto. The
default is to use "auto" which chooses one of paste, cherrypy,
twisted or wsgiref.
-r, --root PACKAGES_DIRECTORY
[deprecated] Serve packages from PACKAGES_DIRECTORY.
-o, --overwrite
Allow overwriting existing package files.
--hash-algo ALGO
Any `hashlib` available algo used as fragments on package links.
Set one of (0, no, off, false) to disabled it (default: md5).
--welcome HTML_FILE
Uses the ASCII contents of HTML_FILE as welcome message response.
-v
Enable verbose logging; repeat for more verbosity.
--log-file FILE
Write logging info into this FILE, as well as to stdout or stderr, if configured.
--log-stream STREAM
Log messages to the specified STREAM. Valid values are "stdout", "stderr", or "none"
--log-frmt FORMAT
The logging format-string. (see `logging.LogRecord` class from standard python library)
[Default: %(asctime)s|%(name)s|%(levelname)s|%(thread)d|%(message)s]
--log-req-frmt FORMAT
A format-string selecting Http-Request properties to log; set to '%s' to see them all.
[Default: %(bottle.request)s]
--log-res-frmt FORMAT
A format-string selecting Http-Response properties to log; set to '%s' to see them all.
[Default: %(status)s]
--log-err-frmt FORMAT
A format-string selecting Http-Error properties to log; set to '%s' to see them all.
[Default: %(body)s: %(exception)s \n%(traceback)s]
--cache-control AGE
Add "Cache-Control: max-age=AGE, public" header to package downloads.
Pip 6+ needs this for caching.
pypi-server -h, --help
Show this help message.
pypi-server --version
Show pypi-server's version.
pypi-server -U [OPTIONS] [PACKAGES_DIRECTORY...]
Update packages in PACKAGES_DIRECTORY. This command searches
pypi.org for updates and shows a pip command line which
updates the package.
The following additional options can be specified with -U:
-x
Execute the pip commands instead of only showing them.
-d DOWNLOAD_DIRECTORY
Download package updates to this directory. The default is to use
the directory which contains the latest version of the package to
be updated.
-u
Allow updating to unstable version (alpha, beta, rc, dev versions).
--blacklist-file BLACKLIST_FILE
Don't update packages listed in this file (one package name per line,
without versions, '#' comments honored). This can be useful if you upload
private packages into pypiserver, but also keep a mirror of public
packages that you regularly update. Attempting to pull an update of
a private package from `pypi.org` might pose a security risk - e.g. a
malicious user might publish a higher version of the private package,
containing arbitrary code.
Visit https://pypi.org/project/pypiserver/ for more information.
"""
)
def main(argv=None):
"""Application entrypoint for pypiserver.
This function drives the application (as opposed to the library)
implementation of pypiserver. Usage from the commandline will result in
this function being called.
"""
import pypiserver
if argv is None:
argv = sys.argv
# The first item in sys.argv is the name of the python file being
# executed, which we don't need
argv = sys.argv[1:]
command = "serve"
c = pypiserver.Configuration(**pypiserver.default_config())
update_dry_run = True
update_directory = None
update_stable_only = True
update_blacklist_file = None
try:
opts, roots = getopt.getopt(
argv[1:],
"i:p:a:r:d:P:Uuvxoh",
[
"interface=",
"passwords=",
"authenticate=",
"port=",
"root=",
"server=",
"fallback-url=",
"disable-fallback",
"overwrite",
"hash-algo=",
"blacklist-file=",
"log-file=",
"log-stream=",
"log-frmt=",
"log-req-frmt=",
"log-res-frmt=",
"log-err-frmt=",
"welcome=",
"cache-control=",
"version",
"help",
],
)
except getopt.GetoptError:
err = sys.exc_info()[1]
sys.exit(f"usage error: {err}")
for k, v in opts:
if k in ("-p", "--port"):
try:
c.port = int(v)
except Exception:
err = sys.exc_info()[1]
sys.exit(f"Invalid port({v!r}) due to: {err}")
elif k in ("-a", "--authenticate"):
c.authenticated = [
a.lower() for a in re.split("[, ]+", v.strip(" ,")) if a
]
if c.authenticated == ["."]:
c.authenticated = []
else:
actions = ("list", "download", "update")
for a in c.authenticated:
if a not in actions:
errmsg = (
f"Action '{a}' for option `{k}`"
f" not one of {actions}!"
)
sys.exit(errmsg)
elif k in ("-i", "--interface"):
c.host = v
elif k in ("-r", "--root"):
roots.append(v)
elif k == "--disable-fallback":
c.redirect_to_fallback = False
elif k == "--fallback-url":
c.fallback_url = v
elif k == "--server":
c.server = v
elif k == "--welcome":
c.welcome_file = v
elif k == "--version":
print(f"pypiserver {pypiserver.__version__}\n")
return
elif k == "-U":
command = "update"
elif k == "-x":
update_dry_run = False
elif k == "-u":
update_stable_only = False
elif k == "-d":
update_directory = v
elif k == "--blacklist-file":
update_blacklist_file = v
elif k in ("-P", "--passwords"):
c.password_file = v
elif k in ("-o", "--overwrite"):
c.overwrite = True
elif k == "--hash-algo":
c.hash_algo = None if not pypiserver.str2bool(v, c.hash_algo) else v
elif k == "--log-file":
c.log_file = v
elif k == "--log-stream":
c.log_stream = v
elif k == "--log-frmt":
c.log_frmt = v
elif k == "--log-req-frmt":
c.log_req_frmt = v
elif k == "--log-res-frmt":
c.log_res_frmt = v
elif k == "--log-err-frmt":
c.log_err_frmt = v
elif k == "--cache-control":
c.cache_control = v
elif k == "-v":
c.verbosity += 1
elif k in ("-h", "--help"):
print(usage())
sys.exit(0)
if (
not c.authenticated
and c.password_file != "."
or c.authenticated
and c.password_file == "."
):
sys.exit(
"When auth-ops-list is empty (-a=.), password-file"
f" (-P={c.password_file!r}) must also be empty ('.')!"
)
if len(roots) == 0:
roots.append(os.path.expanduser("~/packages"))
roots = [os.path.abspath(x) for x in roots]
c.root = roots
verbose_levels = [
logging.WARNING,
logging.INFO,
logging.DEBUG,
logging.NOTSET,
]
log_level = list(zip(verbose_levels, range(c.verbosity)))[-1][0]
valid_streams = {"none": None, "stderr": sys.stderr, "stdout": sys.stdout}
if c.log_stream not in valid_streams:
sys.exit(
f"Invalid log stream {c.log_stream}."
f" Choose one of {', '.join(valid_streams.keys())}"
)
config = Config.from_args(argv)
init_logging(
level=log_level,
filename=c.log_file,
frmt=c.log_frmt,
stream=valid_streams[c.log_stream],
level=config.log_level,
filename=config.log_file,
frmt=config.log_frmt,
stream=config.log_stream,
)
if command == "update":
# Check to see if we were asked to run an update command instead of running
# the server
if isinstance(config, UpdateConfig):
from pypiserver.manage import update_all_packages
update_all_packages(
roots,
update_directory,
dry_run=update_dry_run,
stable_only=update_stable_only,
blacklist_file=update_blacklist_file,
config.roots,
config.download_directory,
dry_run=not config.execute,
stable_only=config.allow_unstable,
ignorelist=config.ignorelist,
)
return
# Fixes #49:
# The gevent server adapter needs to patch some
# modules BEFORE importing bottle!
if c.server and c.server.startswith("gevent"):
if config.server_method.startswith("gevent"):
import gevent.monkey # @UnresolvedImport
gevent.monkey.patch_all()
from pypiserver import bottle
if c.server not in bottle.server_names:
sys.exit(
f"Unknown server {c.server}."
f" Choose one of {', '.join(bottle.server_names.keys())}"
)
bottle.debug(c.verbosity > 1)
bottle.debug(config.verbosity > 1)
bottle._stderr = ft.partial(
pypiserver._logwrite, logging.getLogger(bottle.__name__), logging.INFO
_logwrite, logging.getLogger(bottle.__name__), logging.INFO
)
app = pypiserver.app(**vars(c))
bottle.run(app=app, host=c.host, port=c.port, server=c.server)
# Here `app` is a Bottle instance, which we pass to bottle.run() to run
# the server
app = pypiserver.app_from_config(config)
bottle.run(
app=app,
host=config.host,
port=config.port,
server=config.server_method,
)
def _logwrite(logger, level, msg):
if msg:
line_endings = ["\r\n", "\n\r", "\n"]
for le in line_endings:
if msg.endswith(le):
msg = msg[: -len(le)]
if msg:
logger.log(level, msg)
if __name__ == "__main__":

@ -6,6 +6,7 @@ import re
import zipfile
import xml.dom.minidom
from pypiserver.config import RunConfig
from . import __version__
from . import core
from .bottle import (
@ -35,8 +36,7 @@ except ImportError: # PY2
log = logging.getLogger(__name__)
packages = None
config = None
config: RunConfig
app = Bottle()
@ -49,7 +49,7 @@ class auth:
def __call__(self, method):
def protector(*args, **kwargs):
if self.action in config.authenticated:
if self.action in config.authenticate:
if not request.auth or request.auth[1] is None:
raise HTTPError(
401, headers={"WWW-Authenticate": 'Basic realm="pypi"'}
@ -104,8 +104,9 @@ def root():
fp = request.custom_fullpath
try:
numpkgs = len(list(packages()))
except:
numpkgs = len(list(config.iter_packages()))
except Exception as exc:
log.error(f"Could not list packages: {exc}")
numpkgs = 0
# Ensure template() does not consider `msg` as filename!
@ -150,7 +151,7 @@ def remove_pkg():
pkgs = list(
filter(
lambda pkg: pkg.pkgname == name and pkg.version == version,
core.find_packages(packages()),
core.find_packages(config.iter_packages()),
)
)
if len(pkgs) == 0:
@ -186,7 +187,9 @@ def file_upload():
):
raise HTTPError(400, f"Bad filename: {uf.raw_filename}")
if not config.overwrite and core.exists(packages.root, uf.raw_filename):
if not config.overwrite and core.exists(
config.package_root, uf.raw_filename
):
log.warning(
f"Cannot upload {uf.raw_filename!r} since it already exists! \n"
" You may start server with `--overwrite` option. "
@ -197,7 +200,7 @@ def file_upload():
" You may start server with `--overwrite` option.",
)
core.store(packages.root, uf.raw_filename, uf.save)
core.store(config.package_root, uf.raw_filename, uf.save)
if request.auth:
user = request.auth[0]
else:
@ -254,7 +257,7 @@ def handle_rpc():
)
response = []
ordering = 0
for p in packages():
for p in config.iter_packages():
if p.pkgname.count(value) > 0:
# We do not presently have any description/summary, returning
# version instead
@ -275,7 +278,7 @@ def handle_rpc():
@app.route("/simple/")
@auth("list")
def simpleindex():
links = sorted(core.get_prefixes(packages()))
links = sorted(core.get_prefixes(config.iter_packages()))
tmpl = """\
<html>
<head>
@ -301,11 +304,11 @@ def simple(prefix=""):
return redirect("/simple/{0}/".format(normalized), 301)
files = sorted(
core.find_packages(packages(), prefix=prefix),
core.find_packages(config.iter_packages(), prefix=prefix),
key=lambda x: (x.parsed_version, x.relfn),
)
if not files:
if config.redirect_to_fallback:
if not config.disable_fallback:
return redirect(f"{config.fallback_url.rstrip('/')}/{prefix}/")
return HTTPError(404, f"Not Found ({normalized} does not exist)\n\n")
@ -338,7 +341,7 @@ def simple(prefix=""):
def list_packages():
fp = request.custom_fullpath
files = sorted(
core.find_packages(packages()),
core.find_packages(config.iter_packages()),
key=lambda x: (os.path.dirname(x.relfn), x.pkgname, x.parsed_version),
)
links = [
@ -364,7 +367,7 @@ def list_packages():
@app.route("/packages/:filename#.*#")
@auth("download")
def server_static(filename):
entries = core.find_packages(packages())
entries = core.find_packages(config.iter_packages())
for x in entries:
f = x.relfn_unix
if f == filename:

@ -6,42 +6,55 @@ the current config logic, but has not yet been integrated.
To add a config option:
- If it should be available for all subcommands (run, update), add it to
the `global_args` parser.
the `add_common_args()` function
- If it should only be available for the `run` command, add it to the
`run_parser`.
`run_parser` in the `get_parser()` function.
- If it should only be available for the `update` command, add it to the
`update_parser`.
`update_parser` in the `get_parser() function`.
- Add it to the appropriate Config class, `_ConfigCommon` for global options,
`RunConfig` for `run` options, and `UpdateConfig` for `update` options.
- This requires adding it as an `__init__()` kwarg, setting it as an instance
attribute in `__init__()`, and ensuring it will be parsed from the argparse
namespace in the `kwargs_from_namespace()` method
- Ensure your config option is tested in `tests/test_config.py`.
The `Config` class provides a `.from_args()` static method, which returns
either a `RunConfig` or an `UpdateConfig`, depending on which subcommand
is specified in the args.
The `Config` class is a factory class only. Config objects do not inherit from
it, but from `_ConfigCommon`. The `Config` provides the following constructors:
- `default_with_overrides(**overrides: Any)`: construct a `RunConfig` (since
run is the default pypiserver action) with default values, applying any
specified overrides
- `from_args(args: Optional[Sequence[str]])`: construct a config from the
provided arguments. Depending on arguments, the config will be either a
`RunConfig` or an `UpdateConfig`
Legacy commandline arguments did not require a subcommand. This form is
still supported, but deprecated. A warning is printing to stderr if
the legacy commandline format is used.
Command line arguments should be parsed as early as possible, using
custom functions like the `auth_*` functions below if needed. For example,
if an option were to take JSON as an argument, that JSON should be parsed
into a dict by the argument parser.
"""
import argparse
import contextlib
import hashlib
import itertools
import io
import itertools
import logging
import pathlib
import pkg_resources
import re
import textwrap
import sys
import textwrap
import typing as t
from distutils.util import strtobool as strtoint
from pypiserver import __version__
# The `passlib` requirement is optional, so we need to verify its import here.
try:
from passlib.apache import HtpasswdFile
except ImportError:
HtpasswdFile = None
from pypiserver import core
# The "strtobool" function in distutils does a nice job at parsing strings,
@ -64,7 +77,7 @@ class DEFAULTS:
LOG_REQ_FRMT = "%(bottle.request)s"
LOG_RES_FRMT = "%(status)s"
LOG_STREAM = sys.stdout
PACKAGE_DIRECTORIES = ["~/packages"]
PACKAGE_DIRECTORIES = [pathlib.Path("~/packages").expanduser().resolve()]
PORT = 8080
SERVER_METHOD = "auto"
@ -86,15 +99,14 @@ def auth_arg(arg: str) -> t.List[str]:
"Invalid authentication options. `.` (no authentication) "
"must be specified alone."
)
return items
# The "." is just an indicator for no auth, so we return an empty auth list
# if it was present.
return [i for i in items if not i == "."]
def hash_algo_arg(arg: str) -> t.Optional[str]:
"""Parse a hash algorithm from the string."""
# The standard hashing algorithms are all made available via fully
# lowercase names, along with (sometimes) variously cased versions
# as well.
arg = arg.lower()
if arg in hashlib.algorithms_available:
return arg
try:
@ -130,9 +142,35 @@ def ignorelist_file_arg(arg: t.Optional[str]) -> t.List[str]:
"""Parse the ignorelist and return the list of ignored files."""
if arg is None or arg == "pypiserver/no-ignores":
return []
with open(arg) as f:
stripped_lines = (ln.strip() for ln in f.readlines())
return [ln for ln in stripped_lines if ln and not ln.startswith("#")]
fpath = pathlib.Path(arg)
if not fpath.exists():
raise argparse.ArgumentTypeError(f"No such ignorelist-file '{arg}'")
try:
lines = (ln.strip() for ln in fpath.read_text().splitlines())
return [ln for ln in lines if ln and not ln.startswith("#")]
except Exception as exc:
raise argparse.ArgumentTypeError(
f"Could not parse ignorelist-file '{arg}': {exc}"
) from exc
def package_directory_arg(arg: str) -> pathlib.Path:
"""Convert any package directory argument into its absolute path."""
pkg_dir = pathlib.Path(arg).expanduser().resolve()
try:
# Attempt to grab the first item from the directory. The directory may
# be empty, in which case we'll get back None, but if the directory does
# not exist or we do not have permission to read it, we can catch th
# OSError and exit with a useful message.
next(pkg_dir.iterdir(), None)
except OSError as exc:
raise argparse.ArgumentTypeError(
"Error: while trying to access package directory "
f"({pkg_dir}): {exc}"
)
return pkg_dir
# We need to capture this at compile time, because we replace sys.stderr
@ -161,6 +199,9 @@ def log_stream_arg(arg: str) -> t.Optional[t.IO]:
def add_common_args(parser: argparse.ArgumentParser) -> None:
"""Add common arguments to a parser."""
# Don't update at top-level to avoid circular imports in __init__
from pypiserver import __version__
parser.add_argument(
"-v",
"--verbose",
@ -237,6 +278,7 @@ def get_parser() -> argparse.ArgumentParser:
"package_directory",
default=DEFAULTS.PACKAGE_DIRECTORIES,
nargs="*",
type=package_directory_arg,
help="The directory from which to serve packages.",
)
@ -249,7 +291,10 @@ def get_parser() -> argparse.ArgumentParser:
)
run_parser.add_argument(
"-i",
"-H",
"--interface",
"--host",
dest="host",
default=DEFAULTS.INTERFACE,
help="Listen on interface INTERFACE (default: 0.0.0.0)",
)
@ -324,7 +369,7 @@ def get_parser() -> argparse.ArgumentParser:
),
type=str.lower,
help=(
"Use METHOD to run th eserver. Valid values include paste, "
"Use METHOD to run the server. Valid values include paste, "
"cherrypy, twisted, gunicorn, gevent, wsgiref, and auto. The "
'default is to use "auto", which chooses one of paste, cherrypy, '
"twisted, or wsgiref."
@ -412,6 +457,7 @@ def get_parser() -> argparse.ArgumentParser:
"package_directory",
default=DEFAULTS.PACKAGE_DIRECTORIES,
nargs="*",
type=package_directory_arg,
help="The directory from which to serve packages.",
)
@ -457,15 +503,83 @@ def get_parser() -> argparse.ArgumentParser:
return parser
TConf = t.TypeVar("TConf", bound="_ConfigCommon")
class _ConfigCommon:
def __init__(self, namespace: argparse.Namespace) -> None:
def __init__(
self,
roots: t.List[pathlib.Path],
verbosity: int,
log_frmt: str,
log_file: t.Optional[str],
log_stream: t.Optional[t.IO],
) -> None:
"""Construct a RuntimeConfig."""
# Global arguments
self.verbosity: int = namespace.verbose
self.log_file: t.Optional[str] = namespace.log_file
self.log_stream: t.Optional[t.IO] = namespace.log_stream
self.log_frmt: str = namespace.log_frmt
self.roots: t.List[str] = namespace.package_directory
self.verbosity = verbosity
self.log_file = log_file
self.log_stream = log_stream
self.log_frmt = log_frmt
self.roots = roots
# Derived properties are directly based on other properties and are not
# included in equality checks.
self._derived_properties: t.Tuple[str, ...] = (
"iter_packages",
"package_root",
)
# The first package directory is considered the root. This is used
# for uploads.
self.package_root = self.roots[0]
@classmethod
def from_namespace(
cls: t.Type[TConf], namespace: argparse.Namespace
) -> TConf:
"""Construct a config from an argparse namespace."""
return cls(**cls.kwargs_from_namespace(namespace))
@staticmethod
def kwargs_from_namespace(
namespace: argparse.Namespace,
) -> t.Dict[str, t.Any]:
"""Convert a namespace into __init__ kwargs for this class."""
return dict(
verbosity=namespace.verbose,
log_file=namespace.log_file,
log_stream=namespace.log_stream,
log_frmt=namespace.log_frmt,
roots=namespace.package_directory,
)
@property
def log_level(self) -> int:
"""Return an appropriate log-level for the config's verbosity."""
levels = {
0: logging.WARNING,
1: logging.INFO,
2: logging.DEBUG,
}
# Return a log-level from warning through not set (log all messages).
# If we've specified 3 or more levels of verbosity, just return not set.
return levels.get(self.verbosity, logging.NOTSET)
def iter_packages(self) -> t.Iterator[core.PkgFile]:
"""Iterate over packages in root directories."""
yield from (
itertools.chain.from_iterable(
core.listdir(str(r)) for r in self.roots
)
)
def with_updates(self: TConf, **kwargs: t.Any) -> TConf:
"""Create a new config with the specified updates.
The current config is used as a base. Any properties not specified in
keyword arguments will remain unchanged.
"""
return self.__class__(**{**dict(self), **kwargs}) # type: ignore
def __repr__(self) -> str:
"""A string representation indicating the class and its properties."""
@ -482,52 +596,176 @@ class _ConfigCommon:
"""Configs are equal if their public values are equal."""
if not isinstance(other, self.__class__):
return False
return all(getattr(other, k) == v for k, v in self) # type: ignore
return all(
getattr(other, k) == v
for k, v in self
if not k in self._derived_properties
)
def __iter__(self) -> t.Iterable[t.Tuple[str, t.Any]]:
def __iter__(self) -> t.Iterator[t.Tuple[str, t.Any]]:
"""Iterate over config (k, v) pairs."""
yield from (
(k, v) for k, v in vars(self).items() if not k.startswith("_")
(k, v)
for k, v in vars(self).items()
if not k.startswith("_") and k not in self._derived_properties
)
class RunConfig(_ConfigCommon):
"""A config for the Run command."""
def __init__(self, namespace: argparse.Namespace) -> None:
def __init__(
self,
port: int,
host: str,
authenticate: t.List[str],
password_file: t.Optional[str],
disable_fallback: bool,
fallback_url: str,
server_method: str,
overwrite: bool,
hash_algo: t.Optional[str],
welcome_msg: str,
cache_control: t.Optional[int],
log_req_frmt: str,
log_res_frmt: str,
log_err_frmt: str,
auther: t.Callable[[str, str], bool] = None,
**kwargs: t.Any,
) -> None:
"""Construct a RuntimeConfig."""
super().__init__(namespace)
self.port: int = namespace.port
self.interface: str = namespace.interface
self.authenticate: t.List[str] = namespace.authenticate
self.password_file: t.Optional[str] = namespace.passwords
self.disable_fallback: bool = namespace.disable_fallback
self.fallback_url: str = namespace.fallback_url
self.server_method: str = namespace.server
self.overwrite: bool = namespace.overwrite
self.hash_algo: t.Optional[str] = namespace.hash_algo
self.welcome_msg: str = namespace.welcome
self.cache_control: t.Optional[int] = namespace.cache_control
self.log_req_frmt: str = namespace.log_req_frmt
self.log_res_frmt: str = namespace.log_res_frmt
self.log_err_frmt: str = namespace.log_err_frmt
super().__init__(**kwargs)
self.port = port
self.host = host
self.authenticate = authenticate
self.password_file = password_file
self.disable_fallback = disable_fallback
self.fallback_url = fallback_url
self.server_method = server_method
self.overwrite = overwrite
self.hash_algo = hash_algo
self.welcome_msg = welcome_msg
self.cache_control = cache_control
self.log_req_frmt = log_req_frmt
self.log_res_frmt = log_res_frmt
self.log_err_frmt = log_err_frmt
# Derived properties
self._derived_properties = self._derived_properties + ("auther",)
self.auther = self.get_auther(auther)
@classmethod
def kwargs_from_namespace(
cls, namespace: argparse.Namespace
) -> t.Dict[str, t.Any]:
"""Convert a namespace into __init__ kwargs for this class."""
return {
**super(RunConfig, cls).kwargs_from_namespace(namespace),
"port": namespace.port,
"host": namespace.host,
"authenticate": namespace.authenticate,
"password_file": namespace.passwords,
"disable_fallback": namespace.disable_fallback,
"fallback_url": namespace.fallback_url,
"server_method": namespace.server,
"overwrite": namespace.overwrite,
"hash_algo": namespace.hash_algo,
"welcome_msg": namespace.welcome,
"cache_control": namespace.cache_control,
"log_req_frmt": namespace.log_req_frmt,
"log_res_frmt": namespace.log_res_frmt,
"log_err_frmt": namespace.log_err_frmt,
}
def get_auther(
self, passed_auther: t.Optional[t.Callable[[str, str], bool]]
) -> t.Callable[[str, str], bool]:
"""Create or retrieve an authentication function."""
# The auther may be specified directly as a kwarg in the API interface
if passed_auther:
return passed_auther
# Otherwise we check to see if we need to authenticate
if self.password_file == "." or self.authenticate == []:
# It's illegal to specify no authentication without also specifying
# no password file, and vice-versa.
if self.password_file != "." or self.authenticate != []:
sys.exit(
"When auth-ops-list is empty (-a=.), password-file"
f" (-P={self.password_file!r}) must also be empty ('.')!"
)
# Return an auther that always returns true.
return lambda _uname, _pw: True
# Now, if there was no password file specified, we can return an auther
# that always returns False, since there is no way to authenticate.
if self.password_file is None:
return lambda _uname, _pw: False
# Finally, if a password file was specified, we'll load it up with
# Htpasswd and return a callable that checks it.
if HtpasswdFile is None:
sys.exit(
"apache.passlib library is not available. You must install "
"pypiserver with the optional 'passlib' dependency (`pip "
"install pypiserver['passlib']`) in order to use password "
"authentication"
)
loaded_pw_file = HtpasswdFile(self.password_file)
# Construct a local closure over the loaded PW file and return as our
# authentication function.
def auther(uname: str, pw: str) -> bool:
loaded_pw_file.load_if_changed()
return loaded_pw_file.check_password(uname, pw)
return auther
class UpdateConfig(_ConfigCommon):
"""A config for the Update command."""
def __init__(self, namespace: argparse.Namespace) -> None:
def __init__(
self,
execute: bool,
download_directory: t.Optional[str],
allow_unstable: bool,
ignorelist: t.List[str],
**kwargs: t.Any,
) -> None:
"""Construct an UpdateConfig."""
super().__init__(namespace)
self.execute: bool = namespace.execute
self.download_directory: t.Optional[str] = namespace.download_directory
self.allow_unstable: bool = namespace.allow_unstable
self.ignorelist: t.List[str] = namespace.ignorelist_file
super().__init__(**kwargs)
self.execute = execute
self.download_directory = download_directory
self.allow_unstable = allow_unstable
self.ignorelist = ignorelist
@classmethod
def kwargs_from_namespace(
cls, namespace: argparse.Namespace
) -> t.Dict[str, t.Any]:
"""Convert a namespace into __init__ kwargs for this class."""
return {
**super(UpdateConfig, cls).kwargs_from_namespace(namespace),
"execute": namespace.execute,
"download_directory": namespace.download_directory,
"allow_unstable": namespace.allow_unstable,
"ignorelist": namespace.ignorelist_file,
}
class Config:
"""Config constructor for building a config from args."""
@classmethod
def default_with_overrides(cls, **overrides: t.Any) -> RunConfig:
"""Construct a RunConfig with default arguments, plus overrides.
Overrides must be valid arguments to the `__init__()` function
of `RunConfig`.
"""
default_config = cls.from_args(["run"])
assert isinstance(default_config, RunConfig)
return default_config.with_updates(**overrides)
@classmethod
def from_args(
cls, args: t.Sequence[str] = None
@ -599,9 +837,9 @@ class Config:
raise
if parsed.cmd == "run":
return RunConfig(parsed)
return RunConfig.from_namespace(parsed)
if parsed.cmd == "update":
return UpdateConfig(parsed)
return UpdateConfig.from_namespace(parsed)
raise SystemExit(parser.format_usage())
@staticmethod
@ -622,26 +860,26 @@ class Config:
# Don't actually update the passed list, in case it was the global
# sys.argv.
args = list(args)
updated_args = list(args)
# Find the update index. For "reasons", python's index search throws
# if the value isn't found, so manually wrap in the usual "return -1
# if not found" standard
try:
update_idx = args.index("-U")
update_idx = updated_args.index("-U")
except ValueError:
update_idx = -1
if update_idx == -1:
# We were a "run" command.
args.insert(insertion_idx, "run")
updated_args.insert(insertion_idx, "run")
else:
# Remove the -U from the args and add the "update" command at the
# start of the arg list.
args.pop(update_idx)
args.insert(insertion_idx, "update")
updated_args.pop(update_idx)
updated_args.insert(insertion_idx, "update")
return args
return updated_args
@contextlib.contextmanager

@ -1,102 +1,18 @@
#! /usr/bin/env python
"""minimal PyPI like server for use with pip/easy_install"""
import functools
import hashlib
import io
import itertools
import logging
import mimetypes
import os
import re
import sys
try: # PY3
from urllib.parse import quote
except ImportError: # PY2
from urllib import quote
import pkg_resources
from pypiserver import Configuration
import typing as t
from urllib.parse import quote
log = logging.getLogger(__name__)
def configure(**kwds):
"""
:return: a 2-tuple (Configure, package-list)
"""
c = Configuration(**kwds)
log.info(f"+++Pypiserver invoked with: {c}")
if c.root is None:
c.root = os.path.expanduser("~/packages")
roots = c.root if isinstance(c.root, (list, tuple)) else [c.root]
roots = [os.path.abspath(r) for r in roots]
for r in roots:
try:
os.listdir(r)
except OSError:
err = sys.exc_info()[1]
sys.exit(f"Error: while trying to list root({r}): {err}")
packages = lambda: itertools.chain(*[listdir(r) for r in roots])
packages.root = roots[0]
if not c.authenticated:
c.authenticated = []
if not callable(c.auther):
if c.password_file and c.password_file != ".":
from passlib.apache import HtpasswdFile
htPsswdFile = HtpasswdFile(c.password_file)
else:
c.password_file = htPsswdFile = None
c.auther = functools.partial(auth_by_htpasswd_file, htPsswdFile)
# Read welcome-msg from external file or failback to the embedded-msg
try:
if not c.welcome_file:
c.welcome_file = "welcome.html"
c.welcome_msg = pkg_resources.resource_string( # @UndefinedVariable
__name__, "welcome.html"
).decode(
"utf-8"
) # @UndefinedVariable
else:
with io.open(c.welcome_file, "r", encoding="utf-8") as fd:
c.welcome_msg = fd.read()
except Exception:
log.warning(
f"Could not load welcome-file({c.welcome_file})!", exc_info=True
)
if c.fallback_url is None:
c.fallback_url = "https://pypi.org/simple"
if c.hash_algo:
try:
halgos = hashlib.algorithms_available
except AttributeError:
halgos = ["md5", "sha1", "sha224", "sha256", "sha384", "sha512"]
if c.hash_algo not in halgos:
sys.exit(f"Hash-algorithm {c.hash_algo} not one of: {halgos}")
log.info(f"+++Pypiserver started with: {c}")
return c, packages
def auth_by_htpasswd_file(htPsswdFile, username, password):
"""The default ``config.auther``."""
if htPsswdFile is not None:
htPsswdFile.load_if_changed()
return htPsswdFile.check_password(username, password)
mimetypes.add_type("application/octet-stream", ".egg")
mimetypes.add_type("application/octet-stream", ".whl")
mimetypes.add_type("text/plain", ".asc")
@ -255,7 +171,7 @@ class PkgFile:
return self._fname_and_hash
def _listdir(root):
def _listdir(root: str) -> t.Iterable[PkgFile]:
root = os.path.abspath(root)
for dirpath, dirnames, filenames in os.walk(root):
dirnames[:] = [x for x in dirnames if is_allowed_path(x)]
@ -278,30 +194,6 @@ def _listdir(root):
)
def read_lines(filename):
"""
Read the contents of `filename`, stripping empty lines and '#'-comments.
Return a list of strings, containing the lines of the file.
"""
lines = []
try:
with open(filename) as f:
lines = [
line
for line in (ln.strip() for ln in f.readlines())
if line and not line.startswith("#")
]
except Exception:
log.error(
f'Failed to read package blacklist file "{filename}". '
"Aborting server startup, please fix this."
)
raise
return lines
def find_packages(pkgs, prefix=""):
prefix = normalize_pkgname(prefix)
for x in pkgs:
@ -351,7 +243,7 @@ def _digest_file(fpath, hash_algo):
From http://stackoverflow.com/a/21565932/548792
"""
blocksize = 2 ** 16
digester = getattr(hashlib, hash_algo)()
digester = hashlib.new(hash_algo)
with open(fpath, "rb") as f:
for block in iter(lambda: f.read(blocksize), b""):
digester.update(block)
@ -361,7 +253,7 @@ def _digest_file(fpath, hash_algo):
try:
from .cache import cache_manager
def listdir(root):
def listdir(root: str) -> t.Iterable[PkgFile]:
# root must be absolute path
return cache_manager.listdir(root, _listdir)

@ -169,18 +169,11 @@ def update(pkgset, destdir=None, dry_run=False, stable_only=True):
def update_all_packages(
roots, destdir=None, dry_run=False, stable_only=True, blacklist_file=None
roots, destdir=None, dry_run=False, stable_only=True, ignorelist=None
):
all_packages = itertools.chain(*[core.listdir(r) for r in roots])
skip_packages = set()
if blacklist_file:
skip_packages = set(core.read_lines(blacklist_file))
print(
'Skipping update of blacklisted packages (listed in "{}"): {}'.format(
blacklist_file, ", ".join(sorted(skip_packages))
)
)
skip_packages = set(ignorelist or ())
packages = frozenset(
[pkg for pkg in all_packages if pkg.pkgname not in skip_packages]

@ -7,6 +7,11 @@ universal=1
[egg_info]
tag_build =
[flake8]
max-line-length = 80
per-file-ignores =
**/__init__.py:F401
[mypy]
check_untyped_defs = True
follow_imports = silent

@ -1,5 +1,6 @@
#! /usr/bin/env python3
import re
from pathlib import Path
from setuptools import setup
@ -22,8 +23,15 @@ def read_file(rel_path: str):
def get_version():
locals_ = {}
version_line = re.compile(
r'^[\w =]*__version__ = "\d+\.\d+\.\d+\.?\w*\d*"$'
)
try:
exec(read_file("pypiserver/__init__.py"), locals_)
for ln in filter(
version_line.match,
read_file("pypiserver/__init__.py").splitlines(),
):
exec(ln, locals_)
except (ImportError, RuntimeError):
pass
return locals_["__version__"]

@ -3,6 +3,7 @@
# Builtin imports
import logging
import os
import pathlib
try: # python 3
@ -34,16 +35,15 @@ import tests.test_core as test_core
__main__.init_logging()
@pytest.fixture()
def _app(app):
return app.module
@pytest.fixture
def app(tmpdir):
from pypiserver import app
return app(root=tmpdir.strpath, authenticated=[])
return app(
roots=[pathlib.Path(tmpdir.strpath)],
authenticate=[],
password_file=".",
)
@pytest.fixture
@ -192,14 +192,14 @@ def test_favicon(testapp):
testapp.get("/favicon.ico", status=404)
def test_fallback(root, _app, testapp):
assert _app.config.redirect_to_fallback
def test_fallback(testapp):
assert not testapp.app._pypiserver_config.disable_fallback
resp = testapp.get("/simple/pypiserver/", status=302)
assert resp.headers["Location"] == "https://pypi.org/simple/pypiserver/"
def test_no_fallback(root, _app, testapp):
_app.config.redirect_to_fallback = False
def test_no_fallback(testapp):
testapp.app._pypiserver_config.disable_fallback = True
testapp.get("/simple/pypiserver/", status=404)
@ -413,8 +413,8 @@ def test_simple_index_list_name_with_underscore_no_egg(root, testapp):
assert hrefs == {"foo-bar/"}
def test_no_cache_control_set(root, _app, testapp):
assert not _app.config.cache_control
def test_no_cache_control_set(root, testapp):
assert not testapp.app._pypiserver_config.cache_control
root.join("foo_bar-1.0.tar.gz").write("")
resp = testapp.get("/packages/foo_bar-1.0.tar.gz")
assert "Cache-Control" not in resp.headers
@ -431,13 +431,13 @@ def test_cache_control_set(root):
assert resp.headers["Cache-Control"] == f"public, max-age={AGE}"
def test_upload_noAction(root, testapp):
def test_upload_noAction(testapp):
resp = testapp.post("/", expect_errors=1)
assert resp.status == "400 Bad Request"
assert "Missing ':action' field!" in unescape(resp.text)
def test_upload_badAction(root, testapp):
def test_upload_badAction(testapp):
resp = testapp.post("/", params={":action": "BAD"}, expect_errors=1)
assert resp.status == "400 Bad Request"
assert "Unsupported ':action' field: BAD" in unescape(resp.text)

@ -10,6 +10,12 @@ import pytest
from pypiserver.config import DEFAULTS, Config, RunConfig, UpdateConfig
FILE_DIR = pathlib.Path(__file__).parent.resolve()
# Username and password stored in the htpasswd.a.a test file.
HTPASS_TEST_FILE = str(FILE_DIR / "htpasswd.a.a")
HTPASS_TEST_USER = "a"
HTPASS_TEST_PASS = "a"
TEST_WELCOME_FILE = str(pathlib.Path(__file__).parent / "sample_msg.html")
TEST_IGNORELIST_FILE = str(pathlib.Path(__file__).parent / "test-ignorelist")
@ -26,7 +32,9 @@ class ConfigTestCase(t.NamedTuple):
exp_config_type: t.Type
# Expected values in the config. These don't necessarily need to be
# exclusive. Instead, they should just look at the attributes relevant
# to the test case at hand.
# to the test case at hand. A special "_test" key, if present, should
# map to a function that takes the config as an argument. If this
# returns a falsey value, the test will be failed.
exp_config_values: t.Dict[str, t.Any]
@ -100,27 +108,37 @@ _CONFIG_TEST_PARAMS: t.Tuple[ConfigTestCase, ...] = (
),
*generate_subcommand_test_cases(
case="single package directory specified",
extra_args=["foo"],
exp_config_values={"roots": ["foo"]},
extra_args=[str(FILE_DIR)],
exp_config_values={"roots": [FILE_DIR]},
),
*generate_subcommand_test_cases(
case="multiple package directory specified",
extra_args=["foo", "bar"],
exp_config_values={"roots": ["foo", "bar"]},
extra_args=[str(FILE_DIR), str(FILE_DIR.parent)],
exp_config_values={
"roots": [
FILE_DIR,
FILE_DIR.parent,
]
},
),
ConfigTestCase(
case="update with package directory (out-of-order legacy order)",
args=["update", "foo"],
legacy_args=["foo", "-U"],
args=["update", str(FILE_DIR)],
legacy_args=[str(FILE_DIR), "-U"],
exp_config_type=UpdateConfig,
exp_config_values={"roots": ["foo"]},
exp_config_values={"roots": [FILE_DIR]},
),
ConfigTestCase(
case="update with multiple package directories (weird ordering)",
args=["update", "foo", "bar"],
legacy_args=["foo", "-U", "bar"],
args=["update", str(FILE_DIR), str(FILE_DIR.parent)],
legacy_args=[str(FILE_DIR), "-U", str(FILE_DIR.parent)],
exp_config_type=UpdateConfig,
exp_config_values={"roots": ["foo", "bar"]},
exp_config_values={
"roots": [
FILE_DIR,
FILE_DIR.parent,
]
},
),
# verbosity
*(
@ -210,21 +228,35 @@ _CONFIG_TEST_PARAMS: t.Tuple[ConfigTestCase, ...] = (
args=["run"],
legacy_args=[],
exp_config_type=RunConfig,
exp_config_values={"interface": DEFAULTS.INTERFACE},
exp_config_values={"host": DEFAULTS.INTERFACE},
),
ConfigTestCase(
case="Run: interface specified",
args=["run", "-i", "1.1.1.1"],
legacy_args=["-i", "1.1.1.1"],
exp_config_type=RunConfig,
exp_config_values={"interface": "1.1.1.1"},
exp_config_values={"host": "1.1.1.1"},
),
ConfigTestCase(
case="Run: interface specified (long form)",
args=["run", "--interface", "1.1.1.1"],
legacy_args=["--interface", "1.1.1.1"],
exp_config_type=RunConfig,
exp_config_values={"interface": "1.1.1.1"},
exp_config_values={"host": "1.1.1.1"},
),
ConfigTestCase(
case="Run: host specified",
args=["run", "-H", "1.1.1.1"],
legacy_args=["-H", "1.1.1.1"],
exp_config_type=RunConfig,
exp_config_values={"host": "1.1.1.1"},
),
ConfigTestCase(
case="Run: host specified (long form)",
args=["run", "--host", "1.1.1.1"],
legacy_args=["--host", "1.1.1.1"],
exp_config_type=RunConfig,
exp_config_values={"host": "1.1.1.1"},
),
# authenticate
ConfigTestCase(
@ -250,10 +282,14 @@ _CONFIG_TEST_PARAMS: t.Tuple[ConfigTestCase, ...] = (
),
ConfigTestCase(
case="Run: authenticate specified with dot",
args=["run", "-a", "."],
legacy_args=["-a", "."],
# both auth and pass must be specified as empty if one of them is empty.
args=["run", "-a", ".", "-P", "."],
legacy_args=["-a", ".", "-P", "."],
exp_config_type=RunConfig,
exp_config_values={"authenticate": ["."]},
exp_config_values={
"authenticate": [],
"_test": lambda conf: bool(conf.auther("foo", "bar")) is True,
},
),
# passwords
ConfigTestCase(
@ -265,17 +301,42 @@ _CONFIG_TEST_PARAMS: t.Tuple[ConfigTestCase, ...] = (
),
ConfigTestCase(
"Run: passwords file specified",
args=["run", "-P", "foo"],
legacy_args=["-P", "foo"],
args=["run", "-P", HTPASS_TEST_FILE],
legacy_args=["-P", HTPASS_TEST_FILE],
exp_config_type=RunConfig,
exp_config_values={"password_file": "foo"},
exp_config_values={
"password_file": HTPASS_TEST_FILE,
"_test": lambda conf: (
bool(conf.auther("foo", "bar")) is False
and bool(conf.auther("a", "a")) is True
),
},
),
ConfigTestCase(
"Run: passwords file specified (long-form)",
args=["run", "--passwords", "foo"],
legacy_args=["--passwords", "foo"],
args=["run", "--passwords", HTPASS_TEST_FILE],
legacy_args=["--passwords", HTPASS_TEST_FILE],
exp_config_type=RunConfig,
exp_config_values={"password_file": "foo"},
exp_config_values={
"password_file": HTPASS_TEST_FILE,
"_test": (
lambda conf: (
bool(conf.auther("foo", "bar")) is False
and conf.auther("a", "a") is True
)
),
},
),
ConfigTestCase(
"Run: passwords file empty ('.')",
# both auth and pass must be specified as empty if one of them is empty.
args=["run", "-P", ".", "-a", "."],
legacy_args=["-P", ".", "-a", "."],
exp_config_type=RunConfig,
exp_config_values={
"password_file": ".",
"_test": lambda conf: bool(conf.auther("foo", "bar")) is True,
},
),
# disable-fallback
ConfigTestCase(

@ -110,25 +110,6 @@ def test_listdir_bad_name(tmpdir):
assert res == []
def test_read_lines(tmpdir):
filename = "pkg_blacklist"
file_contents = (
"# Names of private packages that we don't want to upgrade\n"
"\n"
"my_private_pkg \n"
" \t# This is a comment with starting space and tab\n"
" my_other_private_pkg"
)
f = tmpdir.join(filename).ensure()
f.write(file_contents)
assert core.read_lines(f.strpath) == [
"my_private_pkg",
"my_other_private_pkg",
]
hashes = (
# empty-sha256
(

@ -1,32 +1,26 @@
"""
Test module for . . .
Test module for app initialization
"""
# Standard library imports
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
import logging
from os.path import abspath, dirname, join, realpath
from sys import path
import pathlib
import typing as t
# Third party imports
import pytest
# Local imports
import pypiserver
logger = logging.getLogger(__name__)
test_dir = realpath(dirname(__file__))
src_dir = abspath(join(test_dir, ".."))
path.append(src_dir)
print(path)
import pypiserver
TEST_DIR = pathlib.Path(__file__).parent
HTPASS_FILE = TEST_DIR / "htpasswd.a.a"
WELCOME_FILE = TEST_DIR / "sample_msg.html"
# TODO: make these tests meaningful
@pytest.mark.parametrize(
"conf_options",
[
@ -35,22 +29,77 @@ import pypiserver
{
"root": "~/unstable_packages",
"authenticated": "upload",
"passwords": "~/htpasswd",
"passwords": str(HTPASS_FILE),
},
# Verify that the strip parser works properly.
{"authenticated": str("upload")},
],
)
def test_paste_app_factory(conf_options, monkeypatch):
def test_paste_app_factory(conf_options: dict) -> None:
"""Test the paste_app_factory method"""
monkeypatch.setattr(
"pypiserver.core.configure", lambda **x: (x, [x.keys()])
)
pypiserver.paste_app_factory({}, **conf_options)
pypiserver.paste_app_factory({}, **conf_options) # type: ignore
def test_app_factory(monkeypatch):
monkeypatch.setattr(
"pypiserver.core.configure", lambda **x: (x, [x.keys()])
)
def test_app_factory() -> None:
assert pypiserver.app() is not pypiserver.app()
@pytest.mark.parametrize(
"incoming, updated",
(
(
{"authenticated": []},
{"authenticate": []},
),
(
{"passwords": "./foo"},
{"password_file": "./foo"},
),
(
{"root": str(TEST_DIR)},
{"roots": [TEST_DIR.expanduser().resolve()]},
),
(
{"root": [str(TEST_DIR), str(TEST_DIR)]},
{
"roots": [
TEST_DIR.expanduser().resolve(),
TEST_DIR.expanduser().resolve(),
]
},
),
(
{"redirect_to_fallback": False},
{"disable_fallback": True},
),
(
{"server": "auto"},
{"server_method": "auto"},
),
(
{"welcome_file": str(WELCOME_FILE.resolve())},
{"welcome_msg": WELCOME_FILE.read_text()},
),
),
)
def test_backwards_compat_kwargs_conversion(
incoming: t.Dict[str, t.Any], updated: t.Dict[str, t.Any]
) -> None:
"""Test converting legacy kwargs to modern ones."""
assert pypiserver.backwards_compat_kwargs(incoming) == updated
@pytest.mark.parametrize(
"kwargs",
(
{"redirect_to_fallback": False, "disable_fallback": False},
{"disable_fallback": False, "redirect_to_fallback": False},
),
)
def test_backwards_compat_kwargs_duplicate_check(
kwargs: t.Dict[str, t.Any]
) -> None:
"""Duplicate legacy and modern kwargs cause an error."""
with pytest.raises(ValueError) as err:
pypiserver.backwards_compat_kwargs(kwargs)
assert "('redirect_to_fallback', 'disable_fallback')" in str(err.value)

@ -1,22 +1,38 @@
#! /usr/bin/env py.test
import logging
import os
import pathlib
import sys
import typing as t
from unittest import mock
import pytest
import sys, os, pytest, logging
from pypiserver import __main__
from pypiserver.bottle import Bottle
try:
from unittest import mock
except ImportError:
import mock
THIS_DIR = pathlib.Path(__file__).parent
HTPASS_FILE = THIS_DIR / "htpasswd.a.a"
IGNORELIST_FILE = THIS_DIR / "test-ignorelist"
class main_wrapper:
app: t.Optional[Bottle]
run_kwargs: t.Optional[dict]
update_args: t.Optional[tuple]
update_kwargs: t.Optional[dict]
def __init__(self):
self.app = None
self.run_kwargs = None
self.pkgdir = None
self.update_args = None
self.update_kwargs = None
def __call__(self, argv):
sys.stdout.write(f"Running {argv}\n")
__main__.main(["pypi-server"] + argv)
# always sets the package directory to this directory, regardless of
# other passed args.
__main__.main([str(THIS_DIR)] + argv)
return self.run_kwargs
@ -31,24 +47,25 @@ def main(monkeypatch):
main.app = app
main.run_kwargs = kwargs
def listdir(pkgdir):
main.pkgdir = pkgdir
return []
def update(*args, **kwargs):
main.update_args = args
main.update_kwargs = kwargs
monkeypatch.setattr("pypiserver.bottle.run", run)
monkeypatch.setattr("os.listdir", listdir)
monkeypatch.setattr("pypiserver.manage.update_all_packages", update)
return main
def test_default_pkgdir(main):
main([])
assert os.path.normpath(main.pkgdir) == os.path.normpath(
os.path.expanduser("~/packages")
)
assert main.app._pypiserver_config.roots == [THIS_DIR]
def test_noargs(main):
# Assert we're calling with the default host, port, and server, and
# assume that we've popped `app` off of the bottle args in our `main`
# fixture.
assert main([]) == {"host": "0.0.0.0", "port": 8080, "server": "auto"}
@ -64,59 +81,53 @@ def test_server(main):
assert main(["--server", "cherrypy"])["server"] == "cherrypy"
def test_root(main):
main(["--root", "."])
assert main.app.module.packages.root == os.path.abspath(".")
assert main.pkgdir == os.path.abspath(".")
def test_root_r(main):
main(["-r", "."])
assert main.app.module.packages.root == os.path.abspath(".")
assert main.pkgdir == os.path.abspath(".")
# def test_root_multiple(main):
# pytest.raises(SystemExit, main, [".", "."])
# pytest.raises(SystemExit, main, ["-r", ".", "."])
def test_root_multiple(main):
# Remember we're already setting THIS_DIR as a root in the `main` fixture
main([str(THIS_DIR.parent)])
assert main.app._pypiserver_config.roots == [
THIS_DIR,
THIS_DIR.parent,
]
def test_fallback_url(main):
main(["--fallback-url", "https://pypi.mirror/simple"])
assert main.app.module.config.fallback_url == "https://pypi.mirror/simple"
assert (
main.app._pypiserver_config.fallback_url == "https://pypi.mirror/simple"
)
def test_fallback_url_default(main):
main([])
assert main.app.module.config.fallback_url == "https://pypi.org/simple"
assert (
main.app._pypiserver_config.fallback_url == "https://pypi.org/simple/"
)
def test_hash_algo_default(main):
main([])
assert main.app.module.config.hash_algo == "md5"
assert main.app._pypiserver_config.hash_algo == "md5"
def test_hash_algo(main):
main(["--hash-algo=sha256"])
assert main.app.module.config.hash_algo == "sha256"
assert main.app._pypiserver_config.hash_algo == "sha256"
def test_hash_algo_off(main):
main(["--hash-algo=off"])
assert main.app.module.config.hash_algo is None
assert main.app._pypiserver_config.hash_algo is None
main(["--hash-algo=0"])
assert main.app.module.config.hash_algo is None
assert main.app._pypiserver_config.hash_algo is None
main(["--hash-algo=no"])
assert main.app.module.config.hash_algo is None
assert main.app._pypiserver_config.hash_algo is None
main(["--hash-algo=false"])
assert main.app.module.config.hash_algo is None
assert main.app._pypiserver_config.hash_algo is None
def test_hash_algo_BAD(main):
with pytest.raises(SystemExit) as excinfo:
main(["--hash-algo BAD"])
# assert excinfo.value.message == 'some info' main(['--hash-algo BAD'])
print(excinfo)
def test_logging(main, tmpdir):
@ -175,19 +186,19 @@ def test_init_logging_with_none_stream_doesnt_add_stream_handler(dummy_logger):
def test_welcome_file(main):
sample_msg_file = os.path.join(os.path.dirname(__file__), "sample_msg.html")
main(["--welcome", sample_msg_file])
assert "Hello pypiserver tester!" in main.app.module.config.welcome_msg
assert "Hello pypiserver tester!" in main.app._pypiserver_config.welcome_msg
def test_welcome_file_default(main):
main([])
assert "Welcome to pypiserver!" in main.app.module.config.welcome_msg
assert "Welcome to pypiserver!" in main.app._pypiserver_config.welcome_msg
def test_password_without_auth_list(main, monkeypatch):
sysexit = mock.MagicMock(side_effect=ValueError("BINGO"))
monkeypatch.setattr("sys.exit", sysexit)
with pytest.raises(ValueError) as ex:
main(["-P", "pswd-file", "-a", ""])
main(["-P", str(HTPASS_FILE), "-a", ""])
assert ex.value.args[0] == "BINGO"
with pytest.raises(ValueError) as ex:
@ -205,16 +216,13 @@ def test_password_without_auth_list(main, monkeypatch):
def test_password_alone(main, monkeypatch):
monkeypatch.setitem(sys.modules, "passlib", mock.MagicMock())
monkeypatch.setitem(sys.modules, "passlib.apache", mock.MagicMock())
main(["-P", "pswd-file"])
assert main.app.module.config.authenticated == ["update"]
main(["-P", str(HTPASS_FILE)])
assert main.app._pypiserver_config.authenticate == ["update"]
def test_dot_password_without_auth_list(main, monkeypatch):
main(["-P", ".", "-a", ""])
assert main.app.module.config.authenticated == []
main(["-P", ".", "-a", "."])
assert main.app.module.config.authenticated == []
assert main.app._pypiserver_config.authenticate == []
def test_blacklist_file(main):
@ -222,5 +230,5 @@ def test_blacklist_file(main):
Test that calling the app with the --blacklist-file argument does not
throw a getopt error
"""
blacklist_file = "/root/pkg_blacklist"
main(["--blacklist-file", blacklist_file])
main(["-U", "--blacklist-file", str(IGNORELIST_FILE)])
assert main.update_kwargs["ignorelist"] == ["mypiserver", "something"]

@ -221,23 +221,20 @@ def test_update_all_packages(monkeypatch):
return roots_mock.get(directory, [])
monkeypatch.setattr(manage.core, "listdir", core_listdir_mock)
monkeypatch.setattr(manage.core, "read_lines", Mock(return_value=[]))
monkeypatch.setattr(manage, "update", Mock(return_value=None))
destdir = None
dry_run = False
stable_only = True
blacklist_file = None
update_all_packages(
roots=list(roots_mock.keys()),
destdir=destdir,
dry_run=dry_run,
stable_only=stable_only,
blacklist_file=blacklist_file,
ignorelist=None,
)
manage.core.read_lines.assert_not_called() # pylint: disable=no-member
manage.update.assert_called_once_with( # pylint: disable=no-member
frozenset([public_pkg_1, public_pkg_2, private_pkg_1, private_pkg_2]),
destdir,
@ -246,7 +243,7 @@ def test_update_all_packages(monkeypatch):
)
def test_update_all_packages_with_blacklist(monkeypatch):
def test_update_all_packages_with_ignorelist(monkeypatch):
"""Test calling update_all_packages()"""
public_pkg_1 = PkgFile("Flask", "1.0")
public_pkg_2 = PkgFile("requests", "1.0")
@ -265,29 +262,20 @@ def test_update_all_packages_with_blacklist(monkeypatch):
return roots_mock.get(directory, [])
monkeypatch.setattr(manage.core, "listdir", core_listdir_mock)
monkeypatch.setattr(
manage.core,
"read_lines",
Mock(return_value=["my_private_pkg", "my_other_private_pkg"]),
)
monkeypatch.setattr(manage, "update", Mock(return_value=None))
destdir = None
dry_run = False
stable_only = True
blacklist_file = "/root/pkg_blacklist"
update_all_packages(
roots=list(roots_mock.keys()),
destdir=destdir,
dry_run=dry_run,
stable_only=stable_only,
blacklist_file=blacklist_file,
ignorelist=["my_private_pkg", "my_other_private_pkg"],
)
manage.update.assert_called_once_with( # pylint: disable=no-member
frozenset([public_pkg_1, public_pkg_2]), destdir, dry_run, stable_only
)
manage.core.read_lines.assert_called_once_with(
blacklist_file
) # pylint: disable=no-member

15
tox.ini

@ -1,27 +1,28 @@
[tox]
envlist = py36,py37,py38,pypy3
envlist = py36, py37, py38, pypy3
[testenv]
deps=-r{toxinidir}/requirements/dev.pip
allowlist_externals=
/bin/sh
mypy
which
sitepackages=False
[testenv::py]
[testenv:py{36,37,38}]
commands=
/bin/sh -c "{env:PYPISERVER_SETUP_CMD:true}"
# individual mypy files for now, until we get the rest
# of the project typechecking
mypy pypiserver/config.py
pytest --cov=pypiserver []
mypy \
pypiserver/config.py \
tests/test_init.py
pytest --cov=pypiserver {posargs}
[testenv::pypy]
[testenv:pypy3]
commands=
/bin/sh -c "{env:PYPISERVER_SETUP_CMD:true}"
# no mypy in pypy
pytest --cov=pypiserver []
pytest --cov=pypiserver {posargs}
[pytest]