1022 lines
36 KiB
Python
1022 lines
36 KiB
Python
"""Pypiserver configuration management.
|
|
|
|
NOTE: THIS CONFIG IS NOT YET IN USE. It is the intended replacement for
|
|
the current config logic, but has not yet been integrated.
|
|
|
|
To add a config option:
|
|
|
|
- If it should be available for all subcommands (run, update), add it to
|
|
the `add_common_args()` function
|
|
- If it should only be available for the `run` command, add it to the
|
|
`run_parser` in the `get_parser()` function.
|
|
- If it should only be available for the `update` command, add it to the
|
|
`update_parser` in the `get_parser() function`.
|
|
- Add it to the appropriate Config class, `_ConfigCommon` for global options,
|
|
`RunConfig` for `run` options, and `UpdateConfig` for `update` options.
|
|
- This requires adding it as an `__init__()` kwarg, setting it as an instance
|
|
attribute in `__init__()`, and ensuring it will be parsed from the argparse
|
|
namespace in the `kwargs_from_namespace()` method
|
|
- Ensure your config option is tested in `tests/test_config.py`.
|
|
|
|
The `Config` class is a factory class only. Config objects do not inherit from
|
|
it, but from `_ConfigCommon`. The `Config` provides the following constructors:
|
|
|
|
- `default_with_overrides(**overrides: Any)`: construct a `RunConfig` (since
|
|
run is the default pypiserver action) with default values, applying any
|
|
specified overrides
|
|
- `from_args(args: Optional[Sequence[str]])`: construct a config from the
|
|
provided arguments. Depending on arguments, the config will be either a
|
|
`RunConfig` or an `UpdateConfig`
|
|
|
|
Legacy commandline arguments did not require a subcommand. This form is
|
|
still supported, but deprecated. A warning is printing to stderr if
|
|
the legacy commandline format is used.
|
|
"""
|
|
|
|
import argparse
|
|
import contextlib
|
|
import hashlib
|
|
import io
|
|
import logging
|
|
import pathlib
|
|
import re
|
|
import sys
|
|
import textwrap
|
|
import typing as t
|
|
|
|
try:
|
|
# `importlib_resources` is required for Python versions below 3.12
|
|
# See more in the package docs: https://pypi.org/project/importlib-resources/
|
|
try:
|
|
from importlib_resources import files as import_files
|
|
except ImportError:
|
|
from importlib.resources import files as import_files
|
|
|
|
def get_resource_bytes(package: str, resource: str) -> bytes:
|
|
ref = import_files(package).joinpath(resource)
|
|
return ref.read_bytes()
|
|
|
|
except ImportError:
|
|
# The `pkg_resources` is deprecated in Python 3.12
|
|
import pkg_resources
|
|
|
|
def get_resource_bytes(package: str, resource: str) -> bytes:
|
|
return pkg_resources.resource_string(package, resource)
|
|
|
|
|
|
from pypiserver.backend import (
|
|
SimpleFileBackend,
|
|
CachingFileBackend,
|
|
Backend,
|
|
IBackend,
|
|
get_file_backend,
|
|
BackendProxy,
|
|
)
|
|
|
|
# The `passlib` requirement is optional, so we need to verify its import here.
|
|
try:
|
|
from passlib.apache import HtpasswdFile
|
|
except ImportError:
|
|
HtpasswdFile = None
|
|
|
|
|
|
def legacy_strtoint(val: str) -> int:
|
|
"""Convert a string representation of truth to true (1) or false (0).
|
|
|
|
True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
|
|
are 'n', 'no', 'f', 'false', 'off', and '0'. Raises ValueError if
|
|
'val' is anything else.
|
|
|
|
The "strtobool" function in distutils does a nice job at parsing strings,
|
|
but returns an integer. This just wraps it in a boolean call so that we
|
|
get a bool.
|
|
|
|
Borrowed from deprecated distutils.
|
|
"""
|
|
val = val.lower()
|
|
if val in ("y", "yes", "t", "true", "on", "1"):
|
|
return 1
|
|
elif val in ("n", "no", "f", "false", "off", "0"):
|
|
return 0
|
|
else:
|
|
raise ValueError("invalid truth value {!r}".format(val))
|
|
|
|
|
|
strtobool: t.Callable[[str], bool] = lambda val: bool(legacy_strtoint(val))
|
|
|
|
|
|
# Specify defaults here so that we can use them in tests &c. and not need
|
|
# to update things in multiple places if a default changes.
|
|
class DEFAULTS:
|
|
"""Config defaults."""
|
|
|
|
AUTHENTICATE = ["update"]
|
|
FALLBACK_URL = "https://pypi.org/simple/"
|
|
HEALTH_ENDPOINT = "/health"
|
|
HASH_ALGO = "sha256"
|
|
INTERFACE = "0.0.0.0"
|
|
LOG_FRMT = "%(asctime)s|%(name)s|%(levelname)s|%(thread)d|%(message)s"
|
|
LOG_ERR_FRMT = "%(body)s: %(exception)s \n%(traceback)s"
|
|
LOG_REQ_FRMT = "%(bottle.request)s"
|
|
LOG_RES_FRMT = "%(status)s"
|
|
LOG_STREAM = sys.stdout
|
|
PACKAGE_DIRECTORIES = [pathlib.Path("~/packages").expanduser().resolve()]
|
|
PORT = 8080
|
|
SERVER_METHOD = "auto"
|
|
BACKEND = "auto"
|
|
|
|
|
|
def auth_arg(arg: str) -> t.List[str]:
|
|
"""Parse the authentication argument."""
|
|
# Split on commas, remove duplicates, remove whitespace, ensure lowercase.
|
|
# Sort so that they'll have a consistent ordering.
|
|
items = sorted(list(set(i.strip().lower() for i in arg.split(","))))
|
|
# Throw for any invalid options
|
|
if any(i not in ("download", "list", "update", ".") for i in items):
|
|
raise ValueError(
|
|
"Invalid authentication option. Valid values are download, list, "
|
|
"and update, or . (for no authentication)."
|
|
)
|
|
# The "no authentication" option must be specified in isolation.
|
|
if "." in items and len(items) > 1:
|
|
raise argparse.ArgumentTypeError(
|
|
"Invalid authentication options. `.` (no authentication) "
|
|
"must be specified alone."
|
|
)
|
|
|
|
# The "." is just an indicator for no auth, so we return an empty auth list
|
|
# if it was present.
|
|
return [i for i in items if not i == "."]
|
|
|
|
|
|
def hash_algo_arg(arg: str) -> t.Optional[str]:
|
|
"""Parse a hash algorithm from the string."""
|
|
if arg in hashlib.algorithms_available:
|
|
return arg
|
|
try:
|
|
if not strtobool(arg):
|
|
return None
|
|
except ValueError:
|
|
# strtobool raises if the string doesn't seem like a truthiness-
|
|
# indicating string. We do want to raise in this case, but we want
|
|
# to raise our own custom message rather than raising the ValueError
|
|
# raised by strtobool.
|
|
pass
|
|
# At this point we either had an invalid hash algorithm or a truthy string
|
|
# like 'yes' or 'true'. In either case, we want to throw an error.
|
|
raise argparse.ArgumentTypeError(
|
|
f"Hash algorithm '{arg}' is not available. Please select one "
|
|
f"of {hashlib.algorithms_available}, or turn off hashing by "
|
|
"setting --hash-algo to 'off', '0', or 'false'"
|
|
)
|
|
|
|
|
|
def health_endpoint_arg(arg: str) -> str:
|
|
"""Verify the health_endpoint and raises ValueError if invalid."""
|
|
rule_regex = r"^/[a-z0-9/_-]+$"
|
|
if re.fullmatch(rule_regex, arg, re.I) is not None:
|
|
return arg
|
|
|
|
raise argparse.ArgumentTypeError(
|
|
"Invalid path for the health endpoint. Make sure that it contains only "
|
|
"alphanumeric characters, hyphens, forward slashes and underscores. "
|
|
f"In other words, make sure to match the following regex: {rule_regex}"
|
|
)
|
|
|
|
|
|
def html_file_arg(arg: t.Optional[str]) -> str:
|
|
"""Parse the provided HTML file and return its contents."""
|
|
if arg is None or arg == "pypiserver/welcome.html":
|
|
return get_resource_bytes(__name__, "welcome.html").decode("utf-8")
|
|
with open(arg, "r", encoding="utf-8") as f:
|
|
msg = f.read()
|
|
return msg
|
|
|
|
|
|
def ignorelist_file_arg(arg: t.Optional[str]) -> t.List[str]:
|
|
"""Parse the ignorelist and return the list of ignored files."""
|
|
if arg is None or arg == "pypiserver/no-ignores":
|
|
return []
|
|
|
|
fpath = pathlib.Path(arg)
|
|
if not fpath.exists():
|
|
raise argparse.ArgumentTypeError(f"No such ignorelist-file '{arg}'")
|
|
|
|
try:
|
|
lines = (ln.strip() for ln in fpath.read_text().splitlines())
|
|
return [ln for ln in lines if ln and not ln.startswith("#")]
|
|
except Exception as exc:
|
|
raise argparse.ArgumentTypeError(
|
|
f"Could not parse ignorelist-file '{arg}': {exc}"
|
|
) from exc
|
|
|
|
|
|
def package_directory_arg(arg: str) -> pathlib.Path:
|
|
"""Convert any package directory argument into its absolute path."""
|
|
pkg_dir = pathlib.Path(arg).expanduser().resolve()
|
|
try:
|
|
# Attempt to grab the first item from the directory. The directory may
|
|
# be empty, in which case we'll get back None, but if the directory does
|
|
# not exist or we do not have permission to read it, we can catch th
|
|
# OSError and exit with a useful message.
|
|
next(pkg_dir.iterdir(), None)
|
|
except OSError as exc:
|
|
raise argparse.ArgumentTypeError(
|
|
"Error: while trying to access package directory "
|
|
f"({pkg_dir}): {exc}"
|
|
)
|
|
return pkg_dir
|
|
|
|
|
|
# We need to capture this at compile time, because we replace sys.stderr
|
|
# during config parsing in order to better control error output when we
|
|
# encounter legacy cmdline arguments.
|
|
_ORIG_STDERR = sys.stderr
|
|
|
|
|
|
def log_stream_arg(arg: str) -> t.Optional[t.IO]:
|
|
"""Parse the log-stream argument."""
|
|
lower = arg.lower()
|
|
# Convert a `none` string to a real none.
|
|
val = lower if lower != "none" else None
|
|
# Ensure the remaining value is a valid stream type, and return it.
|
|
if val is None:
|
|
return val
|
|
if val == "stdout":
|
|
return sys.stdout
|
|
if val == "stderr":
|
|
return _ORIG_STDERR
|
|
raise argparse.ArgumentTypeError(
|
|
"Invalid option for --log-stream. Value must be one of stdout, "
|
|
"stderr, or none."
|
|
)
|
|
|
|
|
|
def add_common_args(parser: argparse.ArgumentParser) -> None:
|
|
"""Add common arguments to a parser."""
|
|
# Don't update at top-level to avoid circular imports in __init__
|
|
from pypiserver import __version__
|
|
|
|
parser.add_argument(
|
|
"-v",
|
|
"--verbose",
|
|
action="count",
|
|
default=0,
|
|
help="Enable verbose logging; repeat for more verbosity.",
|
|
)
|
|
parser.add_argument(
|
|
"--log-file",
|
|
metavar="FILE",
|
|
help=(
|
|
"Write logging info into this FILE, as well as to stdout or "
|
|
"stderr, if configured."
|
|
),
|
|
)
|
|
parser.add_argument(
|
|
"--log-stream",
|
|
metavar="STREAM",
|
|
default=DEFAULTS.LOG_STREAM,
|
|
type=log_stream_arg,
|
|
help=(
|
|
"Log messages to the specified STREAM. Valid values are stdout, "
|
|
"stderr, and none"
|
|
),
|
|
)
|
|
parser.add_argument(
|
|
"--log-frmt",
|
|
metavar="FORMAT",
|
|
default=DEFAULTS.LOG_FRMT,
|
|
help=(
|
|
"The logging format-string. (see `logging.LogRecord` class from "
|
|
"standard python library)"
|
|
),
|
|
)
|
|
|
|
parser.add_argument(
|
|
"--hash-algo",
|
|
default=DEFAULTS.HASH_ALGO,
|
|
type=hash_algo_arg,
|
|
help=(
|
|
"Any `hashlib` available algorithm to use for generating fragments "
|
|
"on package links. Can be disabled with one of (0, no, off, false)."
|
|
),
|
|
)
|
|
|
|
parser.add_argument(
|
|
"--backend",
|
|
default=DEFAULTS.BACKEND,
|
|
choices=("auto", "simple-dir", "cached-dir"),
|
|
dest="backend_arg",
|
|
help=(
|
|
"A backend implementation. Keep the default 'auto' to automatically"
|
|
" determine whether to activate caching or not"
|
|
),
|
|
)
|
|
|
|
parser.add_argument(
|
|
"--version",
|
|
action="version",
|
|
version=__version__,
|
|
)
|
|
|
|
|
|
def get_parser() -> argparse.ArgumentParser:
|
|
"""Return an ArgumentParser."""
|
|
parser = argparse.ArgumentParser(
|
|
description=(
|
|
"start PyPI compatible package server serving packages from "
|
|
"PACKAGES_DIRECTORY. If PACKAGES_DIRECTORY is not given on the "
|
|
"command line, it uses the default ~/packages. pypiserver scans "
|
|
"this directory recursively for packages. It skips packages and "
|
|
"directories starting with a dot. Multiple package directories "
|
|
"may be specified."
|
|
),
|
|
formatter_class=PreserveWhitespaceRawTextHelpFormatter,
|
|
epilog=(
|
|
"Visit https://github.com/pypiserver/pypiserver "
|
|
"for more information\n \n"
|
|
),
|
|
)
|
|
|
|
add_common_args(parser)
|
|
|
|
subparsers = parser.add_subparsers(dest="cmd")
|
|
|
|
run_parser = subparsers.add_parser(
|
|
"run",
|
|
formatter_class=PreserveWhitespaceRawTextHelpFormatter,
|
|
help="Run pypiserver, serving packages from PACKAGES_DIRECTORY",
|
|
)
|
|
|
|
add_common_args(run_parser)
|
|
|
|
run_parser.add_argument(
|
|
"package_directory",
|
|
default=DEFAULTS.PACKAGE_DIRECTORIES,
|
|
nargs="*",
|
|
type=package_directory_arg,
|
|
help="The directory from which to serve packages.",
|
|
)
|
|
|
|
run_parser.add_argument(
|
|
"-p",
|
|
"--port",
|
|
type=int,
|
|
default=DEFAULTS.PORT,
|
|
help="Listen on port PORT (default: 8080)",
|
|
)
|
|
run_parser.add_argument(
|
|
"-i",
|
|
"-H",
|
|
"--interface",
|
|
"--host",
|
|
dest="host",
|
|
default=DEFAULTS.INTERFACE,
|
|
help="Listen on interface INTERFACE (default: 0.0.0.0)",
|
|
)
|
|
run_parser.add_argument(
|
|
"-a",
|
|
"--authenticate",
|
|
default=DEFAULTS.AUTHENTICATE,
|
|
type=auth_arg,
|
|
help=(
|
|
"Comma-separated list of (case-insensitive) actions to "
|
|
"authenticate (options: download, list, update; default: update)."
|
|
"\n\n "
|
|
"Any actions not specified are not authenticated, so to "
|
|
"authenticate downloads and updates, but allow unauthenticated "
|
|
"viewing of the package list, you would use: "
|
|
"\n\n"
|
|
" pypi-server -a 'download, update' -P ./my_passwords.htaccess"
|
|
"\n\n"
|
|
"To disable authentication, use:"
|
|
"\n\n"
|
|
" pypi-server -a . -P ."
|
|
"\n\n"
|
|
"See the `-P` option for configuring users and passwords. "
|
|
"\n\n"
|
|
"Note that when uploads are not protected, the `register` command "
|
|
"is not necessary, but `~/.pypirc` still needs username and "
|
|
"password fields, even if bogus."
|
|
),
|
|
)
|
|
run_parser.add_argument(
|
|
"-P",
|
|
"--passwords",
|
|
metavar="PASSWORD_FILE",
|
|
help=(
|
|
"Use an apache htpasswd file PASSWORD_FILE to set usernames and "
|
|
"passwords for authentication."
|
|
"\n\n"
|
|
"To allow unauthorized access, use:"
|
|
"\n\n"
|
|
" pypi-server -a . -P ."
|
|
"\n\n"
|
|
),
|
|
)
|
|
run_parser.add_argument(
|
|
"--disable-fallback",
|
|
action="store_true",
|
|
help=(
|
|
"Disable the default redirect to PyPI for packages not found in "
|
|
"the local index."
|
|
),
|
|
)
|
|
run_parser.add_argument(
|
|
"--fallback-url",
|
|
default=DEFAULTS.FALLBACK_URL,
|
|
help=(
|
|
"Redirect to FALLBACK_URL for packages not found in the local "
|
|
"index."
|
|
),
|
|
)
|
|
run_parser.add_argument(
|
|
"--health-endpoint",
|
|
default=DEFAULTS.HEALTH_ENDPOINT,
|
|
type=health_endpoint_arg,
|
|
help=(
|
|
"Configure a custom liveness endpoint. It always returns 200 Ok if "
|
|
"the service is up. Otherwise, it means that the service is not responsive."
|
|
),
|
|
)
|
|
run_parser.add_argument(
|
|
"--server",
|
|
metavar="METHOD",
|
|
default=DEFAULTS.SERVER_METHOD,
|
|
choices=(
|
|
"auto",
|
|
"cherrypy",
|
|
"gevent",
|
|
"gunicorn",
|
|
"paste",
|
|
"twisted",
|
|
"wsgiref",
|
|
),
|
|
type=str.lower,
|
|
help=(
|
|
"Use METHOD to run the server. Valid values include paste, "
|
|
"cherrypy, twisted, gunicorn, gevent, wsgiref, and auto. The "
|
|
'default is to use "auto", which chooses one of paste, cherrypy, '
|
|
"twisted, or wsgiref."
|
|
),
|
|
)
|
|
run_parser.add_argument(
|
|
"-o",
|
|
"--overwrite",
|
|
action="store_true",
|
|
help="Allow overwriting existing package files during upload.",
|
|
)
|
|
run_parser.add_argument(
|
|
"--welcome",
|
|
metavar="HTML_FILE",
|
|
# we want to run our `html_file_arg` function to get our default value
|
|
# if the value isn't provided, but if we specify `None` as a default
|
|
# or let argparse handle the default logic, it will not call that
|
|
# function with the None value. So, we set it to a custom value.
|
|
default="pypiserver/welcome.html",
|
|
type=html_file_arg,
|
|
help=(
|
|
"Use the contents of HTML_FILE as a custom welcome message "
|
|
"on the home page."
|
|
),
|
|
)
|
|
run_parser.add_argument(
|
|
"--cache-control",
|
|
metavar="AGE",
|
|
type=int,
|
|
help=(
|
|
'Add "Cache-Control: max-age=AGE" header to package downloads. '
|
|
"Pip 6+ requires this for caching."
|
|
"AGE is specified in seconds."
|
|
),
|
|
)
|
|
run_parser.add_argument(
|
|
"--log-req-frmt",
|
|
metavar="FORMAT",
|
|
default=DEFAULTS.LOG_REQ_FRMT,
|
|
help=(
|
|
"A format-string selecting Http-Request properties to log; set "
|
|
"to '%%s' to see them all."
|
|
),
|
|
)
|
|
run_parser.add_argument(
|
|
"--log-res-frmt",
|
|
metavar="FORMAT",
|
|
default=DEFAULTS.LOG_RES_FRMT,
|
|
help=(
|
|
"A format-string selecting Http-Response properties to log; set "
|
|
"to '%%s' to see them all."
|
|
),
|
|
)
|
|
run_parser.add_argument(
|
|
"--log-err-frmt",
|
|
metavar="FORMAT",
|
|
default=DEFAULTS.LOG_ERR_FRMT,
|
|
help=(
|
|
"A format-string selecting Http-Error properties to log; set "
|
|
"to '%%s' to see them all."
|
|
),
|
|
)
|
|
|
|
update_parser = subparsers.add_parser(
|
|
"update",
|
|
help=textwrap.dedent(
|
|
"Handle updates of packages managed by pypiserver. By default, "
|
|
"a pip command to update the packages is printed to stdout for "
|
|
"introspection or pipelining. See the `-x` option for updating "
|
|
"packages directly."
|
|
),
|
|
)
|
|
|
|
add_common_args(update_parser)
|
|
|
|
update_parser.add_argument(
|
|
"package_directory",
|
|
default=DEFAULTS.PACKAGE_DIRECTORIES,
|
|
nargs="*",
|
|
type=package_directory_arg,
|
|
help="The directory from which to serve packages.",
|
|
)
|
|
|
|
update_parser.add_argument(
|
|
"-x",
|
|
"--execute",
|
|
action="store_true",
|
|
help="Execute the pip commands rather than printing to stdout",
|
|
)
|
|
update_parser.add_argument(
|
|
"-d",
|
|
"--download-directory",
|
|
help=(
|
|
"Specify a directory where packages updates will be downloaded. "
|
|
"The default behavior is to use the directory which contains "
|
|
"the package being updated."
|
|
),
|
|
)
|
|
update_parser.add_argument(
|
|
"-u",
|
|
"--allow-unstable",
|
|
action="store_true",
|
|
help=(
|
|
"Allow updating to unstable versions (alpha, beta, rc, dev, etc.)"
|
|
),
|
|
)
|
|
update_parser.add_argument(
|
|
"--blacklist-file",
|
|
"--ignorelist-file",
|
|
dest="ignorelist_file",
|
|
default="pypiserver/no-ignores",
|
|
type=ignorelist_file_arg,
|
|
help=(
|
|
"Don't update packages listed in this file (one package name per "
|
|
"line, without versions, '#' comments honored). This can be useful "
|
|
"if you upload private packages into pypiserver, but also keep a "
|
|
"mirror of public packages that you regularly update. Attempting "
|
|
"to pull an update of a private package from `pypi.org` might pose "
|
|
"a security risk - e.g. a malicious user might publish a higher "
|
|
"version of the private package, containing arbitrary code."
|
|
),
|
|
)
|
|
return parser
|
|
|
|
|
|
TConf = t.TypeVar("TConf", bound="_ConfigCommon")
|
|
BackendFactory = t.Callable[["_ConfigCommon"], Backend]
|
|
|
|
|
|
class _ConfigCommon:
|
|
hash_algo: t.Optional[str] = None
|
|
|
|
def __init__(
|
|
self,
|
|
roots: t.List[pathlib.Path],
|
|
verbosity: int,
|
|
log_frmt: str,
|
|
log_file: t.Optional[str],
|
|
log_stream: t.Optional[t.IO],
|
|
hash_algo: t.Optional[str],
|
|
backend_arg: str,
|
|
) -> None:
|
|
"""Construct a RuntimeConfig."""
|
|
# Global arguments
|
|
self.verbosity = verbosity
|
|
self.log_file = log_file
|
|
self.log_stream = log_stream
|
|
self.log_frmt = log_frmt
|
|
|
|
self.roots = roots
|
|
self.hash_algo = hash_algo
|
|
self.backend_arg = backend_arg
|
|
|
|
# Derived properties are directly based on other properties and are not
|
|
# included in equality checks.
|
|
self._derived_properties: t.Tuple[str, ...] = (
|
|
"iter_packages",
|
|
"package_root",
|
|
"backend",
|
|
)
|
|
# The first package directory is considered the root. This is used
|
|
# for uploads.
|
|
self.package_root = self.roots[0]
|
|
|
|
self.backend = self.get_backend(backend_arg)
|
|
|
|
@classmethod
|
|
def from_namespace(
|
|
cls: t.Type[TConf], namespace: argparse.Namespace
|
|
) -> TConf:
|
|
"""Construct a config from an argparse namespace."""
|
|
return cls(**cls.kwargs_from_namespace(namespace))
|
|
|
|
@staticmethod
|
|
def kwargs_from_namespace(
|
|
namespace: argparse.Namespace,
|
|
) -> t.Dict[str, t.Any]:
|
|
"""Convert a namespace into __init__ kwargs for this class."""
|
|
return dict(
|
|
verbosity=namespace.verbose,
|
|
log_file=namespace.log_file,
|
|
log_stream=namespace.log_stream,
|
|
log_frmt=namespace.log_frmt,
|
|
roots=namespace.package_directory,
|
|
hash_algo=namespace.hash_algo,
|
|
backend_arg=namespace.backend_arg,
|
|
)
|
|
|
|
@property
|
|
def log_level(self) -> int:
|
|
"""Return an appropriate log-level for the config's verbosity."""
|
|
levels = {
|
|
0: logging.WARNING,
|
|
1: logging.INFO,
|
|
2: logging.DEBUG,
|
|
}
|
|
# Return a log-level from warning through not set (log all messages).
|
|
# If we've specified 3 or more levels of verbosity, just return not set.
|
|
return levels.get(self.verbosity, logging.NOTSET)
|
|
|
|
def get_backend(self, arg: str) -> IBackend:
|
|
available_backends: t.Dict[str, BackendFactory] = {
|
|
"auto": get_file_backend,
|
|
"simple-dir": SimpleFileBackend,
|
|
"cached-dir": CachingFileBackend,
|
|
}
|
|
|
|
backend = available_backends[arg]
|
|
|
|
return BackendProxy(backend(self))
|
|
|
|
def with_updates(self: TConf, **kwargs: t.Any) -> TConf:
|
|
"""Create a new config with the specified updates.
|
|
|
|
The current config is used as a base. Any properties not specified in
|
|
keyword arguments will remain unchanged.
|
|
"""
|
|
return self.__class__(**{**dict(self), **kwargs})
|
|
|
|
def __repr__(self) -> str:
|
|
"""A string representation indicating the class and its properties."""
|
|
return "{}({})".format(
|
|
self.__class__.__name__,
|
|
", ".join(
|
|
f"{k}={v}"
|
|
for k, v in vars(self).items()
|
|
if not k.startswith("_")
|
|
),
|
|
)
|
|
|
|
def __eq__(self, other: t.Any) -> bool:
|
|
"""Configs are equal if their public values are equal."""
|
|
if not isinstance(other, self.__class__):
|
|
return False
|
|
return all(
|
|
getattr(other, k) == v
|
|
for k, v in self
|
|
if k not in self._derived_properties
|
|
)
|
|
|
|
def __iter__(self) -> t.Iterator[t.Tuple[str, t.Any]]:
|
|
"""Iterate over config (k, v) pairs."""
|
|
yield from (
|
|
(k, v)
|
|
for k, v in vars(self).items()
|
|
if not k.startswith("_") and k not in self._derived_properties
|
|
)
|
|
|
|
|
|
class RunConfig(_ConfigCommon):
|
|
"""A config for the Run command."""
|
|
|
|
def __init__(
|
|
self,
|
|
port: int,
|
|
host: str,
|
|
authenticate: t.List[str],
|
|
password_file: t.Optional[str],
|
|
disable_fallback: bool,
|
|
fallback_url: str,
|
|
health_endpoint: str,
|
|
server_method: str,
|
|
overwrite: bool,
|
|
welcome_msg: str,
|
|
cache_control: t.Optional[int],
|
|
log_req_frmt: str,
|
|
log_res_frmt: str,
|
|
log_err_frmt: str,
|
|
auther: t.Optional[t.Callable[[str, str], bool]] = None,
|
|
**kwargs: t.Any,
|
|
) -> None:
|
|
"""Construct a RuntimeConfig."""
|
|
super().__init__(**kwargs)
|
|
self.port = port
|
|
self.host = host
|
|
self.authenticate = authenticate
|
|
self.password_file = password_file
|
|
self.disable_fallback = disable_fallback
|
|
self.fallback_url = fallback_url
|
|
self.health_endpoint = health_endpoint
|
|
self.server_method = server_method
|
|
self.overwrite = overwrite
|
|
self.welcome_msg = welcome_msg
|
|
self.cache_control = cache_control
|
|
self.log_req_frmt = log_req_frmt
|
|
self.log_res_frmt = log_res_frmt
|
|
self.log_err_frmt = log_err_frmt
|
|
# Derived properties
|
|
self._derived_properties = self._derived_properties + ("auther",)
|
|
self.auther = self.get_auther(auther)
|
|
|
|
@classmethod
|
|
def kwargs_from_namespace(
|
|
cls, namespace: argparse.Namespace
|
|
) -> t.Dict[str, t.Any]:
|
|
"""Convert a namespace into __init__ kwargs for this class."""
|
|
return {
|
|
**super(RunConfig, cls).kwargs_from_namespace(namespace),
|
|
"port": namespace.port,
|
|
"host": namespace.host,
|
|
"authenticate": namespace.authenticate,
|
|
"password_file": namespace.passwords,
|
|
"disable_fallback": namespace.disable_fallback,
|
|
"fallback_url": namespace.fallback_url,
|
|
"health_endpoint": namespace.health_endpoint,
|
|
"server_method": namespace.server,
|
|
"overwrite": namespace.overwrite,
|
|
"welcome_msg": namespace.welcome,
|
|
"cache_control": namespace.cache_control,
|
|
"log_req_frmt": namespace.log_req_frmt,
|
|
"log_res_frmt": namespace.log_res_frmt,
|
|
"log_err_frmt": namespace.log_err_frmt,
|
|
}
|
|
|
|
def get_auther(
|
|
self, passed_auther: t.Optional[t.Callable[[str, str], bool]]
|
|
) -> t.Callable[[str, str], bool]:
|
|
"""Create or retrieve an authentication function."""
|
|
# The auther may be specified directly as a kwarg in the API interface
|
|
if passed_auther:
|
|
return passed_auther
|
|
# Otherwise we check to see if we need to authenticate
|
|
if self.password_file == "." or self.authenticate == []:
|
|
# It's illegal to specify no authentication without also specifying
|
|
# no password file, and vice-versa.
|
|
if self.password_file != "." or self.authenticate != []:
|
|
sys.exit(
|
|
"When auth-ops-list is empty (-a=.), password-file"
|
|
f" (-P={self.password_file!r}) must also be empty ('.')!"
|
|
)
|
|
# Return an auther that always returns true.
|
|
return lambda _uname, _pw: True
|
|
# Now, if there was no password file specified, we can return an auther
|
|
# that always returns False, since there is no way to authenticate.
|
|
if self.password_file is None:
|
|
return lambda _uname, _pw: False
|
|
# Finally, if a password file was specified, we'll load it up with
|
|
# Htpasswd and return a callable that checks it.
|
|
if HtpasswdFile is None:
|
|
sys.exit(
|
|
"apache.passlib library is not available. You must install "
|
|
"pypiserver with the optional 'passlib' dependency (`pip "
|
|
"install pypiserver['passlib']`) in order to use password "
|
|
"authentication"
|
|
)
|
|
|
|
loaded_pw_file = HtpasswdFile(self.password_file)
|
|
|
|
# Construct a local closure over the loaded PW file and return as our
|
|
# authentication function.
|
|
def auther(uname: str, pw: str) -> bool:
|
|
loaded_pw_file.load_if_changed()
|
|
return loaded_pw_file.check_password(uname, pw)
|
|
|
|
return auther
|
|
|
|
|
|
class UpdateConfig(_ConfigCommon):
|
|
"""A config for the Update command."""
|
|
|
|
def __init__(
|
|
self,
|
|
execute: bool,
|
|
download_directory: t.Optional[str],
|
|
allow_unstable: bool,
|
|
ignorelist: t.List[str],
|
|
**kwargs: t.Any,
|
|
) -> None:
|
|
"""Construct an UpdateConfig."""
|
|
super().__init__(**kwargs)
|
|
self.execute = execute
|
|
self.download_directory = download_directory
|
|
self.allow_unstable = allow_unstable
|
|
self.ignorelist = ignorelist
|
|
|
|
@classmethod
|
|
def kwargs_from_namespace(
|
|
cls, namespace: argparse.Namespace
|
|
) -> t.Dict[str, t.Any]:
|
|
"""Convert a namespace into __init__ kwargs for this class."""
|
|
return {
|
|
**super(UpdateConfig, cls).kwargs_from_namespace(namespace),
|
|
"execute": namespace.execute,
|
|
"download_directory": namespace.download_directory,
|
|
"allow_unstable": namespace.allow_unstable,
|
|
"ignorelist": namespace.ignorelist_file,
|
|
}
|
|
|
|
|
|
Configuration = t.Union[RunConfig, UpdateConfig]
|
|
|
|
|
|
class Config:
|
|
"""Config constructor for building a config from args."""
|
|
|
|
@classmethod
|
|
def default_with_overrides(cls, **overrides: t.Any) -> RunConfig:
|
|
"""Construct a RunConfig with default arguments, plus overrides.
|
|
|
|
Overrides must be valid arguments to the `__init__()` function
|
|
of `RunConfig`.
|
|
"""
|
|
default_config = cls.from_args(["run"])
|
|
assert isinstance(default_config, RunConfig)
|
|
return default_config.with_updates(**overrides)
|
|
|
|
@classmethod
|
|
def from_args(
|
|
cls, args: t.Optional[t.Sequence[str]] = None
|
|
) -> Configuration:
|
|
"""Construct a Config from the passed args or sys.argv."""
|
|
# If pulling args from sys.argv (commandline arguments), argv[0] will
|
|
# be the program name, (i.e. pypi-server), so we don't need to
|
|
# worry about it.
|
|
args = args if args is not None else sys.argv[1:]
|
|
parser = get_parser()
|
|
|
|
try:
|
|
with capture_stderr() as cap:
|
|
parsed = parser.parse_args(args)
|
|
# There's a special case we need to handle where no arguments
|
|
# whatsoever were provided. Because we need to introspect
|
|
# what subcommand is called, via the `add_subparsers(dest='cmd')`
|
|
# call, calling with no subparser is _not_ an error. We will
|
|
# treat it as such, so that we then trigger the legacy argument
|
|
# handling logic.
|
|
if parsed.cmd is None:
|
|
sys.exit(1)
|
|
except SystemExit as exc:
|
|
# A SystemExit is raised in some non-error cases, like
|
|
# printing the help or the version. Reraise in those cases.
|
|
cap.seek(0)
|
|
first_txt = cap.read()
|
|
if not exc.code or exc.code == 0:
|
|
# There usually won't have been any error text in these
|
|
# cases, but if there was, print it.
|
|
if first_txt:
|
|
print(first_txt, file=sys.stderr)
|
|
raise
|
|
# Otherwise, it's possible they're using the older, non-subcommand
|
|
# form of the CLI. In this case, attempt to parse with adjusted
|
|
# arguments. If the parse is successful, warn them about using
|
|
# deprecated arguments and continue. If this parse _also_ fails,
|
|
# show them the parsing error for their original arguments,
|
|
# not for the adjusted arguments.
|
|
try:
|
|
with capture_stderr() as cap:
|
|
parsed = parser.parse_args(cls._adjust_old_args(args))
|
|
print(
|
|
"WARNING: You are using deprecated arguments to pypiserver.\n\n"
|
|
"Please run `pypi-server --help` and update your command "
|
|
"to align with the current interface.\n\n"
|
|
"In most cases, this will be as simple as just using\n\n"
|
|
" pypi-server run [args]\n\n"
|
|
"instead of\n\n"
|
|
" pypi-server [args]\n",
|
|
file=sys.stderr,
|
|
)
|
|
except SystemExit:
|
|
cap.seek(0)
|
|
second_txt = cap.read()
|
|
if not exc.code or exc.code == 0:
|
|
# Again, usually nothing to stderr in these cases,
|
|
# but if there was, print it and re-raise.
|
|
if second_txt:
|
|
print(second_txt, file=sys.stderr)
|
|
raise
|
|
# Otherwise, we print the original error text instead of
|
|
# the error text from the call with our adjusted args,
|
|
# and then raise. Showing the original error text will
|
|
# provide a usage error for the new argument form, which
|
|
# should help folks to upgrade.
|
|
if first_txt:
|
|
print(first_txt, file=sys.stderr)
|
|
raise
|
|
|
|
if parsed.cmd == "run":
|
|
return RunConfig.from_namespace(parsed)
|
|
if parsed.cmd == "update":
|
|
return UpdateConfig.from_namespace(parsed)
|
|
raise SystemExit(parser.format_usage())
|
|
|
|
@staticmethod
|
|
def _adjust_old_args(args: t.Sequence[str]) -> t.List[str]:
|
|
"""Adjust args for backwards compatibility.
|
|
|
|
Should only be called once args have been verified to be unparsable.
|
|
"""
|
|
# Backwards compatibility hack: for most of pypiserver's life, "run"
|
|
# and "update" were not separate subcommands. The `-U` flag being
|
|
# present on the cmdline, regardless of other arguments, would lead
|
|
# to update behavior. In order to allow subcommands without
|
|
# breaking backwards compatibility, we need to insert "run" or
|
|
# "update" as a positional arg before any other arguments.
|
|
|
|
# We will be adding our subcommand as the first argument.
|
|
insertion_idx = 0
|
|
|
|
# Don't actually update the passed list, in case it was the global
|
|
# sys.argv.
|
|
updated_args = list(args)
|
|
|
|
# Find the update index. For "reasons", python's index search throws
|
|
# if the value isn't found, so manually wrap in the usual "return -1
|
|
# if not found" standard
|
|
try:
|
|
update_idx = updated_args.index("-U")
|
|
except ValueError:
|
|
update_idx = -1
|
|
|
|
if update_idx == -1:
|
|
# We were a "run" command.
|
|
updated_args.insert(insertion_idx, "run")
|
|
else:
|
|
# Remove the -U from the args and add the "update" command at the
|
|
# start of the arg list.
|
|
updated_args.pop(update_idx)
|
|
updated_args.insert(insertion_idx, "update")
|
|
|
|
return updated_args
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def capture_stderr() -> t.Iterator[t.IO]:
|
|
"""Capture stderr and yield as a buffer."""
|
|
orig = sys.stderr
|
|
cap = io.StringIO()
|
|
sys.stderr = cap
|
|
try:
|
|
yield cap
|
|
finally:
|
|
sys.stderr = orig
|
|
|
|
|
|
# Note: this is adapted from this StackOverflow answer:
|
|
# https://stackoverflow.com/a/35925919 -- the normal "raw" help
|
|
# text formatters provided with the argparse library don't do
|
|
# a great job of maintaining whitespace while still keeping
|
|
# subsequent lines properly intended.
|
|
class PreserveWhitespaceRawTextHelpFormatter(
|
|
argparse.RawDescriptionHelpFormatter
|
|
):
|
|
"""A help text formatter allowing more customization of newlines."""
|
|
|
|
def __add_whitespace(self, idx: int, iWSpace: int, text: str) -> str:
|
|
if idx == 0:
|
|
return text
|
|
return (" " * iWSpace) + text
|
|
|
|
def _split_lines(self, text: str, width: int) -> t.List[str]:
|
|
textRows = text.splitlines()
|
|
for idx, line in enumerate(textRows):
|
|
search = re.search(r"\s*[0-9\-]{0,}\.?\s*", line)
|
|
if line.strip() == "":
|
|
textRows[idx] = " "
|
|
elif search:
|
|
lWSpace = search.end()
|
|
lines = [
|
|
self.__add_whitespace(i, lWSpace, x)
|
|
for i, x in enumerate(textwrap.wrap(line, width))
|
|
]
|
|
textRows[idx] = lines # type: ignore
|
|
|
|
return [item for sublist in textRows for item in sublist]
|