Compare commits

...

31 Commits

Author SHA1 Message Date
Matthew Planchard
3e74a01e3f ensured pip installs plugin from correct path 2018-11-04 15:44:11 -06:00
Matthew Planchard
1b2bef415f Updated tests to work with passlib plugin 2018-10-30 21:12:20 -05:00
Matthew Planchard
8585ee4aa9 remove dead code 2018-10-30 20:53:57 -05:00
Matthew Planchard
4d3ea613e4 Moved itests to their own folder; using tmp pythonpath now instead of venvs for pkg installs 2018-10-30 20:52:51 -05:00
Matthew Planchard
32fc2b1f66 Allow passing positional args to pytest via tox 2018-10-30 19:14:38 -05:00
Matthew Planchard
138bf8e522 test updates 2018-08-06 17:14:26 -05:00
Matthew Planchard
8e670fd2dd passlib int. tests, fix config for passlib plugin name, version to 2.0.0 2018-08-05 09:55:20 -05:00
Matthew Planchard
438697fa37 Merge branch 'master' into _config_updates 2018-08-05 08:57:56 -05:00
Matthew Planchard
824c2dd24f Removed passlib to plugin, no-auth defaults, integration tests 2018-08-04 21:22:14 -05:00
Matthew Planchard
84158ab881 Working on integration tests 2018-08-04 12:24:19 -05:00
Matthew Planchard
42f024e8c2 config tests, pep8 for test_server 2018-07-27 20:47:42 -05:00
Matthew Planchard
93c9c67136 Travis doesn't have 3.7 yet :) 2018-07-27 19:12:44 -05:00
Matthew Planchard
820b0aac95 underscore in test_docker 2018-07-27 19:11:37 -05:00
Matthew Planchard
db749322bd Remove standalone code, add Python 3.7 to tests 2018-07-27 19:09:49 -05:00
Matthew Planchard
0742577a4d Only check pw file if defined 2018-07-26 21:17:32 -05:00
Matthew Planchard
af4037fff5 Added no-auth auth backend, fixed standalone test 2018-07-26 21:13:38 -05:00
Matthew Planchard
083cc03530 Test fixes, use of the htpasswd plugin, defaults
* Update `__main__.py` to differentiate new and old configs depending on
the value of `argv[0]`
* Update deprecation warning to always show for all pythons
* Added a `convert_legacy` function to
`plugins.authenticators.interface` to convert a legacy `auther` into a
new interface compliant class
* Added logic and deprecation warnings for handling most (all?) of the
old `auther` cases, plus handling `--auth-backend` from the new config
* Set default `--auth-bakend` to `htpasswd` if available
* Updated `paste` config to work with new config
* Added ability to set `--password-file` to `'.'`
2018-07-26 20:57:01 -05:00
Matthew Planchard
c10f31339d Ensured plugins are installed with package
* Used `find_packages()` in `setup.py` to ensure non-top-level packges
would also be installed
* Updated `.dockerignore` to include some other unnecessary items
* Updated `__main__.py` to not bomb when `update_packages` is not in the
`config` object (i.e. when using `pypiserver run`)
2018-07-26 19:26:56 -05:00
Matthew Planchard
5ea2a9eb5c Pulled out some configure() logic 2018-07-18 21:04:56 -05:00
Matthew Planchard
02f3d5af61 Load plugins in configure() 2018-07-18 20:59:26 -05:00
Matthew Planchard
26d35cd9e9 Renamed ConfigFactory -> Config 2018-07-18 19:36:52 -05:00
Matthew Planchard
915a64cc54 Fixed bug with slots declaration 2018-07-18 19:31:55 -05:00
Matthew Planchard
9efe756ce0 Added docker to introductory material 2018-07-18 19:31:37 -05:00
Matthew Planchard
8581bc24fa Working auth plugin 2018-07-17 20:41:50 -05:00
Matthew Planchard
068cb69c92 Forgot some changes in the last commit 2018-07-10 21:38:28 -05:00
Matthew Planchard
938985a901 Beginnings of general & auth plugin infrastructure 2018-07-10 21:18:10 -05:00
Matthew Planchard
0da6c03c72 Backwards compatibility, deprecation warnings 2018-07-09 21:21:21 -05:00
Matthew Planchard
0763077124 Standalone conf, Dockerfile updates, pub interface
* Fixed config for standalone package to handle the expected error in
`pkg_resources.resource_file()`
* Ensured the `pypiserver` user in the Dockerfile is part of the
`pypiserver` group
* Ensured `__updated__` is available in the public interface by moving
it back into `__init__.py`
* Added `const.py` for defining, you guessed it, constants
2018-07-05 19:27:39 -05:00
Matthew Planchard
38d51dfbce * minor formatting 2018-07-03 21:35:42 -05:00
Matthew Planchard
02b3802876 Replaced config, updated lots of things
* Full utilization of the new argparse-based config
* Removed everythong from `__init__.py`
* Moved all app definition into a factory function in `_app.py`
* Moved `app()` into `_app.py` and imported into `__init__.py`
* The former now just calls the factory function, no more hacks around
module imports
* Moved version stuff into `_version.py`
* Moved the paste factory into its own module
* Lots of PEP008 formatting
* Generally moved constants to top of files
* Simplified `exec` calls to get the version
* Added `ipdb` to dev requirements
2018-07-03 21:30:50 -05:00
Matthew Planchard
1b0ee1cd82 ArgumentParser done, working on tests 2018-06-28 21:09:23 -05:00
44 changed files with 3005 additions and 1297 deletions

@ -13,6 +13,9 @@
.~
.DS_Store
.ropeproject
.cache
.pytest_cache
.standalone
ID
__pycache__/
/build/

@ -1,21 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- ECLIPSE project file
Just copy this file long with `.pydevproject` in wour working-dir
but without the `.sample` suffix, and import it into eclipse/liclise.
-->
<projectDescription>
<name>Pypiserver</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.python.pydev.PyDevBuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.python.pydev.pythonNature</nature>
</natures>
</projectDescription>

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?eclipse-pydev version="1.0"?><pydev_project>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 3.0</pydev_property>
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
<path>/${PROJECT_DIR_NAME}</path>
</pydev_pathproperty>
</pydev_project>

@ -14,8 +14,7 @@ install:
- pip install -U setuptools pip sphinx tox tox-travis
script:
- ./bin/test-docker.sh
- ./bin/test_standalone.sh
- ./bin/test_docker.sh
- tox
- ./bin/check_readme.sh

@ -9,6 +9,62 @@ Changelog
- ENH: Improved Dockerfile and ``docker-compose`` example, docs for using
the docker image, automatic docker builds
New Features
~~~~~~~~~~~~
- Dockerfile, ``docker-compose`` example, and Docker Hub integration for
automatic builds.
- New ``config`` module with ``argparse``-based ``Config`` class.
Essential for further improvements in pluggability!
- New ``pypiserver`` command interface (as opposed to ``pypi-server``),
also essential for further improvements in pluggability.
Deprecations
~~~~~~~~~~~~
- The ``pypi-server`` command has been deprecated and will be removed in the
next major release.
- The ``pypiserver.app()`` interface now takes a ``config`` object as its
primary argument. Configuration keyword arguments are deprecated and the
capacity to specify them will be removed in the next major release. Use
``pypiserver.config.Config().from_kwargs()`` instead.
Backwards Incompatible Changes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
**Changed:**
- ``pypiserver.core.configure()`` no longer takes arbitrary keyword arguments.
It should now be passed a ``config`` namespace, such as one produced by
``pypiserver.config.ConfigFactyr().from_kwargs()``.
**Removed:**
- ``pypiserver.Config`` - use ``pypiserver.config.Config`` instead.
- ``pypiserver.default_config()`` - use
``pypiserver.config.Config().get_default()`` instead.
- ``pypiserver.DEFAULT_SERVER`` - use
``pypiserver.config.Config().get_default().server`` if this is for
some reason necessary
- ``pypiserver.__main__.usage()`` - use
``pyiserver.config.Config().get_parser().print_help()``
**Moved:**
- ``pypiserver.str2bool()`` -> ``pypiserver.config.str2bool()``
- ``pypiserver.paste_app_factory()`` ->
``pypiserver.paste.paste_app_factory()``
1.2.2 (2018-06-12)
------------------

@ -4,10 +4,9 @@ FROM python:3.6-alpine
COPY . /code
WORKDIR /code
RUN adduser -S -u 9898 pypiserver && \
addgroup -S -g 9898 pypiserver && \
python setup.py install && \
pip install passlib && \
RUN addgroup -S -g 9898 pypiserver && \
adduser -S -u 9898 -G pypiserver pypiserver && \
pip install .[passlib] && \
cd / && \
rm -rf /code && \
mkdir -p /data/packages && \
@ -21,5 +20,7 @@ USER pypiserver
WORKDIR /data
EXPOSE 8080
ENTRYPOINT ["pypi-server", "-p", "8080"]
CMD ["packages"]
ENV PYPISERVER_PORT=8080
ENTRYPOINT ["pypiserver"]
CMD ["run", "packages"]

@ -9,11 +9,12 @@ pypiserver - minimal PyPI server for use with pip/easy_install
==============================================================================
|pypi-ver| |travis-status| |dependencies| |python-ver| |proj-license|
:Version: 1.2.3
:Version: 2.0.0
:Date: 2018-08-04 11:49:30
:Source: https://github.com/pypiserver/pypiserver
:PyPI: https://pypi.org/project/pypiserver/
:Travis: https://travis-ci.org/pypiserver/pypiserver
:Docker: https://hub.docker.com/r/pypiserver/pypiserver/
:Maintainers: Kostis Anagnostopoulos <ankostis@gmail.com>,
Matthew Planchard <mplanchard@gmail.com>
:License: zlib/libpng + MIT

@ -1,41 +0,0 @@
#! /bin/sh
#
## Create an executable file and add it into `standalone` branch
#
# Invoke it directly on the commmit "tagged" for a release.
# Invoke it with any arg to avoid committing into `standalone` branch.
set -x
my_dir="$(dirname "$0")"
cd $my_dir/..
git_wdir="./bin/git-new-workdir"
git_wdir_url="https://raw.githubusercontent.com/git/git/master/contrib/workdir/git-new-workdir"
if [ ! -x "$git_wdir" ]; then
wget "$git_wdir_url" -O "$git_wdir"
chmod a+x "$git_wdir"
fi
## Ensure `standalone` branch exists (fails in travis).
git fetch pypiserver standalone:pypiserver/standalone -f && \
git branch --track standalone pypiserver/standalone
set -o errexit
gitversion=$(git describe --tags)
rm -rf .standalone
if nwd_dump=$( "$git_wdir" . .standalone standalone 2>&1 ); then
./bin/gen-standalone.sh
mkdir -p .standalone
cp -p pypi-server-standalone.py .standalone
cd .standalone
if [ $# -lt 1 ]; then
git add .
git commit -m "Add pypi-server-standalone $gitversion"
fi
else
echo "git-new-workdir: failed due to: $nwd_dump"
exit 1
fi

@ -1,45 +0,0 @@
#! /bin/sh
#
## Create an executable zip file.
# Invoked by `commit-standalone.sh`.
set -x
set -o errexit
exec_zip="./pypi-server-standalone.py"
my_dir="$(dirname "$0")"
cd $my_dir/..
rm -rf ./build/* ./dist/*
python setup.py bdist_wheel
wheel="./dist/pypiserver-*.whl"
## Modify `wheel` archive with `__main__.py` at root,
# add dependencies, and
# prepend it with a python-flashbang + some header-comments >= 10-lines
# so that ``head pypiserver*.py``behaves politely.
#
sudo update-ca-certificates || echo "Failed updating certs (run on travis container?)"
unzip -jo $wheel pypiserver/__main__.py -d ./dist
zip -d $wheel pypiserver/__main__.py
zip -mj $wheel ./dist/__main__.py
wget https://files.pythonhosted.org/packages/2d/a7/1a3363e5d531d438267a79d43d4b8d224655adef489e98fc96678fe16ed1/passlib-1.6.5-py2.py3-none-any.whl \
-O ./dist/passlib-1.6.5-py2.py3-none-any.whl
zip -mj $wheel ./dist/passlib-*.whl
gitversion=$(git describe --tags)
cat - $wheel > "$exec_zip" << EOF
#!/usr/bin/env python
##
## Standalone pypiserver-$gitversion $(date -R)
##
## Execute it like that:
## $exec_zip <packages_dir>
## To get more help, type:
## $exec_zip --help
##
## BINARY CONTENT FOLLOWS
EOF
chmod a+xr "$exec_zip"

@ -1,18 +0,0 @@
#! /bin/sh
## Test standalone generation & execution.
##
set -x
my_dir="$(dirname "$0")"
cd $my_dir/..
git fetch origin standalone:origin/standalone
git branch --track standalone origin/standalone
./bin/commit-standalone.sh no_commit
./pypi-server-standalone.py . &
server_pid=$!
sleep 2
kill $server_pid && echo "Server killed nicely." # Kill fails if server failed.

6
itests/__init__.py Normal file

@ -0,0 +1,6 @@
"""Integration tests for pypiserver.
It is assumed that pypiserver has been installed with ``pip install -e``
or ``setup.py develop`` or otherwise properly installed prior to these
tests being run.
"""

Binary file not shown.

14
itests/simple_pkg.py Normal file

@ -0,0 +1,14 @@
"""A simple package to use to generate a wheel.
This is not called by tests directly, but is the source of the
simple_pkg wheel file used in integration tests.
"""
from setuptools import setup
setup(
name='simple_pkg',
description='A simple package',
version='0.0.0',
options={'bdist_wheel': {'universal': True}}
)

289
itests/test_pypiserver.py Normal file

@ -0,0 +1,289 @@
"""Integration tests for the new pypiserver command."""
from __future__ import (
absolute_import, division, print_function, unicode_literals
)
import os
from os import chdir, getcwd, environ, listdir, path, remove
import sys
from contextlib import contextmanager
from shutil import copy2, rmtree
from subprocess import PIPE, Popen
from tempfile import mkdtemp, mkstemp
if sys.version_info < (3,):
from itertools import ifilter as filter # noqa pylint: disable=no-name-in-module,redefined-builtin
import pytest
from passlib.apache import HtpasswdFile
THIS_DIR = path.abspath(path.dirname(__file__))
ROOT_DIR = path.abspath(path.join(THIS_DIR, '../..'))
SIMPLE_PKG = 'simple_pkg-0.0.0-py2.py3-none-any.whl'
SIMPLE_PKG_PATH = path.join(THIS_DIR, 'files', SIMPLE_PKG)
SIMPLE_DEV_PKG = 'simple_pkg-0.0.0.dev0-py2.py3-none-any.whl'
SIMPLE_DEV_PKG_PATH = path.join(THIS_DIR, 'files', SIMPLE_DEV_PKG)
@contextmanager
def changedir(target):
"""Change to target and then change back."""
start = getcwd()
chdir(target)
yield
chdir(start)
def bin_target(target):
"""Get a binary target relative to the current python executable."""
return path.abspath(path.join(sys.executable, '..', target))
def pypiserver_cmd(root, *args):
"""Yield a command to run pypiserver.
:param str exc: the path to the python executable to use in
running pypiserver.
:param args: extra arguments for ``pypiserver run``
"""
yield bin_target('pypiserver')
yield 'run'
yield root
for arg in args:
yield arg
def pip_cmd(*args):
"""Yield a command to run pip.
:param args: extra arguments for ``pip``
"""
yield bin_target('pip')
for arg in args:
yield arg
if (any(i in args for i in ('install', 'download', 'search'))
and '-i' not in args):
yield '-i'
yield 'http://localhost:8080'
def twine_cmd(*args):
"""Yield a command to run twine.
:param args: arguments for `twine`
"""
yield bin_target('twine')
for arg in args:
yield arg
for part in ('--repository-url', 'http://localhost:8080'):
yield part
if '-u' not in args:
for part in ('-u', 'username'):
yield part
if '-p' not in args:
for part in ('-p', 'password'):
yield part
def run(args, raise_on_err=True, capture=False, **kwargs):
"""Straightforward implementation to run subprocesses.
:param args: command args to pass to Popen
:param kwargs: extra kwargs to pass to Popen
"""
pipe = PIPE if capture else None
proc = Popen(args, stdout=pipe, stderr=pipe, **kwargs)
out, err = proc.communicate()
if raise_on_err and proc.returncode:
raise RuntimeError((proc.returncode, out, err))
if capture:
return out.decode('utf-8')
return proc.returncode
@contextmanager
def add_to_path(target):
"""Adjust the PATH to add the target at the front."""
start = environ['PATH']
environ['PATH'] = '{}:{}'.format(target, start)
yield
environ['PATH'] = start
@contextmanager
def add_to_pythonpath(target):
"""Adjust the PYTHONPATH to add the target at the front."""
start = environ.get('PYTHONPATH', '')
environ['PYTHONPATH'] = '{}:{}'.format(target, start)
yield
environ['PYTHONPATH'] = start
@pytest.fixture()
def site_packages():
"""Return a temporary directory to use as an additional packages dir."""
spdir = mkdtemp()
yield spdir
rmtree(spdir)
@pytest.fixture(scope='class')
def extra_pythonpath():
"""Return a temporary directory added to the front of the pythonpath."""
ppath = mkdtemp()
with add_to_pythonpath(ppath):
yield ppath
rmtree(ppath)
@pytest.fixture(scope='session')
def download_passlib():
"""Download passlib into a temporary directory."""
passlib_dir = mkdtemp()
with changedir(passlib_dir):
run((
bin_target('pip'),
'download',
'--no-deps',
'git+git://github.com/pypiserver/pypiserver-passlib',
))
passlib_file = next(filter(
lambda x: x.endswith('.zip'),
os.listdir(passlib_dir)
))
yield path.abspath(path.join(passlib_dir, passlib_file))
rmtree(passlib_dir)
@pytest.fixture(scope='class')
def install_passlib(download_passlib, extra_pythonpath):
"""Install passlib into the extra pythonpath."""
run((
bin_target('pip'),
'install',
'--no-deps',
download_passlib,
))
@pytest.fixture(scope='class', autouse=True)
def pkg_root():
"""Run pypiserver with no auth."""
pkg_root = mkdtemp()
yield pkg_root
rmtree(pkg_root)
@pytest.fixture()
def clean_pkg_root(pkg_root):
"""Ensure the pkg root is cleaned after each test."""
starts = set(listdir(pkg_root))
yield
for filename in set(listdir(pkg_root)).difference(starts):
remove(path.join(pkg_root, filename))
@pytest.fixture()
def simple_pkg(pkg_root):
"""Add the simple package to the repo."""
copy2(SIMPLE_PKG_PATH, pkg_root)
yield
remove(path.join(pkg_root, SIMPLE_PKG))
@pytest.fixture()
def simple_dev_pkg(pkg_root):
"""Add the simple dev package to the repo."""
copy2(SIMPLE_DEV_PKG_PATH, pkg_root)
yield
remove(path.join(pkg_root, SIMPLE_DEV_PKG))
class TestNoAuth(object):
"""Tests for running pypiserver with no authentication."""
@pytest.fixture(scope='class', autouse=True)
def runserver(self, pkg_root):
"""Run pypiserver with no auth."""
proc = Popen(pypiserver_cmd(pkg_root), env=environ)
yield proc
proc.kill()
@pytest.mark.usefixtures('simple_pkg')
def test_install(self):
"""Test pulling a package with pip from the repo."""
run(pip_cmd('install', 'simple_pkg'))
assert 'simple-pkg' in run(pip_cmd('freeze'), capture=True)
def test_upload(self, pkg_root):
"""Test putting a package into the repo."""
assert SIMPLE_PKG not in listdir(pkg_root)
run(twine_cmd('upload', SIMPLE_PKG_PATH))
assert SIMPLE_PKG in listdir(pkg_root)
@pytest.mark.usefixtures('simple_pkg')
def test_search(self):
"""Test results of pip search."""
out = run(pip_cmd('search', 'simple_pkg'), capture=True)
assert 'simple_pkg' in out
class TestPasslibAuth(object):
"""Test the passlib auth plugin.
Normally we would leave plugin integration testing to be performed
as part of the plugin's test suite. However, because of the
historical bundling of passlib functionality with pypiserver
via the ``[passlib]`` extras requirement, we need to check to be
sure we provide the same functionality here.
The point here isn't so much to test every possible variation of
the ``--authenticate`` option, but to ensure that, when the
``pypiserver-passlib`` plugin is installed, the defaults work the
same way as they did previously (authenticating uploads).
"""
USER = 'pypiserver'
PASS = 'password'
@pytest.fixture(scope='class')
def htpasswd_file(self):
"""Create an HtpasswdFile with a user/pass saved."""
fp, passfile = mkstemp()
os.close(fp)
htpass = HtpasswdFile(passfile)
htpass.set_password(self.USER, self.PASS)
htpass.save()
yield passfile
remove(passfile)
@pytest.fixture(scope='class', autouse=True)
def runserver(self, pkg_root, install_passlib, htpasswd_file):
"""Run with default auth when pypiserver-passlib is installed."""
proc = Popen(
pypiserver_cmd(pkg_root, '-P', htpasswd_file)
)
yield proc
proc.kill()
@pytest.mark.usefixtures('clean_pkg_root')
def test_upload(self, pkg_root):
"""Test putting a package into the repo."""
assert SIMPLE_PKG not in listdir(pkg_root)
run(twine_cmd(
'upload', '-u', self.USER, '-p', self.PASS, SIMPLE_PKG_PATH
))
assert SIMPLE_PKG in listdir(pkg_root)
def test_upload_fail(self, pkg_root):
"""Test putting a package into the repo with bad creds."""
assert SIMPLE_PKG not in listdir(pkg_root)
proc = Popen(
twine_cmd('upload', SIMPLE_PKG_PATH), stdout=PIPE, stderr=PIPE
)
out, err = map(lambda s: s.decode('utf-8'), proc.communicate())
assert '403 Client Error' in err
assert SIMPLE_PKG not in listdir(pkg_root)

@ -27,9 +27,9 @@ from textwrap import dedent
try:
from urllib.request import urlopen
except ImportError:
from urllib import urlopen
from urllib import urlopen # type: ignore
from py import path # @UnresolvedImport
from py import path # @UnresolvedImport pylint: disable=no-name-in-module
import pytest
@ -37,12 +37,14 @@ _BUFF_SIZE = 2**16
_port = 8090
SLEEP_AFTER_SRV = 3 # sec
@pytest.fixture
def port():
global _port
_port += 1
return _port
Srv = namedtuple('Srv', ('proc', 'port', 'package'))
@ -54,8 +56,9 @@ def _run_server(packdir, port, authed, other_cli=''):
'partial': "-Ptests/htpasswd.a.a -a update",
}
pswd_opts = pswd_opt_choices[authed]
cmd = "%s -m pypiserver.__main__ -vvv --overwrite -p %s %s %s %s" % (
sys.executable, port, pswd_opts, other_cli, packdir)
cmd = "pypi-server -vvv --overwrite -p %s %s %s %s" % (
port, pswd_opts, other_cli, packdir
)
proc = subprocess.Popen(cmd.split(), bufsize=_BUFF_SIZE)
time.sleep(SLEEP_AFTER_SRV)
assert proc.poll() is None
@ -188,26 +191,26 @@ def pipdir(tmpdir):
return tmpdir.mkdir("pip")
def test_pipInstall_packageNotFound(empty_packdir, port, pipdir, package):
with new_server(empty_packdir, port) as srv:
def test_pip_install_package_not_found(empty_packdir, port, pipdir, package):
with new_server(empty_packdir, port):
cmd = "centodeps"
assert _run_pip_install(cmd, port, pipdir) != 0
assert not pipdir.listdir()
def test_pipInstall_openOk(open_server, package, pipdir):
def test_pip_install_open_ok(open_server, package, pipdir):
cmd = "centodeps"
assert _run_pip_install(cmd, open_server.port, pipdir) == 0
assert pipdir.join(package.basename).check()
def test_pipInstall_authedFails(protected_server, pipdir):
def test_pip_install_authed_fails(protected_server, pipdir):
cmd = "centodeps"
assert _run_pip_install(cmd, protected_server.port, pipdir) != 0
assert not pipdir.listdir()
def test_pipInstall_authedOk(protected_server, package, pipdir):
def test_pip_install_authed_ok(protected_server, package, pipdir):
cmd = "centodeps"
assert _run_pip_install(cmd, protected_server.port, pipdir,
user='a', pswd='a') == 0
@ -243,18 +246,18 @@ def pypirc_file(txt):
@pytest.mark.parametrize("pkg_frmt", ['bdist', 'bdist_wheel'])
def test_setuptoolsUpload_open(empty_packdir, port, project, package,
pkg_frmt):
def test_setuptools_upload_open(empty_packdir, port, project, package,
pkg_frmt):
url = _build_url(port, None, None)
with pypirc_file(dedent("""\
[distutils]
index-servers: test
[distutils]
index-servers: test
[test]
repository: %s
username: ''
password: ''
""" % url)):
[test]
repository: %s
username: ''
password: ''
""" % url)):
with new_server(empty_packdir, port):
with chdir(project.strpath):
cmd = "setup.py -vvv %s upload -r %s" % (pkg_frmt, url)
@ -266,21 +269,23 @@ def test_setuptoolsUpload_open(empty_packdir, port, project, package,
@pytest.mark.parametrize("pkg_frmt", ['bdist', 'bdist_wheel'])
def test_setuptoolsUpload_authed(empty_packdir, port, project, package,
pkg_frmt, monkeypatch):
def test_setuptools_upload_authed(empty_packdir, port, project, package,
pkg_frmt, monkeypatch):
url = _build_url(port)
with pypirc_file(dedent("""\
[distutils]
index-servers: test
[distutils]
index-servers: test
[test]
repository: %s
username: a
password: a
""" % url)):
[test]
repository: %s
username: a
password: a
""" % url)):
with new_server(empty_packdir, port, authed=True):
with chdir(project.strpath):
cmd = "setup.py -vvv %s register -r test upload -r test" % pkg_frmt
cmd = "setup.py -vvv %s register -r test upload -r test" % (
pkg_frmt
)
for i in range(5):
print('++Attempt #%s' % i)
assert _run_python(cmd) == 0
@ -294,14 +299,14 @@ def test_setuptools_upload_partial_authed(empty_packdir, port, project,
"""Test uploading a package with setuptools with partial auth."""
url = _build_url(port)
with pypirc_file(dedent("""\
[distutils]
index-servers: test
[distutils]
index-servers: test
[test]
repository: %s
username: a
password: a
""" % url)):
[test]
repository: %s
username: a
password: a
""" % url)):
with new_server(empty_packdir, port, authed='partial'):
with chdir(project.strpath):
cmd = ("setup.py -vvv %s register -r test upload -r test" %
@ -356,7 +361,7 @@ def registerer(pypirc, monkeypatch):
@pytest.mark.skipif(sys.version_info[:2] == (3, 2),
reason="urllib3 fails on twine (see https://travis-ci"
".org/ankostis/pypiserver/builds/81044993)")
def test_twineUpload_open(empty_packdir, port, package, uploader, pypirc):
def test_twine_upload_open(empty_packdir, port, package, uploader, pypirc):
"""Test twine upload with no authentication"""
user, pswd = 'foo', 'bar'
update_pypirc(pypirc, port, user=user, pswd=pswd)
@ -374,7 +379,7 @@ def test_twineUpload_open(empty_packdir, port, package, uploader, pypirc):
@pytest.mark.skipif(sys.version_info[:2] == (3, 2),
reason="urllib3 fails on twine (see https://travis-ci"
".org/ankostis/pypiserver/builds/81044993)")
def test_twineUpload_authed(empty_packdir, port, package, uploader, pypirc):
def test_twine_upload_authed(empty_packdir, port, package, uploader, pypirc):
"""Test authenticated twine upload"""
user, pswd = 'a', 'a'
update_pypirc(pypirc, port, user=user, pswd=pswd)
@ -414,7 +419,7 @@ def test_twine_upload_partial_authed(empty_packdir, port, package, uploader,
@pytest.mark.skipif(sys.version_info[:2] == (3, 2),
reason="urllib3 fails on twine (see https://travis-ci"
".org/ankostis/pypiserver/builds/81044993)")
def test_twineRegister_open(open_server, package, registerer, pypirc):
def test_twine_register_open(open_server, package, registerer, pypirc):
"""Test unauthenticated twine registration"""
srv = open_server
update_pypirc(pypirc, srv.port)
@ -427,7 +432,8 @@ def test_twineRegister_open(open_server, package, registerer, pypirc):
@pytest.mark.skipif(sys.version_info[:2] == (3, 2),
reason="urllib3 fails on twine (see https://travis-ci"
".org/ankostis/pypiserver/builds/81044993)")
def test_twineRegister_authedOk(protected_server, package, registerer, pypirc):
def test_twine_register_authed_ok(protected_server, package, registerer,
pypirc):
"""Test authenticated twine registration"""
srv = protected_server
user, pswd = 'a', 'a'

@ -1,210 +1,11 @@
import os
import re as _re
import sys
"""Simple PyPI-compliant package server."""
version = __version__ = "1.2.3"
__version_info__ = tuple(_re.split('[.-]', __version__))
__updated__ = "2018-08-04 12:31:43"
__title__ = "pypiserver"
__summary__ = "A minimal PyPI server for use with pip/easy_install."
__uri__ = "https://github.com/pypiserver/pypiserver"
__updated__ = "2018-06-12 20:15:10"
class Configuration(object):
"""
.. see:: config-options: :func:`pypiserver.configure()`
"""
def __init__(self, **kwds):
vars(self).update(kwds)
def __repr__(self, *args, **kwargs):
return 'Configuration(**%s)' % vars(self)
def __str__(self, *args, **kwargs):
return 'Configuration:\n%s' % '\n'.join('%20s = %s' % (k, v)
for k, v in sorted(vars(self).items()))
def update(self, props):
d = props if isinstance(props, dict) else vars(props)
vars(self).update(d)
DEFAULT_SERVER = "auto"
def default_config(
root=None,
host="0.0.0.0",
port=8080,
server=DEFAULT_SERVER,
redirect_to_fallback=True,
fallback_url=None,
authenticated=['update'],
password_file=None,
overwrite=False,
hash_algo='md5',
verbosity=1,
log_file=None,
log_frmt="%(asctime)s|%(name)s|%(levelname)s|%(thread)d|%(message)s",
log_req_frmt="%(bottle.request)s",
log_res_frmt="%(status)s",
log_err_frmt="%(body)s: %(exception)s \n%(traceback)s",
welcome_file=None,
cache_control=None,
auther=None,
VERSION=__version__):
"""
Fetch default-opts with overridden kwds, capable of starting-up pypiserver.
Does not validate overridden options.
Example usage::
kwds = pypiserver.default_config(<override_kwds> ...)
## More modifications on kwds.
pypiserver.app(**kwds)``.
Kwds correspond to same-named cmd-line opts, with '-' --> '_' substitution.
Non standard args are described below:
:param return_defaults_only:
When `True`, returns defaults, otherwise,
configures "runtime" attributes and returns also the "packages"
found in the roots.
:param root:
A list of paths, derived from the packages specified on cmd-line.
If `None`, defaults to '~/packages'.
:param redirect_to_fallback:
see :option:`--disable-fallback`
:param authenticated:
see :option:`--authenticate`
:param password_file:
see :option:`--passwords`
:param log_file:
see :option:`--log-file`
Not used, passed here for logging it.
:param log_frmt:
see :option:`--log-frmt`
Not used, passed here for logging it.
:param callable auther:
An API-only options that if it evaluates to a callable,
it is invoked to allow access to protected operations
(instead of htpaswd mechanism) like that::
auther(username, password): bool
When defined, `password_file` is ignored.
:param host:
see :option:`--interface`
Not used, passed here for logging it.
:param port:
see :option:`--port`
Not used, passed here for logging it.
:param server:
see :option:`--server`
Not used, passed here for logging it.
:param verbosity:
see :option:`-v`
Not used, passed here for logging it.
:param VERSION:
Not used, passed here for logging it.
:return: a dict of defaults
"""
return locals()
def app(**kwds):
"""
:param dict kwds: Any overrides for defaults, as fetched by
:func:`default_config()`. Check the docstring of this function
for supported kwds.
"""
from . import core
_app = __import__("_app", globals(), locals(), ["."], 1)
sys.modules.pop('pypiserver._app', None)
kwds = default_config(**kwds)
config, packages = core.configure(**kwds)
_app.config = config
_app.packages = packages
_app.app.module = _app # HACK for testing.
return _app.app
def str2bool(s, default):
if s is not None and s != '':
return s.lower() not in ("no", "off", "0", "false")
return default
def _str_strip(string):
"""Provide a generic strip method to pass as a callback."""
return string.strip()
def paste_app_factory(global_config, **local_conf):
"""Parse a paste config and return an app."""
def upd_conf_with_bool_item(conf, attr, sdict):
conf[attr] = str2bool(sdict.pop(attr, None), conf[attr])
def upd_conf_with_str_item(conf, attr, sdict):
value = sdict.pop(attr, None)
if value is not None:
conf[attr] = value
def upd_conf_with_int_item(conf, attr, sdict):
value = sdict.pop(attr, None)
if value is not None:
conf[attr] = int(value)
def upd_conf_with_list_item(conf, attr, sdict, sep=' ', parse=_str_strip):
values = sdict.pop(attr, None)
if values:
conf[attr] = list(filter(None, map(parse, values.split(sep))))
def _make_root(root):
root = root.strip()
if root.startswith("~"):
return os.path.expanduser(root)
return root
c = default_config()
upd_conf_with_bool_item(c, 'overwrite', local_conf)
upd_conf_with_bool_item(c, 'redirect_to_fallback', local_conf)
upd_conf_with_list_item(c, 'authenticated', local_conf, sep=' ')
upd_conf_with_list_item(c, 'root', local_conf, sep='\n', parse=_make_root)
upd_conf_with_int_item(c, 'verbosity', local_conf)
str_items = [
'fallback_url',
'hash_algo',
'log_err_frmt',
'log_file',
'log_frmt',
'log_req_frmt',
'log_res_frmt',
'password_file',
'welcome_file'
]
for str_item in str_items:
upd_conf_with_str_item(c, str_item, local_conf)
# cache_control is undocumented; don't know what type is expected:
# upd_conf_with_str_item(c, 'cache_control', local_conf)
return app(**c)
def _logwrite(logger, level, msg):
if msg:
line_endings = ['\r\n', '\n\r', '\n']
for le in line_endings:
if msg.endswith(le):
msg = msg[:-len(le)]
if msg:
logger.log(level, msg)
# Interface
from ._app import app # noqa
from ._version import __version__, __version_info__, version # noqa

@ -6,20 +6,27 @@
"""
from __future__ import print_function
import getopt
import logging
import os
import re
import sys
import textwrap
import functools as ft
import logging
import sys
import warnings
from os import path
import pypiserver
from pypiserver import bottle
from pypiserver.config import Config
log = logging.getLogger('pypiserver.main')
def init_logging(level=None, frmt=None, filename=None):
"""Initialize the logging system.
:param int level: log level
:param str frmt: log formatting string
:param str filename: a filename to which to log
"""
logging.basicConfig(level=level, format=frmt)
rlog = logging.getLogger()
rlog.setLevel(level)
@ -27,271 +34,95 @@ def init_logging(level=None, frmt=None, filename=None):
rlog.addHandler(logging.FileHandler(filename))
def usage():
return textwrap.dedent("""\
pypi-server [OPTIONS] [PACKAGES_DIRECTORY...]
start PyPI compatible package server serving packages from
PACKAGES_DIRECTORY. If PACKAGES_DIRECTORY is not given on the
command line, it uses the default ~/packages. pypiserver scans this
directory recursively for packages. It skips packages and
directories starting with a dot. Multiple package directories can be
specified.
pypi-server understands the following options:
-p, --port PORT
listen on port PORT (default: 8080)
-i, --interface INTERFACE
listen on interface INTERFACE (default: 0.0.0.0, any interface)
-a, --authenticate (UPDATE|download|list), ...
comma-separated list of (case-insensitive) actions to authenticate
Use '.' or '' for empty. Requires to have set the password (-P option).
For example to password-protect package downloads (in addition to uploads)
while leaving listings public, give:
-P foo/htpasswd.txt -a update,download
To drop all authentications, use:
-P . -a .
Note that when uploads are not protected, the `register` command
is not necessary, but `~/.pypirc` still need username and password fields,
even if bogus.
By default, only 'update' is password-protected.
-P, --passwords PASSWORD_FILE
use apache htpasswd file PASSWORD_FILE to set usernames & passwords when
authenticating certain actions (see -a option).
If you want to allow un-authorized access, set this option and -a
explicitly to empty (either '.' or'').
--disable-fallback
disable redirect to real PyPI index for packages not found in the
local index
--fallback-url FALLBACK_URL
for packages not found in the local index, this URL will be used to
redirect to (default: https://pypi.python.org/simple)
--server METHOD
use METHOD to run the server. Valid values include paste,
cherrypy, twisted, gunicorn, gevent, wsgiref, auto. The
default is to use "auto" which chooses one of paste, cherrypy,
twisted or wsgiref.
-r, --root PACKAGES_DIRECTORY
[deprecated] serve packages from PACKAGES_DIRECTORY
-o, --overwrite
allow overwriting existing package files
--hash-algo ALGO
any `hashlib` available algo used as fragments on package links.
Set one of (0, no, off, false) to disabled it. (default: md5)
--welcome HTML_FILE
uses the ASCII contents of HTML_FILE as welcome message response.
-v
enable verbose logging; repeat for more verbosity.
--log-file <FILE>
write logging info into this FILE.
--log-frmt <FILE>
the logging format-string. (see `logging.LogRecord` class from standard python library)
[Default: %(asctime)s|%(name)s|%(levelname)s|%(thread)d|%(message)s]
--log-req-frmt FORMAT
a format-string selecting Http-Request properties to log; set to '%s' to see them all.
[Default: %(bottle.request)s]
--log-res-frmt FORMAT
a format-string selecting Http-Response properties to log; set to '%s' to see them all.
[Default: %(status)s]
--log-err-frmt FORMAT
a format-string selecting Http-Error properties to log; set to '%s' to see them all.
[Default: %(body)s: %(exception)s \n%(traceback)s]
--cache-control AGE
Add "Cache-Control: max-age=AGE, public" header to package downloads.
Pip 6+ needs this for caching.
def _logwrite(logger, level, msg):
"""Cut newlines off the end of log messages."""
if msg:
line_endings = ['\r\n', '\n\r', '\n']
for le in line_endings:
if msg.endswith(le):
msg = msg[:-len(le)]
if msg:
logger.log(level, msg)
pypi-server -h
pypi-server --help
show this help message
pypi-server --version
show pypi-server's version
pypi-server -U [OPTIONS] [PACKAGES_DIRECTORY...]
update packages in PACKAGES_DIRECTORY. This command searches
pypi.org for updates and shows a pip command line which
updates the package.
The following additional options can be specified with -U:
-x
execute the pip commands instead of only showing them
-d DOWNLOAD_DIRECTORY
download package updates to this directory. The default is to use
the directory which contains the latest version of the package to
be updated.
-u
allow updating to unstable version (alpha, beta, rc, dev versions)
Visit https://pypi.org/project/pypiserver/ for more information.
""")
def _update(config):
"""Output an update command or update packages."""
from pypiserver.manage import update_all_packages
update_all_packages(
config.roots,
config.download_directory,
dry_run=not(config.execute),
stable_only=not(config.unstable)
)
def main(argv=None):
import pypiserver
def _run_app_from_config(config):
"""Run a bottle application for the given config."""
init_logging(
level=config.verbosity, filename=config.log_file, frmt=config.log_frmt
)
if argv is None:
argv = sys.argv
# Handle new config format
if getattr(config, 'command', '') == 'update':
return _update(config)
# Handle deprecated config format
elif getattr(config, 'update_packages', False):
return _update(config)
command = "serve"
# TODO: move this to the auth plugin.
if hasattr(config, 'password_file'):
if (not config.authenticate and config.password_file != '.' or
config.authenticate and config.password_file == '.'):
auth_err = (
"When auth-ops-list is empty (-a=.), password-file (-P=%r) "
"must also be empty ('.')!"
)
sys.exit(auth_err % config.password_file)
c = pypiserver.Configuration(**pypiserver.default_config())
update_dry_run = True
update_directory = None
update_stable_only = True
try:
opts, roots = getopt.getopt(argv[1:], "i:p:a:r:d:P:Uuvxoh", [
"interface=",
"passwords=",
"authenticate=",
"port=",
"root=",
"server=",
"fallback-url=",
"disable-fallback",
"overwrite",
"hash-algo=",
"log-file=",
"log-frmt=",
"log-req-frmt=",
"log-res-frmt=",
"log-err-frmt=",
"welcome=",
"cache-control=",
"version",
"help"
])
except getopt.GetoptError:
err = sys.exc_info()[1]
sys.exit("usage error: %s" % (err,))
for k, v in opts:
if k in ("-p", "--port"):
try:
c.port = int(v)
except Exception:
err = sys.exc_info()[1]
sys.exit("Invalid port(%r) due to: %s" % (v, err))
elif k in ("-a", "--authenticate"):
c.authenticated = [a.lower()
for a in re.split("[, ]+", v.strip(" ,"))
if a]
if c.authenticated == ['.']:
c.authenticated = []
else:
actions = ("list", "download", "update")
for a in c.authenticated:
if a not in actions:
errmsg = "Action '%s' for option `%s` not one of %s!"
sys.exit(errmsg % (a, k, actions))
elif k in ("-i", "--interface"):
c.host = v
elif k in ("-r", "--root"):
roots.append(v)
elif k == "--disable-fallback":
c.redirect_to_fallback = False
elif k == "--fallback-url":
c.fallback_url = v
elif k == "--server":
c.server = v
elif k == "--welcome":
c.welcome_file = v
elif k == "--version":
print("pypiserver %s\n" % pypiserver.__version__)
return
elif k == "-U":
command = "update"
elif k == "-x":
update_dry_run = False
elif k == "-u":
update_stable_only = False
elif k == "-d":
update_directory = v
elif k in ("-P", "--passwords"):
c.password_file = v
elif k in ("-o", "--overwrite"):
c.overwrite = True
elif k in "--hash-algo":
c.hash_algo = None if not pypiserver.str2bool(v, c.hash_algo) else v
elif k == "--log-file":
c.log_file = v
elif k == "--log-frmt":
c.log_frmt = v
elif k == "--log-req-frmt":
c.log_req_frmt = v
elif k == "--log-res-frmt":
c.log_res_frmt = v
elif k == "--log-err-frmt":
c.log_err_frmt = v
elif k == "--cache-control":
c.cache_control = v
elif k == "-v":
c.verbosity += 1
elif k in ("-h", "--help"):
print(usage())
sys.exit(0)
if (not c.authenticated and c.password_file != '.' or
c.authenticated and c.password_file == '.'):
auth_err = "When auth-ops-list is empty (-a=.), password-file (-P=%r) must also be empty ('.')!"
sys.exit(auth_err % c.password_file)
if len(roots) == 0:
roots.append(os.path.expanduser("~/packages"))
roots=[os.path.abspath(x) for x in roots]
c.root = roots
verbose_levels=[
logging.WARNING, logging.INFO, logging.DEBUG, logging.NOTSET]
log_level=list(zip(verbose_levels, range(c.verbosity)))[-1][0]
init_logging(level=log_level, filename=c.log_file, frmt=c.log_frmt)
if command == "update":
from pypiserver.manage import update_all_packages
update_all_packages(roots, update_directory,
dry_run=update_dry_run, stable_only=update_stable_only)
return
# Fixes #49:
# The gevent server adapter needs to patch some
# modules BEFORE importing bottle!
if c.server and c.server.startswith('gevent'):
if config.server and config.server.startswith('gevent'):
import gevent.monkey # @UnresolvedImport
gevent.monkey.patch_all()
from pypiserver import bottle
if c.server not in bottle.server_names:
sys.exit("unknown server %r. choose one of %s" % (
c.server, ", ".join(bottle.server_names.keys())))
bottle.debug(config.verbosity < logging.INFO)
bottle._stderr = ft.partial(
_logwrite,
logging.getLogger(bottle.__name__),
logging.INFO
)
app = pypiserver.app(config)
bottle.run(
app=app,
host=config.host,
port=config.port,
server=config.server,
)
bottle.debug(c.verbosity > 1)
bottle._stderr = ft.partial(pypiserver._logwrite,
logging.getLogger(bottle.__name__), logging.INFO)
app = pypiserver.app(**vars(c))
bottle.run(app=app, host=c.host, port=c.port, server=c.server)
def _warn_deprecation():
"""Set warning filters to show a deprecation warning."""
warnings.filterwarnings('always', category=DeprecationWarning)
warnings.warn(DeprecationWarning(
'The "pypi-server" command has been deprecated and will be removed '
'in the next major release. Please use "pypiserver run" or '
'"pypiserver update" instead.'
))
warnings.filterwarnings('default', category=DeprecationWarning)
def main(argv=None):
"""Run the deprecated pypi-server command."""
caller = path.basename(sys.argv[0])
if caller == 'pypi-server':
_warn_deprecation()
elif caller != 'pypiserver':
# Allow calling via API from other scripts, but adjust the caller
# to get the default config type.
caller = 'pypiserver'
config = Config(parser_type=caller).get_parser().parse_args(args=argv)
_run_app_from_config(config)
if __name__ == "__main__":

@ -1,19 +1,14 @@
from collections import namedtuple
"""Define endpoints for the server application."""
import logging
import mimetypes
import os
import re
import zipfile
import xml.dom.minidom
from . import __version__
from . import core
from .bottle import static_file, redirect, request, response, HTTPError, Bottle, template
try:
import xmlrpc.client as xmlrpclib # py3
except ImportError:
import xmlrpclib # py2
import zipfile
from collections import namedtuple
from warnings import warn
try:
from io import BytesIO
@ -25,89 +20,82 @@ try: # PY3
except ImportError: # PY2
from urlparse import urljoin
try:
import xmlrpc.client as xmlrpclib # py3
except ImportError:
import xmlrpclib # py2
from . import core
from ._version import __version__
from .plugins.authenticators.interface import convert_legacy
from .bottle import (
redirect,
request,
response,
static_file,
template,
Bottle,
HTTPError
)
log = logging.getLogger(__name__)
packages = None
config = None
app = Bottle()
class auth(object):
"""decorator to apply authentication if specified for the decorated method & action"""
def __init__(self, action):
self.action = action
def __call__(self, method):
def protector(*args, **kwargs):
if self.action in config.authenticated:
if not request.auth or request.auth[1] is None:
raise HTTPError(
401, headers={"WWW-Authenticate": 'Basic realm="pypi"'}
)
if not config.auther(*request.auth):
raise HTTPError(403)
return method(*args, **kwargs)
return protector
@app.hook('before_request')
def log_request():
log.info(config.log_req_frmt, request.environ)
@app.hook('after_request')
def log_response():
log.info(config.log_res_frmt, { # vars(response)) ## DOES NOT WORK!
'response': response,
'status': response.status, 'headers': response.headers,
'body': response.body, 'cookies': response._cookies,
})
@app.error
def log_error(http_error):
log.info(config.log_err_frmt, vars(http_error))
@app.route("/favicon.ico")
def favicon():
return HTTPError(404)
@app.route('/')
def root():
fp = request.fullpath
try:
numpkgs = len(list(packages()))
except:
numpkgs = 0
# Ensure template() does not consider `msg` as filename!
msg = config.welcome_msg + '\n'
return template(msg,
URL=request.url,
VERSION=__version__,
NUMPKGS=numpkgs,
PACKAGES=urljoin(fp, "packages/"),
SIMPLE=urljoin(fp, "simple/")
)
_bottle_upload_filename_re = re.compile(r'^[a-z0-9_.!+-]+$', re.I)
Upload = namedtuple('Upload', 'pkg sig')
def app(config=None, auther=None, **kwargs):
"""Return a hydrated app using the provided config and ``auther``.
:param argparse.Namespace config: a hydrated config namespace
:param callable auther: a callable authenticator
:param dict kwargs: DEPRECATED. Config keyword arguments.
"""
config, packages = core.configure(config)
if auther is not None:
warn(DeprecationWarning(
'Passing an authentication callable to app() is deprecated. '
'Please create and use an authenticator plugin instead.'
))
config.auther = convert_legacy(auther)
elif callable(getattr(config, 'auther', None)):
warn(DeprecationWarning(
'Passing an authentication callable manually via the config '
'is deprecated. Please create and use an authenticator plugin '
'instead.'
))
config.auther = convert_legacy(config.auther)
else:
config.auther = config.plugins['authenticators'][config.auth_backend](
config
)
if kwargs:
warn(DeprecationWarning(
'Passing arbitrary keyword arguments to app() has been '
'deprecated. Please use config.Config to generate '
'a config and pass it to this function.'
))
for key, value in kwargs:
if key in config:
setattr(config, key, value)
return create_app(config, packages)
def is_valid_pkg_filename(fname):
"""See https://github.com/pypiserver/pypiserver/issues/102"""
"""See https://github.com/pypiserver/pypiserver/issues/102."""
return _bottle_upload_filename_re.match(fname) is not None
def doc_upload():
try:
content = request.files['content']
content = request.files['content'] # pylint: disable=E1136
except KeyError:
raise HTTPError(400, "Missing 'content' file-field!")
zip_data = content.file.read()
@ -118,220 +106,314 @@ def doc_upload():
raise HTTPError(400, "not a zip file")
def remove_pkg():
name = request.forms.get("name")
version = request.forms.get("version")
if not name or not version:
msg = "Missing 'name'/'version' fields: name=%s, version=%s"
raise HTTPError(400, msg % (name, version))
found = None
for pkg in core.find_packages(packages()):
if pkg.pkgname == name and pkg.version == version:
found = pkg
break
if found is None:
raise HTTPError(404, "%s (%s) not found" % (name, version))
os.unlink(found.fn)
def create_app(config, packages):
"""Create a bottle application serving pypiserver."""
app = Bottle()
app.config = config
app.packages = packages
class auth(object):
"""Apply authentication for the decorated method & action."""
Upload = namedtuple('Upload', 'pkg sig')
def __init__(self, action):
self.action = action
def __call__(self, method):
def file_upload():
ufiles = Upload._make(
request.files.get(f, None) for f in ('content', 'gpg_signature'))
if not ufiles.pkg:
raise HTTPError(400, "Missing 'content' file-field!")
if (ufiles.sig and
'%s.asc' % ufiles.pkg.raw_filename != ufiles.sig.raw_filename):
raise HTTPError(400, "Unrelated signature %r for package %r!",
ufiles.sig, ufiles.pkg)
def protector(*args, **kwargs):
# if self.action in config.authenticated:
if self.action in config.authenticate:
if not request.auth or request.auth[1] is None:
raise HTTPError(
401,
headers={"WWW-Authenticate": 'Basic realm="pypi"'}
)
if not config.auther.authenticate(request):
raise HTTPError(403)
return method(*args, **kwargs)
for uf in ufiles:
if not uf:
continue
if (not is_valid_pkg_filename(uf.raw_filename) or
core.guess_pkgname_and_version(uf.raw_filename) is None):
raise HTTPError(400, "Bad filename: %s" % uf.raw_filename)
return protector
if not config.overwrite and core.exists(packages.root, uf.raw_filename):
log.warn("Cannot upload %r since it already exists! \n"
" You may start server with `--overwrite` option. ",
uf.raw_filename)
raise HTTPError(409, "Package %r already exists!\n"
" You may start server with `--overwrite` option.",
uf.raw_filename)
def remove_pkg():
name = request.forms.get("name") # pylint: disable=E1101
version = request.forms.get("version") # pylint: disable=E1101
if not name or not version:
msg = "Missing 'name'/'version' fields: name=%s, version=%s"
raise HTTPError(400, msg % (name, version))
found = None
for pkg in core.find_packages(packages()):
if pkg.pkgname == name and pkg.version == version:
found = pkg
break
if found is None:
raise HTTPError(404, "%s (%s) not found" % (name, version))
os.unlink(found.fn)
core.store(packages.root, uf.raw_filename, uf.save)
if request.auth:
user = request.auth[0]
def file_upload():
ufiles = Upload._make(
request.files.get(f, None) for f in ('content', 'gpg_signature') # noqa pylint: disable=E1101
)
if not ufiles.pkg:
raise HTTPError(400, "Missing 'content' file-field!")
if (ufiles.sig and
'%s.asc' % ufiles.pkg.raw_filename != ufiles.sig.raw_filename):
raise HTTPError(400, "Unrelated signature %r for package %r!",
ufiles.sig, ufiles.pkg)
for uf in ufiles:
if not uf:
continue
if (not is_valid_pkg_filename(uf.raw_filename) or
core.guess_pkgname_and_version(uf.raw_filename) is None):
raise HTTPError(400, "Bad filename: %s" % uf.raw_filename)
if (not config.overwrite
and core.exists(packages.root, uf.raw_filename)):
log.warn(
"Cannot upload %r since it already exists! \n"
" You may start server with `--overwrite` option. ",
uf.raw_filename
)
raise HTTPError(
409, "Package %r already exists!\n"
" You may start server with `--overwrite` option.",
uf.raw_filename
)
core.store(packages.root, uf.raw_filename, uf.save)
if request.auth:
user = request.auth[0]
else:
user = 'anon'
log.info('User %r stored %r.', user, uf.raw_filename)
@app.hook('before_request')
def log_request():
log.info(config.log_req_frmt, request.environ)
@app.hook('after_request')
def log_response():
log.info(config.log_res_frmt, { # vars(response)) ## DOES NOT WORK!
'response': response,
'status': response.status, 'headers': response.headers,
'body': response.body, 'cookies': response._cookies,
})
@app.error
def log_error(http_error):
log.info(config.log_err_frmt, vars(http_error))
@app.route("/favicon.ico")
def favicon():
return HTTPError(404)
@app.route('/')
def root():
fp = request.fullpath
try:
numpkgs = len(list(packages()))
except Exception:
numpkgs = 0
# Ensure template() does not consider `msg` as filename!
msg = config.welcome_msg + '\n'
return template(
msg,
URL=request.url,
VERSION=__version__,
NUMPKGS=numpkgs,
PACKAGES=urljoin(fp, "packages/"),
SIMPLE=urljoin(fp, "simple/")
)
@app.post('/')
@auth("update")
def update():
try:
action = request.forms[':action'] # noqa pylint: disable=unsubscriptable-object
except KeyError:
raise HTTPError(400, "Missing ':action' field!")
if action in ("verify", "submit"):
log.warning("Ignored ':action': %s", action)
elif action == "doc_upload":
doc_upload()
elif action == "remove_pkg":
remove_pkg()
elif action == "file_upload":
file_upload()
else:
user = 'anon'
log.info('User %r stored %r.', user, uf.raw_filename)
raise HTTPError(400, "Unsupported ':action' field: %s" % action)
return ""
@app.post('/')
@auth("update")
def update():
try:
action = request.forms[':action']
except KeyError:
raise HTTPError(400, "Missing ':action' field!")
@app.route("/simple")
@app.route("/simple/:prefix")
@app.route('/packages')
@auth("list")
def pep_503_redirects(prefix=None):
return redirect(request.fullpath + "/", 301)
if action in ("verify", "submit"):
log.warning("Ignored ':action': %s", action)
elif action == "doc_upload":
doc_upload()
elif action == "remove_pkg":
remove_pkg()
elif action == "file_upload":
file_upload()
else:
raise HTTPError(400, "Unsupported ':action' field: %s" % action)
@app.post('/RPC2')
@auth("list")
def handle_rpc():
"""Handle pip-style RPC2 search requests"""
parser = xml.dom.minidom.parse(request.body)
methodname = parser.getElementsByTagName(
"methodName")[0].childNodes[0].wholeText.strip()
log.info("Processing RPC2 request for '%s'", methodname)
if methodname == 'search':
value = parser.getElementsByTagName(
"string")[0].childNodes[0].wholeText.strip()
response = []
ordering = 0
for p in packages():
if p.pkgname.count(value) > 0:
# We do not presently have any description/summary,
# returning version instead
response.append({
'_pypi_ordering': ordering,
'version': p.version,
'name': p.pkgname,
'summary': p.version
})
ordering += 1
call_string = xmlrpclib.dumps(
(response,),
'search',
methodresponse=True
)
return call_string
return ""
@app.route("/simple/")
@auth("list")
def simpleindex():
links = sorted(core.get_prefixes(packages()))
tmpl = """\
<html>
<head>
<title>Simple Index</title>
</head>
<body>
<h1>Simple Index</h1>
% for p in links:
<a href="{{p}}/">{{p}}</a><br>
% end
</body>
</html>
"""
return template(tmpl, links=links)
@app.route("/simple/:prefix/")
@auth("list")
def simple(prefix=""):
# PEP 503: require normalized prefix
normalized = core.normalize_pkgname(prefix)
if prefix != normalized:
return redirect('/simple/{0}/'.format(normalized), 301)
@app.route("/simple")
@app.route("/simple/:prefix")
@app.route('/packages')
@auth("list")
def pep_503_redirects(prefix=None):
return redirect(request.fullpath + "/", 301)
files = sorted(
core.find_packages(packages(), prefix=prefix),
key=lambda x: (x.parsed_version, x.relfn)
)
if not files:
if config.redirect_to_fallback:
return redirect(
"%s/%s/" % (config.fallback_url.rstrip("/"), prefix)
)
return HTTPError(
404,
'Not Found (%s does not exist)\n\n' % normalized
)
fp = request.fullpath
links = [
(
os.path.basename(f.relfn),
urljoin(
fp,
"../../packages/%s" % f.fname_and_hash(config.hash_algo)
)
)
for f in files
]
tmpl = """\
<html>
<head>
<title>Links for {{prefix}}</title>
</head>
<body>
<h1>Links for {{prefix}}</h1>
% for file, href in links:
<a href="{{href}}">{{file}}</a><br>
% end
</body>
</html>
"""
return template(tmpl, prefix=prefix, links=links)
@app.post('/RPC2')
@auth("list")
def handle_rpc():
"""Handle pip-style RPC2 search requests"""
parser = xml.dom.minidom.parse(request.body)
methodname = parser.getElementsByTagName(
"methodName")[0].childNodes[0].wholeText.strip()
log.info("Processing RPC2 request for '%s'", methodname)
if methodname == 'search':
value = parser.getElementsByTagName(
"string")[0].childNodes[0].wholeText.strip()
response = []
ordering = 0
for p in packages():
if p.pkgname.count(value) > 0:
# We do not presently have any description/summary, returning
# version instead
d = {'_pypi_ordering': ordering, 'version': p.version,
'name': p.pkgname, 'summary': p.version}
response.append(d)
ordering += 1
call_string = xmlrpclib.dumps((response,), 'search',
methodresponse=True)
return call_string
@app.route('/packages/')
@auth("list")
def list_packages():
fp = request.fullpath
files = sorted(
core.find_packages(packages()),
key=lambda x: (
os.path.dirname(x.relfn),
x.pkgname,
x.parsed_version
)
)
links = [
(f.relfn_unix, urljoin(fp, f.fname_and_hash(config.hash_algo)))
for f in files
]
tmpl = """\
<html>
<head>
<title>Index of packages</title>
</head>
<body>
<h1>Index of packages</h1>
% for file, href in links:
<a href="{{href}}">{{file}}</a><br>
% end
</body>
</html>
"""
return template(tmpl, links=links)
@app.route('/packages/:filename#.*#')
@auth("download")
def server_static(filename):
entries = core.find_packages(packages())
for x in entries:
f = x.relfn_unix
if f == filename:
response = static_file(
filename,
root=x.root,
mimetype=mimetypes.guess_type(filename)[0]
)
if config.cache_control:
response.set_header(
"Cache-Control",
"public, max-age=%s" % config.cache_control
)
return response
@app.route("/simple/")
@auth("list")
def simpleindex():
links = sorted(core.get_prefixes(packages()))
tmpl = """\
<html>
<head>
<title>Simple Index</title>
</head>
<body>
<h1>Simple Index</h1>
% for p in links:
<a href="{{p}}/">{{p}}</a><br>
% end
</body>
</html>
"""
return template(tmpl, links=links)
return HTTPError(404, 'Not Found (%s does not exist)\n\n' % filename)
@app.route('/:prefix')
@app.route('/:prefix/')
def bad_url(prefix):
p = request.fullpath
if p.endswith("/"):
p = p[:-1]
p = p.rsplit('/', 1)[0]
p += "/simple/%s/" % prefix
@app.route("/simple/:prefix/")
@auth("list")
def simple(prefix=""):
# PEP 503: require normalized prefix
normalized = core.normalize_pkgname(prefix)
if prefix != normalized:
return redirect('/simple/{0}/'.format(normalized), 301)
files = sorted(core.find_packages(packages(), prefix=prefix),
key=lambda x: (x.parsed_version, x.relfn))
if not files:
if config.redirect_to_fallback:
return redirect("%s/%s/" % (config.fallback_url.rstrip("/"), prefix))
return HTTPError(404, 'Not Found (%s does not exist)\n\n' % normalized)
fp = request.fullpath
links = [(os.path.basename(f.relfn),
urljoin(fp, "../../packages/%s" % f.fname_and_hash(config.hash_algo)))
for f in files]
tmpl = """\
<html>
<head>
<title>Links for {{prefix}}</title>
</head>
<body>
<h1>Links for {{prefix}}</h1>
% for file, href in links:
<a href="{{href}}">{{file}}</a><br>
% end
</body>
</html>
"""
return template(tmpl, prefix=prefix, links=links)
@app.route('/packages/')
@auth("list")
def list_packages():
fp = request.fullpath
files = sorted(core.find_packages(packages()),
key=lambda x: (os.path.dirname(x.relfn),
x.pkgname,
x.parsed_version))
links = [(f.relfn_unix, urljoin(fp, f.fname_and_hash(config.hash_algo)))
for f in files]
tmpl = """\
<html>
<head>
<title>Index of packages</title>
</head>
<body>
<h1>Index of packages</h1>
% for file, href in links:
<a href="{{href}}">{{file}}</a><br>
% end
</body>
</html>
"""
return template(tmpl, links=links)
@app.route('/packages/:filename#.*#')
@auth("download")
def server_static(filename):
entries = core.find_packages(packages())
for x in entries:
f = x.relfn_unix
if f == filename:
response = static_file(
filename, root=x.root, mimetype=mimetypes.guess_type(filename)[0])
if config.cache_control:
response.set_header(
"Cache-Control", "public, max-age=%s" % config.cache_control)
return response
return HTTPError(404, 'Not Found (%s does not exist)\n\n' % filename)
@app.route('/:prefix')
@app.route('/:prefix/')
def bad_url(prefix):
p = request.fullpath
if p.endswith("/"):
p = p[:-1]
p = p.rsplit('/', 1)[0]
p += "/simple/%s/" % prefix
return redirect(p)
return redirect(p)
return app

6
pypiserver/_version.py Normal file

@ -0,0 +1,6 @@
"""Pypiserver version info."""
import re
__version__ = version = "2.0.0"
__version_info__ = tuple(re.split('[.-]', __version__))

@ -1,27 +1,27 @@
#
# The cache implementation is only used when the watchdog package
# is installed
#
"""A naive caching implementation using watchdog.
The cache implementation is only used when the watchdog package is installed
"""
from os.path import dirname
from watchdog.observers import Observer
import threading
class CacheManager(object):
"""
A naive cache implementation for listdir and digest_file
"""A naive cache implementation for listdir and digest_file
The listdir_cache is just a giant list of PkgFile objects, and
for simplicity it is invalidated anytime a modification occurs
within the directory it represents. If we were smarter about
the way that the listdir data structure were created/stored,
then we could do more granular invalidation. In practice, this
is good enough for now.
The listdir_cache is just a giant list of PkgFile objects, and
for simplicity it is invalidated anytime a modification occurs
within the directory it represents. If we were smarter about
the way that the listdir data structure were created/stored,
then we could do more granular invalidation. In practice, this
is good enough for now.
The digest_cache exists on a per-file basis, because computing
hashes on large files can get expensive, and it's very easy to
invalidate specific filenames.
The digest_cache exists on a per-file basis, because computing
hashes on large files can get expensive, and it's very easy to
invalidate specific filenames.
"""
def __init__(self):
@ -65,7 +65,7 @@ class CacheManager(object):
cache = self.digest_cache[hash_algo]
except KeyError:
cache = self.digest_cache.setdefault(hash_algo, {})
try:
return cache[fpath]
except KeyError:
@ -85,6 +85,7 @@ class CacheManager(object):
self.watched.add(root)
self.observer.schedule(_EventHandler(self, root), root, recursive=True)
class _EventHandler(object):
def __init__(self, cache, root):
@ -117,4 +118,5 @@ class _EventHandler(object):
for path in paths:
subcache.pop(path, None)
cache_manager = CacheManager()

590
pypiserver/config.py Normal file

@ -0,0 +1,590 @@
"""Define utilities for parsing and consuming config options."""
import logging
import re
from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser
from hashlib import algorithms_available
from os import environ, path
from textwrap import dedent
import pkg_resources
from . import __version__
from .bottle import server_names
from .core import load_plugins
from .const import STANDALONE_WELCOME
_AUTH_RE = re.compile(r'[, ]+')
_AUTH_ACTIONS = ('download', 'list', 'update')
_FALSES = ('no', 'off', '0', 'false')
def str2bool(string):
"""Convert a string into a boolean."""
return string.lower() not in _FALSES
def _get_welcome_file():
"""Get the welcome file or set a constant for the standalone package."""
try:
return pkg_resources.resource_filename(
'pypiserver', _Defaults.welcome_file
)
except NotImplementedError: # raised in standalone zipfile.
return STANDALONE_WELCOME
class _Defaults(object):
"""Define default constants."""
authenticate = 'update'
fallback_url = 'https://pypi.org/simple'
hash_algo = 'md5'
host = '0.0.0.0'
log_fmt = '%(asctime)s|%(name)s|%(levelname)s|%(thread)d|%(message)s'
log_req_fmt = '%(bottle.request)s'
log_res_fmt = '%(status)s'
log_err_fmt = '%(body)s: %(exception)s \n%(traceback)s'
overwrite = False
port = 8080
redirect_to_fallback = True
roots = ['~/packages']
server = 'auto'
welcome_file = 'welcome.html'
class _HelpFormatter(ArgumentDefaultsHelpFormatter):
"""A custom formatter to flip our one confusing argument.
``--disable-fallback`` is stored as ``redirect_to_fallback``,
so the actual boolean value is opposite of what one would expect.
The :ref:`ArgumentDefaultsHelpFormatter` doesn't really have any
way of dealing with this situation, so we special case it.
"""
def _get_help_string(self, action):
"""Return the help string for the action."""
if '--disable-fallback' in action.option_strings:
return action.help + ' (default: False)'
else:
return super(_HelpFormatter, self)._get_help_string(action)
class _CustomParsers(object):
"""Collect custom parsers."""
@staticmethod
def auth(auth_str):
"""Parse the auth string to yield a list of authenticated actions.
:param str auth_str: a string of comma-separated auth actions
:return: a list of validated auth actions
:rtype: List[str]
"""
authed = [
a.lower() for a in _AUTH_RE.split(auth_str.strip(' ,')) if a
]
if len(authed) == 1 and authed[0] == '.':
return []
for a in authed:
if a not in _AUTH_ACTIONS:
errmsg = 'Authentication action "%s" not one of %s!'
raise ValueError(errmsg % (a, _AUTH_ACTIONS))
return authed
@staticmethod
def hash_algo(hash_algo):
"""Parse the hash algorithm.
If the user set the algorithm to a falsey string, return ``None``.
Otherwise, return the set or default value.
"""
if not str2bool(hash_algo):
return None
return hash_algo
@staticmethod
def roots(roots):
"""Expand user home and update roots to absolute paths."""
return [path.abspath(path.expanduser(r)) for r in roots]
@staticmethod
def verbosity(verbosity):
"""Convert the verbosity level to a logging level.
:param int verbosity: the count of -v values from the commandline
:return: a logging constant appropriate for the specified verbosity
:rtype: int
"""
verbosities = (
logging.WARNING, logging.INFO, logging.DEBUG, logging.NOTSET
)
try:
return verbosities[verbosity]
except IndexError:
return verbosities[-1]
class _PypiserverParser(ArgumentParser):
"""Allow extra actions following the final parse.
Actions like "count", and regular "store" actions when "nargs" is
specified, do not allow the specification of a "type" function,
which means we can't do on-the-fly massaging of those values. Instead,
we call them separately here.
"""
extra_parsers = {
'authenticate': _CustomParsers.auth,
'hash_algo': _CustomParsers.hash_algo,
'roots': _CustomParsers.roots,
'verbosity': _CustomParsers.verbosity,
}
def parse_args(self, args=None, namespace=None):
"""Parse arguments."""
parsed = super(_PypiserverParser, self).parse_args(
args=args, namespace=namespace
)
if not hasattr(parsed, 'authenticate'):
# Ensure a useful value is present even when no auth
# plugins are installed.
parsed.authenticate = _Defaults.authenticate
for attr, parser in self.extra_parsers.items():
if hasattr(parsed, attr):
setattr(parsed, attr, parser(getattr(parsed, attr)))
return parsed
class Config(object):
"""Factory for pypiserver configs and parsers."""
def __init__(self, parser_cls=_PypiserverParser,
help_formatter=_HelpFormatter, parser_type='pypiserver'):
"""Instantiate the factory.
:param argparse.HelpFormatter help_formatter: the HelpForamtter class
to use for the parser
:param str parser_type: one of 'pypi-server' (deprecated) or
'pypiserver'
"""
self.help_formatter = help_formatter
self.parser_cls = parser_cls
self.parser_type = parser_type
self._plugins = load_plugins()
def get_default(self, subcommand='run'):
"""Return a parsed config with default argument values.
:param str subcommand:
the subcommand for which to return default arguments.
:rtype: argparse.Namespace
"""
return self.get_parser().parse_args([subcommand])
def get_parsed(self):
"""Return arguments parsed from the commandline.
:rtype: argparse.Namespace
"""
return self.get_parser().parse_args()
def get_parser(self):
"""Return an ArgumentParser instance with all arguments populated.
:rtype: PypiserverParser
"""
if self.parser_type == 'pypiserver':
return self._get_parser()
elif self.parser_type == 'pypi-server':
return self._get_deprecated_parser()
else:
raise ValueError(
'Unsupported parser_type: {}'.format(self.parser_type)
)
def from_kwargs(self, **kwargs):
"""Return a default config updated with the provided kwargs.
:param dict kwargs: key-value pairs with which to populate the
config. Keys may be provided that are not in the default
config.
"""
conf = self.get_default()
for key, value in kwargs.items():
setattr(conf, key, value)
return conf
def _get_parser(self):
"""Return a hydrated parser."""
parser = self.parser_cls(
description='PyPI-compatible package server',
formatter_class=self.help_formatter
)
self.add_root_args(parser)
self.add_logging_arg_group(parser)
subparsers = parser.add_subparsers(dest='command', help='commands')
self.add_run_subcommand(subparsers)
self.add_update_subcommand(subparsers)
return parser
def _get_deprecated_parser(self):
"""Return the deprecated parser."""
parser = self.parser_cls(
description='PyPI-compatible package server',
formatter_class=self.help_formatter
)
self.add_root_args(parser)
self.add_server_arg_group(parser)
self.add_security_arg_group(parser)
self.add_logging_arg_group(parser)
self.add_http_logging_group(parser)
self.add_deprecated_update_arg_group(parser)
return parser
@staticmethod
def add_root_args(parser):
"""Add root-level arguments to the parser.
:param ArgumentParser parser: an ArgumentParser instance
"""
parser.add_argument(
'-v', '--verbose',
dest='verbosity',
action='count',
default=0,
help=(
'Increase verbosity. May be specified multiple times for '
'extra verbosity'
)
)
parser.add_argument(
'--version',
action='version',
version='%(prog)s {}'.format(__version__),
)
def add_run_subcommand(self, subparsers):
"""Add the "update" command to the subparsers instance.
:param subparsers: an ArgumentParser subparser.
"""
run = subparsers.add_parser('run', help='run pypiserver')
self.add_server_arg_group(run)
self.add_security_arg_group(run)
self.add_plugin_args_run(run)
self.add_http_logging_group(run)
@staticmethod
def add_update_subcommand(subparsers):
"""Add the "update" command to the subparsers instance.
:param subparsers: an ArgumentParser subparser.
"""
update = subparsers.add_parser('update', help='update packages')
update.add_argument(
'roots',
nargs='*',
default=_Defaults.roots,
metavar='root',
help=('update packages in root(s). This command '
'searches pypi.org for updates and outputs a pip command '
'which can be run to update the packages')
)
update.add_argument(
'-x', '--execute',
action='store_true',
help='execute the pip commands instead of only showing them'
)
update.add_argument(
'--pre',
action='store_true',
help='allow updating to prerelease versions (alpha, beta, rc, dev)'
)
update.add_argument(
'--download-directory',
help=('download updates to this directory. The default is to use '
'the directory containing the packages to be updated')
)
@staticmethod
def add_server_arg_group(parser):
"""Add arguments for running pypiserver.
:param ArgumentParser parser: an ArgumentParser instance
"""
server = parser.add_argument_group(
title='Server',
description='Configure the pypiserver instance'
)
server.add_argument(
'roots',
default=_Defaults.roots,
metavar='root',
nargs='*',
help=(dedent('''\
serve packages from the specified root directory. Multiple
root directories may be specified. If no root directory is
provided, %(default)s will be used. Root directories will
be scanned recursively for packages. Files and directories
starting with a dot are ignored.
'''))
)
server.add_argument(
'-i', '--interface',
default=environ.get('PYPISERVER_INTERFACE', _Defaults.host),
dest='host',
help='listen on interface INTERFACE'
)
server.add_argument(
'-p', '--port',
default=environ.get('PYPISERVER_PORT', _Defaults.port),
type=int,
help='listen on port PORT',
)
server.add_argument(
'-o', '--overwrite',
action='store_true',
default=environ.get('PYPISERVER_OVERWRITE', _Defaults.overwrite),
help='allow overwriting existing package files',
)
server.add_argument(
'--fallback-url',
default=environ.get(
'PYPISERVER_FALLBACK_URL',
_Defaults.fallback_url,
),
help=('for packages not found in the local index, return a '
'redirect to this URL')
)
server.add_argument(
'--disable-fallback',
action='store_false',
default=environ.get(
'PYPISERVER_DISABLE_FALLBACK',
_Defaults.redirect_to_fallback,
),
dest='redirect_to_fallback',
help=('disable redirect to real PyPI index for packages not found '
'in the local index')
)
server.add_argument(
'--server',
choices=server_names,
default=environ.get('PYPISERVER_SERVER', _Defaults.server),
metavar='METHOD',
help=(dedent('''\
use METHOD to run the server. Valid values include paste,
cherrypy, twisted, gunicorn, gevent, wsgiref, auto. The
default is to use "auto" which chooses one of paste, cherrypy,
twisted or wsgiref
'''))
)
server.add_argument(
'--hash-algo',
choices=tuple(a.lower() for a in algorithms_available) + _FALSES,
default=environ.get('PYPISERVER_HASH_ALGO', _Defaults.hash_algo),
metavar='ALGO',
help=('any `hashlib` available algo used as fragments on package '
'links. Set one of (0, no, off, false) to disabled it')
)
server.add_argument(
'--welcome',
default=environ.get(
'PYPISERVER_WELCOME',
_get_welcome_file()
),
dest='welcome_file',
metavar='HTML_FILE',
help='uses the ASCII contents of HTML_FILE as welcome message'
)
server.add_argument(
'--cache-control',
default=environ.get('PYPISERVER_CACHE_CONTROL'),
metavar='AGE',
type=int,
help=('Add "Cache-Control: max-age=AGE, public" header to package '
'downloads. Pip 6+ needs this for caching')
)
def add_security_arg_group(self, parser):
"""Add security arguments to the parser.
:param ArgumentParser parser: an ArgumentParser instance
"""
auth_plugins_available = bool(
set(self._plugins['authenticators']).difference(
set(('no-auth',))
)
)
security = parser.add_argument_group(
title='Security',
description='Configure pypiserver access controls'
)
if auth_plugins_available or self.parser_type == 'pypi-server':
# Do not bother to show authentication arguments when no
# non-dummy auth plugins are installed.
security.add_argument(
'-a', '--authenticate',
default=environ.get(
'PYPISERVER_AUTHENTICATE',
_Defaults.authenticate,
),
# TODO: pull some of this long stuff out into an epilog
help=dedent('''\
comma-separated list of (case-insensitive) actions to
authenticate. Use "." for no authentication. Requires the
password (-P option) to be set. For example to
password-protect package downloads (in addition to
uploads), while leaving listings public, use:
`-P foo/htpasswd.txt -a update,download`.
To drop all authentications, use: `-P . -a .`.
Note that when uploads are not protected, the `register`
command is not necessary, but `~/.pypirc` still requires
username and password fields, even if bogus. By default,
only %(default)s is password-protected
''')
)
if self.parser_type == 'pypi-server':
# This argument is created by the `pypiserver-passlib` plugin
# for pypiserver>=2.0
security.add_argument(
'-P', '--passwords',
dest='password_file',
default=environ.get('PYPISERVER_PASSWORD_FILE'),
help=dedent('''\
use apache htpasswd file PASSWORD_FILE to set usernames &
passwords when authenticating certain actions (see
-a option). If you want to allow unauthorized access,
set this option and -a to '.'
''')
)
security.add_argument(
'--auth-backend',
default=environ.get(
'PYPISERVER_AUTH_BACKEND',
'passlib' if 'passlib' in self._plugins['authenticators']
else 'no-auth'
),
choices=self._plugins['authenticators'].keys(),
help=(
'Specify an authentication backend. By default, will attempt '
'to use an htpasswd file if provided. If specified, must '
'correspond to an installed auth plugin.'
)
)
@staticmethod
def add_plugin_group(parser, name, plugin):
"""Add a plugin group to the parser."""
group = parser.add_argument_group(
title='{} ({} plugin)'.format(plugin.plugin_name, name),
description=plugin.plugin_help,
)
plugin.update_parser(group)
def add_plugin_args_run(self, parser):
"""Add plugin args for the "run" subcommand.
:param ArgumentParser parser: the "run" subcommand parser
"""
for name, plugin in self._plugins['authenticators'].items():
self.add_plugin_group(parser, name, plugin)
@staticmethod
def add_logging_arg_group(parser):
"""Add pypiserver logging arguments.
:param ArgumentParser parser: an ArgumentParser instance
"""
logs = parser.add_argument_group(title='logs')
logs.add_argument(
'--log-file',
default=environ.get('PYPISERVER_LOG_FILE'),
help='write logging info into LOG_FILE'
)
logs.add_argument(
'--log-frmt',
default=environ.get('PYPISERVER_LOG_FRMT', _Defaults.log_fmt),
metavar='FORMAT',
help=('the logging format string. (see `logging.LogRecord` class '
'from standard python library)')
)
@staticmethod
def add_http_logging_group(parser):
"""Add a group to with HTTP logging arguments.
:param ArgumentParser parser: an ArgumentParser instance
"""
http_logs = parser.add_argument_group(
title='HTTP logs',
description='Define the logging format for HTTP events'
)
http_logs.add_argument(
'--log-req-frmt',
default=environ.get(
'PYPISERVER_LOG_REQ_FRMT',
_Defaults.log_req_fmt,
),
metavar='FORMAT',
help=('a format-string selecting Http-Request properties to log; '
'set to "%s" to see them all')
)
http_logs.add_argument(
'--log-res-frmt',
default=environ.get(
'PYPISERVER_LOG_RES_FRMT',
_Defaults.log_res_fmt
),
metavar='FORMAT',
help=('a format-string selecting Http-Response properties to log; '
'set to "%s" to see them all')
)
http_logs.add_argument(
'--log-err-frmt',
default=environ.get(
'PYPISERVER_LOG_ERR_FRMT',
_Defaults.log_err_fmt
),
metavar='FORMAT',
help=('a format-string selecting Http-Error properties to log; '
'set to "%s" to see them all')
)
@staticmethod
def add_deprecated_update_arg_group(parser):
"""Add arguments for the deprecated update packages command.
:param ArgumentParser parser: an ArgumentParser instance
"""
update = parser.add_argument_group(
'update packages',
description='Update packages instead of running the pypiserver.'
)
update.add_argument(
'-U', '--update-packages',
action='store_true',
help=(
'Update packages in specified diretories. This command '
'searches pypi.org for updates and outputs a pip command '
'which can be used to update the packages.'
)
)
update.add_argument(
'-x', '--execute',
action='store_true',
help='execute the pip commands instead of only showing them'
)
update.add_argument(
'-u', '--unstable',
action='store_true',
help='allow updating to unstable versions (alpha, beta, rc, dev)'
)
update.add_argument(
'--download-directory',
help=('download updates to this directory. The default is to use '
'the directory containing the package to be updated')
)

8
pypiserver/const.py Normal file

@ -0,0 +1,8 @@
"""Constant values for pypiserver."""
from sys import version_info
PLUGIN_GROUPS = ('authenticators',)
PY2 = version_info < (3,)
STANDALONE_WELCOME = 'standalone'

@ -1,93 +1,151 @@
#! /usr/bin/env python
"""minimal PyPI like server for use with pip/easy_install"""
import functools
import hashlib
import io
import itertools
import logging
import mimetypes
import os
import re
import sys
import pkg_resources
from pkg_resources import iter_entry_points
from . import Configuration
from .const import PLUGIN_GROUPS, PY2, STANDALONE_WELCOME
from .plugins.authenticators.interface import convert_legacy
if PY2:
from io import open
log = logging.getLogger(__name__)
_archive_suffix_rx = re.compile(
r"(\.zip|\.tar\.gz|\.tgz|\.tar\.bz2|-py[23]\.\d-.*|"
"\.win-amd64-py[23]\.\d\..*|\.win32-py[23]\.\d\..*|\.egg)$",
re.I)
wheel_file_re = re.compile(
r"""^(?P<namever>(?P<name>.+?)-(?P<ver>\d.*?))
((-(?P<build>\d.*?))?-(?P<pyver>.+?)-(?P<abi>.+?)-(?P<plat>.+?)
\.whl|\.dist-info)$""",
re.VERBOSE)
_pkgname_re = re.compile(r'-\d+[a-z_.!+]', re.I)
_pkgname_parts_re = re.compile(
r"[\.\-](?=cp\d|py\d|macosx|linux|sunos|solaris|irix|aix|cygwin|win)",
re.I)
def configure(**kwds):
"""
:return: a 2-tuple (Configure, package-list)
"""
c = Configuration(**kwds)
log.info("+++Pypiserver invoked with: %s", c)
if c.root is None:
c. root = os.path.expanduser("~/packages")
roots = c.root if isinstance(c.root, (list, tuple)) else [c.root]
roots = [os.path.abspath(r) for r in roots]
for r in roots:
def _validate_roots(roots):
"""Validate roots.
:param List[str] roots: a list of package roots.
"""
for root in roots:
try:
os.listdir(r)
except OSError:
err = sys.exc_info()[1]
msg = "Error: while trying to list root(%s): %s"
sys.exit(msg % (r, err))
os.listdir(root)
except OSError as exc:
raise ValueError(
'Error while trying to list root({}): '
'{}'.format(root, repr(exc))
)
packages = lambda: itertools.chain(*[listdir(r) for r in roots])
packages.root = roots[0]
if not c.authenticated:
c.authenticated = []
if not callable(c.auther):
if c.password_file and c.password_file != '.':
from passlib.apache import HtpasswdFile
htPsswdFile = HtpasswdFile(c.password_file)
else:
c.password_file = htPsswdFile = None
c.auther = functools.partial(auth_by_htpasswd_file, htPsswdFile)
# Read welcome-msg from external file,
# or failback to the embedded-msg (ie. in standalone mode).
#
def _welcome_msg(welcome_file):
"""Parse the provided welcome file to get the welcome message."""
try:
if not c.welcome_file:
c.welcome_file = "welcome.html"
c.welcome_msg = pkg_resources.resource_string( # @UndefinedVariable
__name__, "welcome.html").decode("utf-8") # @UndefinedVariable
# pkg_resources.resource_filename() is not supported for zipfiles,
# so we rely on resource_string() instead.
if welcome_file == STANDALONE_WELCOME:
welcome_msg = pkg_resources.resource_string(
__name__, 'welcome.html'
).decode('utf-8')
else:
with io.open(c.welcome_file, 'r', encoding='utf-8') as fd:
c.welcome_msg = fd.read()
with open(welcome_file, 'r', encoding='utf-8') as fd:
welcome_msg = fd.read()
except Exception:
log.warning(
"Could not load welcome-file(%s)!", c.welcome_file, exc_info=1)
if c.fallback_url is None:
c.fallback_url = "https://pypi.org/simple"
if c.hash_algo:
try:
halgos = hashlib.algorithms_available
except AttributeError:
halgos = ['md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512']
if c.hash_algo not in halgos:
sys.exit('Hash-algorithm %s not one of: %s' % (c.hash_algo, halgos))
log.info("+++Pypiserver started with: %s", c)
return c, packages
"Could not load welcome file(%s)!",
welcome_file,
exc_info=1
)
return welcome_msg
def auth_by_htpasswd_file(htPsswdFile, username, password):
def prep_config(config):
"""Check config arguments and update values when required.
:param argparse.Namespace config: a config namespace
:raises ValueError: if a config value is invalid
"""
_validate_roots(config.roots)
config.welcome_msg = _welcome_msg(config.welcome_file)
def configure(config):
"""Validate configuration and return with a package list.
:param argparse.Namespace config: a config namespace
:return: 2-tuple (Configure, package-list)
:rtype: tuple
"""
prep_config(config)
add_plugins_to_config(config)
def packages():
"""Return an iterable over package files in package roots."""
return itertools.chain(*[listdir(r) for r in config.roots])
packages.root = config.roots[0]
log.info("+++Pypiserver started with: %s", config)
return config, packages
def load_plugins(*groups):
"""Load pypiserver plugins.
:param groups: the plugin group(s) names (str) to load. Group names
must be one of ``const.PLUGIN_GROUPS``. If no groups are
provided, all groups will be loaded.
:return: a dict whose keys are plugin group names and whose values
are nested dicts whose keys are plugin names and whose values
are the loaded plugins.
:rtype: dict
"""
if groups and not all(g in PLUGIN_GROUPS for g in groups):
raise ValueError(
'Invalid group provided. Groups must '
'be one of: {}'.format(PLUGIN_GROUPS)
)
groups = groups if groups else PLUGIN_GROUPS
plugins = {}
for group in groups:
plugins.setdefault(group, {})
for plugin in iter_entry_points('pypiserver.{}'.format(group)):
plugins[group][plugin.name] = plugin.load()
return plugins
def add_plugins_to_config(config, plugins=None):
"""Load plugins if necessary and add to a config object.
:param argparse.Namespace config: a config namespace
:param dict plugins: an optional loaded plugin dict. If not
provided, plugins will be loaded.
"""
plugins = load_plugins() if plugins is None else plugins
config.plugins = plugins
def auth_by_htpasswd_file(ht_pwd_file, username, password):
"""The default ``config.auther``."""
if htPsswdFile is not None:
htPsswdFile.load_if_changed()
return htPsswdFile.check_password(username, password)
if ht_pwd_file is not None:
ht_pwd_file.load_if_changed()
return ht_pwd_file.check_password(username, password)
mimetypes.add_type("application/octet-stream", ".egg")
@ -96,9 +154,14 @@ mimetypes.add_type("text/plain", ".asc")
# ### Next 2 functions adapted from :mod:`distribute.pkg_resources`.
#
component_re = re.compile(r'(\d+ | [a-z]+ | \.| -)', re.I | re.VERBOSE)
replace = {'pre': 'c', 'preview': 'c', '-': 'final-', 'rc': 'c', 'dev': '@'}.get
replace = {
'pre': 'c',
'preview': 'c',
'-': 'final-',
'rc': 'c',
'dev': '@'
}.get
def _parse_version_parts(s):
@ -123,23 +186,6 @@ def parse_version(s):
parts.pop()
parts.append(part)
return tuple(parts)
#
#### -- End of distribute's code.
_archive_suffix_rx = re.compile(
r"(\.zip|\.tar\.gz|\.tgz|\.tar\.bz2|-py[23]\.\d-.*|"
"\.win-amd64-py[23]\.\d\..*|\.win32-py[23]\.\d\..*|\.egg)$",
re.I)
wheel_file_re = re.compile(
r"""^(?P<namever>(?P<name>.+?)-(?P<ver>\d.*?))
((-(?P<build>\d.*?))?-(?P<pyver>.+?)-(?P<abi>.+?)-(?P<plat>.+?)
\.whl|\.dist-info)$""",
re.VERBOSE)
_pkgname_re = re.compile(r'-\d+[a-z_.!+]', re.I)
_pkgname_parts_re = re.compile(
r"[\.\-](?=cp\d|py\d|macosx|linux|sunos|solaris|irix|aix|cygwin|win)",
re.I)
def _guess_pkgname_and_version_wheel(basename):
@ -189,6 +235,7 @@ def is_allowed_path(path_part):
class PkgFile(object):
"""Provide methods on a package file."""
__slots__ = ['fn', 'root', '_fname_and_hash',
'relfn', 'relfn_unix',
@ -198,7 +245,8 @@ class PkgFile(object):
'parsed_version',
'replaces']
def __init__(self, pkgname, version, fn=None, root=None, relfn=None, replaces=None):
def __init__(self, pkgname, version, fn=None, root=None,
relfn=None, replaces=None):
self.pkgname = pkgname
self.pkgname_norm = normalize_pkgname(pkgname)
self.version = version
@ -212,14 +260,21 @@ class PkgFile(object):
def __repr__(self):
return "%s(%s)" % (
self.__class__.__name__,
", ".join(["%s=%r" % (k, getattr(self, k))
for k in sorted(self.__slots__)]))
", ".join([
"%s=%r" % (k, getattr(self, k)) for k in sorted(self.__slots__)
])
)
def fname_and_hash(self, hash_algo):
if not hasattr(self, '_fname_and_hash'):
if hash_algo:
self._fname_and_hash = '%s#%s=%.32s' % (self.relfn_unix, hash_algo,
digest_file(self.fn, hash_algo))
self._fname_and_hash = (
'%s#%s=%.32s' % (
self.relfn_unix,
hash_algo,
digest_file(self.fn, hash_algo)
)
)
else:
self._fname_and_hash = self.relfn_unix
return self._fname_and_hash

@ -1,11 +1,14 @@
"""Management tools for pypiserver."""
import itertools
import sys
import os
from subprocess import call
from . import core
import itertools
from .const import PY2
if sys.version_info >= (3, 0):
if not PY2:
from xmlrpc.client import Server
def make_pypi_client(url):
@ -148,6 +151,7 @@ def update(pkgset, destdir=None, dry_run=False, stable_only=True):
if not dry_run:
call(cmd)
def update_all_packages(roots, destdir=None, dry_run=False, stable_only=True):
packages = frozenset(itertools.chain(*[core.listdir(r) for r in roots]))
update(packages, destdir, dry_run, stable_only)

77
pypiserver/paste.py Normal file

@ -0,0 +1,77 @@
"""Provide a paste-compatible entry point."""
import os
from ._app import app
from .config import str2bool, Config
def _str_strip(string):
"""Provide a generic strip method to pass as a callback."""
return string.strip()
def paste_app_factory(global_config, **local_conf):
"""Parse a paste config and return an app."""
def upd_conf_with_bool_item(conf, attr, sdict):
value = sdict.pop(attr, None)
if value is not None and attr != '':
# conf[attr] = str2bool(value)
setattr(conf, attr, str2bool(value))
def upd_conf_with_str_item(conf, attr, sdict):
value = sdict.pop(attr, None)
if value is not None:
setattr(conf, attr, value)
# conf[attr] = value
def upd_conf_with_int_item(conf, attr, sdict):
value = sdict.pop(attr, None)
if value is not None:
setattr(conf, attr, int(value))
# conf[attr] = int(value)
def upd_conf_with_list_item(conf, attr, sdict, sep=' ', parse=_str_strip):
values = sdict.pop(attr, None)
if values:
# conf[attr] = list(filter(None, map(parse, values.split(sep))))
setattr(
conf, attr, list(filter(None, map(parse, values.split(sep))))
)
def _make_root(root):
root = root.strip()
if root.startswith("~"):
return os.path.expanduser(root)
return root
c = Config(
parser_type='pypi-server'
).get_parser().parse_args([])
upd_conf_with_bool_item(c, 'overwrite', local_conf)
upd_conf_with_bool_item(c, 'redirect_to_fallback', local_conf)
upd_conf_with_list_item(c, 'authenticated', local_conf, sep=' ')
upd_conf_with_list_item(c, 'root', local_conf, sep='\n', parse=_make_root)
upd_conf_with_int_item(c, 'verbosity', local_conf)
str_items = [
'fallback_url',
'hash_algo',
'log_err_frmt',
'log_file',
'log_frmt',
'log_req_frmt',
'log_res_frmt',
'password_file',
'welcome_file'
]
for str_item in str_items:
upd_conf_with_str_item(c, str_item, local_conf)
if getattr(c, 'authenticated', None) is not None:
# Update for v 2.0
c.authenticate = c.authenticated
# cache_control is undocumented; don't know what type is expected:
# upd_conf_with_str_item(c, 'cache_control', local_conf)
return app(c)

@ -0,0 +1,44 @@
"""Interface for authentication backends for pypiserver."""
from abc import abstractmethod
from pypiserver.plugins.interface import PluginInterface
class AuthenticatorInterface(PluginInterface):
"""Defines the interface for pypiserver auth plugins."""
def __init__(self, config):
"""Instantiate the auth plugin with the current config.
:param argparse.Namespace config: the active config for the
running server
"""
@abstractmethod
def authenticate(self, request):
"""Authenticate the passed request and return a success bool.
:param bottle.Request request: the request being authenticated
:return: whether the request was successfully authenticated
:rtype: bool
"""
def convert_legacy(auther):
"""Convert a legacy auther to the new interface automatically.
:param callable auther: a callable expecting to receive
`*request.auth` and returning a bool
:return: a new class corresponding to the proper plugin interface
but calling the legacy auth callable
:rtype: AuthenticatorInterface
"""
class NewAuther(AuthenticatorInterface):
"""Pass the request auth instead of the request."""
def authenticate(self, request):
"""Pass request.auth to the legacy auther callable."""
return auther(*request.auth)
return NewAuther

@ -0,0 +1,14 @@
"""An authenticator that always authenticates successfully."""
from .interface import AuthenticatorInterface
class NoAuthAuthenticator(AuthenticatorInterface):
"""Authenticate successfully all the time."""
plugin_name = 'No-Auth Authenticator'
plugin_help = 'Authenticate successfully all the time'
def authenticate(self, request):
"""Authenticate the provided request."""
return True

@ -0,0 +1,50 @@
"""Common plugin interface for pypiserver."""
from pypiserver.const import PY2
if PY2:
# Create the equivalent of Python 3's ABC class
from abc import ABCMeta, abstractproperty
ABC = ABCMeta('ABC', (object,), {'__slots__': ()})
else:
from abc import ABC
from .util import py3_abstractproperty as abstractproperty
class PluginInterface(ABC):
"""Base plugin interface for pypiserver plugins."""
@abstractproperty
def plugin_name(self):
"""Return the plugin name.
Note that this can (and should) just be defined as a class
attribute, e.g.:
.. code:: python
class MyPlugin(PluginInterface):
plugin_name = "My Plugin"
"""
@abstractproperty
def plugin_help(self):
"""Return user-facing one-line summary of plugin.
:rtype: str
:return: a short description of the plugin's purpose
"""
@classmethod
def update_parser(cls, parser):
"""Add arguments to the pypiserver argument parser.
Generally, this will be a subcommand parser (usually for "run"),
but it could vary by plugin type.
:param argparse.ArgumentParser parser: the parser for the "run"
subcommand
"""

@ -0,0 +1,30 @@
"""Utilities for plugin definition."""
from abc import abstractmethod
class py2_abstractclassmethod(classmethod):
"""An implementation of @abstractclassmethod for Python 2."""
__isabstractmethod__ = True
def __init__(self, callable):
"""Mark a callable classmethod as an abstract method.
:param Callable callable: a callable to mark
"""
callable.__isabstractmethod__ = True
super(py2_abstractclassmethod, self).__init__(callable)
def py3_abstractproperty(callable):
"""Create the equivalent of a Python 2 @abstractproperty.
While @abstractproperty still exists in Python 3, it was deprecated
in Python 3.3, and will probably be removed at some point. This
decorator is equivalent.
:param Callable callable: the callable to mark as an abstract
property
"""
return property(abstractmethod(callable))

@ -6,14 +6,17 @@
-r exe.pip
docopt # For `/bin/bumpver.py`.
ipdb
gevent>=1.1b4; python_version >= '3'
mock; python_version == '2.7'
pip>=7
passlib>=1.6
pytest>=2.3
requests
setuptools
setuptools-git>=0.3
tox
twine>=1.7
virtualenv
webtest
wheel>=0.25.0
wheel>=0.25.0

131
setup.py

@ -1,13 +1,11 @@
#! /usr/bin/env python
#!/usr/bin/env python
"""Setup file for Pypiserver."""
from os import path
import sys
from setuptools import setup
from setuptools import find_packages, setup
if sys.version_info >= (3, 0):
exec("def do_exec(co, loc): exec(co, loc)\n")
else:
exec("def do_exec(co, loc): exec co in loc\n")
tests_require = ['pytest>=2.3', 'tox', 'twine', 'pip>=7',
'passlib>=1.6', 'webtest']
@ -22,59 +20,72 @@ else:
def get_version():
d = {}
try:
do_exec(open("pypiserver/__init__.py").read(), d) # @UndefinedVariable
except (ImportError, RuntimeError):
pass
return d["__version__"]
"""Execute just what is needed from _version.py to get the version."""
fake_globals = {}
v_file = path.abspath(
path.join(path.dirname(__file__), 'pypiserver/_version.py')
)
with open(v_file) as vf:
for ln in vf:
if ln.startswith('__version__'):
exec(ln, fake_globals)
return fake_globals["__version__"]
setup(name="pypiserver",
description="A minimal PyPI server for use with pip/easy_install.",
long_description=open("README.rst").read(),
version=get_version(),
packages=["pypiserver"],
package_data={'pypiserver': ['welcome.html']},
python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*",
setup_requires=setup_requires,
extras_require={
'passlib': ['passlib>=1.6'],
'cache': ['watchdog']
},
tests_require=tests_require,
url="https://github.com/pypiserver/pypiserver",
maintainer=("Kostis Anagnostopoulos <ankostis@gmail.com>"
"Matthew Planchard <mplanchard@gmail.com>"),
maintainer_email="ankostis@gmail.com",
classifiers=[
"Development Status :: 5 - Production/Stable",
"Environment :: Web Environment",
"Intended Audience :: Developers",
"Intended Audience :: System Administrators",
"License :: OSI Approved :: BSD License",
"License :: OSI Approved :: zlib/libpng License",
"Operating System :: MacOS :: MacOS X",
"Operating System :: Microsoft :: Windows",
"Operating System :: POSIX",
"Operating System :: OS Independent",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
"Topic :: Software Development :: Build Tools",
"Topic :: System :: Software Distribution"],
zip_safe=True,
entry_points={
'paste.app_factory': ['main=pypiserver:paste_app_factory'],
'console_scripts': ['pypi-server=pypiserver.__main__:main']
},
options={
'bdist_wheel': {'universal': True},
},
platforms=['any'],
)
setup(
name="pypiserver",
description="A minimal PyPI server for use with pip/easy_install.",
long_description=open("README.rst").read(),
version=get_version(),
packages=find_packages(exclude=('tests', 'tests.*')),
package_data={'pypiserver': ['welcome.html']},
python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*",
setup_requires=setup_requires,
extras_require={
'passlib': ['pypiserver-passlib'],
'cache': ['watchdog']
},
tests_require=tests_require,
url="https://github.com/pypiserver/pypiserver",
maintainer=("Kostis Anagnostopoulos <ankostis@gmail.com> "
"Matthew Planchard <mplanchard@gmail.com>"),
maintainer_email="ankostis@gmail.com",
classifiers=[
"Development Status :: 5 - Production/Stable",
"Environment :: Web Environment",
"Intended Audience :: Developers",
"Intended Audience :: System Administrators",
"License :: OSI Approved :: BSD License",
"License :: OSI Approved :: zlib/libpng License",
"Operating System :: MacOS :: MacOS X",
"Operating System :: Microsoft :: Windows",
"Operating System :: POSIX",
"Operating System :: OS Independent",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
"Topic :: Software Development :: Build Tools",
"Topic :: System :: Software Distribution"
],
zip_safe=True,
entry_points={
'paste.app_factory': ['main=pypiserver.paste:paste_app_factory'],
'console_scripts': [
'pypi-server=pypiserver.__main__:main',
'pypiserver=pypiserver.__main__:main',
],
'pypiserver.authenticators': [
'no-auth = '
'pypiserver.plugins.authenticators.no_auth:NoAuthAuthenticator'
]
},
options={
'bdist_wheel': {'universal': True},
},
platforms=['any'],
)

10
tests/doubles.py Normal file

@ -0,0 +1,10 @@
"""Utilities for constructing test doubles."""
class GenericNamespace(object):
"""A generic namespace constructed from kwargs."""
def __init__(self, **kwargs):
"""Convert kwargs to attributes on the instantiated object."""
for key, value in kwargs.items():
setattr(self, key, value)

@ -19,7 +19,8 @@ import webtest
# Local Imports
from pypiserver import __main__, bottle
import pypiserver
from pypiserver import __main__, bottle, config
import tests.test_core as test_core
@ -29,15 +30,21 @@ __main__.init_logging(level=logging.NOTSET)
hp = HTMLParser()
@pytest.fixture()
def _app(app):
return app.module
@pytest.fixture
def app(tmpdir):
from pypiserver import app
return app(root=tmpdir.strpath, authenticated=[])
conf = config.Config(
parser_type='pypi-server'
).get_parser().parse_args(
['-a', '.', tmpdir.strpath]
)
return pypiserver.app(conf)
def app_from_args(args):
conf = config.Config(
parser_type='pypi-server'
).get_parser().parse_args(args)
return pypiserver.app(conf)
@pytest.fixture
@ -123,18 +130,24 @@ def test_root_hostname(testapp):
def test_root_welcome_msg_no_vars(root, welcome_file_no_vars):
from pypiserver import app
app = app(root=root.strpath, welcome_file=welcome_file_no_vars.strpath)
testapp = webtest.TestApp(app)
# from pypiserver import app
# app = app(root=root.strpath, welcome_file=welcome_file_no_vars.strpath)
app_ = app_from_args(
['--welcome', welcome_file_no_vars.strpath, root.strpath]
)
testapp = webtest.TestApp(app_)
resp = testapp.get("/")
from pypiserver import __version__ as pver
resp.mustcontain(welcome_file_no_vars.read(), no=pver)
def test_root_welcome_msg_all_vars(root, welcome_file_all_vars):
from pypiserver import app
app = app(root=root.strpath, welcome_file=welcome_file_all_vars.strpath)
testapp = webtest.TestApp(app)
# from pypiserver import app
# app = app(root=root.strpath, welcome_file=welcome_file_all_vars.strpath)
app_ = app_from_args(
['--welcome', welcome_file_all_vars.strpath, root.strpath]
)
testapp = webtest.TestApp(app_)
resp = testapp.get("/")
from pypiserver import __version__ as pver
@ -174,14 +187,14 @@ def test_favicon(testapp):
testapp.get("/favicon.ico", status=404)
def test_fallback(root, _app, testapp):
assert _app.config.redirect_to_fallback
def test_fallback(root, app, testapp):
assert app.config.redirect_to_fallback
resp = testapp.get("/simple/pypiserver/", status=302)
assert resp.headers["Location"] == "https://pypi.org/simple/pypiserver/"
def test_no_fallback(root, _app, testapp):
_app.config.redirect_to_fallback = False
def test_no_fallback(root, app, testapp):
app.config.redirect_to_fallback = False
testapp.get("/simple/pypiserver/", status=404)
@ -351,17 +364,18 @@ def test_simple_index_list_name_with_underscore_no_egg(root, testapp):
assert hrefs == {"foo-bar/"}
def test_no_cache_control_set(root, _app, testapp):
assert not _app.config.cache_control
def test_no_cache_control_set(root, app, testapp):
assert not app.config.cache_control
root.join("foo_bar-1.0.tar.gz").write("")
resp = testapp.get("/packages/foo_bar-1.0.tar.gz")
assert "Cache-Control" not in resp.headers
def test_cache_control_set(root):
from pypiserver import app
AGE = 86400
app_with_cache = webtest.TestApp(app(root=root.strpath, cache_control=AGE))
app_with_cache = webtest.TestApp(
app_from_args(['--cache-control', str(AGE), root.strpath])
)
root.join("foo_bar-1.0.tar.gz").write("")
resp = app_with_cache.get("/packages/foo_bar-1.0.tar.gz")
assert "Cache-Control" in resp.headers
@ -384,8 +398,11 @@ def test_upload_badAction(root, testapp):
for f in test_core.files
if f[1] and '/' not in f[0]])
def test_upload(package, root, testapp):
resp = testapp.post("/", params={':action': 'file_upload'},
upload_files=[('content', package, b'')])
resp = testapp.post(
"/",
params={':action': 'file_upload'},
upload_files=[('content', package, b'')]
)
assert resp.status_int == 200
uploaded_pkgs = [f.basename for f in root.listdir()]
assert len(uploaded_pkgs) == 1
@ -396,10 +413,14 @@ def test_upload(package, root, testapp):
for f in test_core.files
if f[1] and '/' not in f[0]])
def test_upload_with_signature(package, root, testapp):
resp = testapp.post("/", params={':action': 'file_upload'},
upload_files=[
('content', package, b''),
('gpg_signature', '%s.asc' % package, b'')])
resp = testapp.post(
"/",
params={':action': 'file_upload'},
upload_files=[
('content', package, b''),
('gpg_signature', '%s.asc' % package, b'')
],
)
assert resp.status_int == 200
uploaded_pkgs = [f.basename.lower() for f in root.listdir()]
assert len(uploaded_pkgs) == 2
@ -411,9 +432,12 @@ def test_upload_with_signature(package, root, testapp):
f[0] for f in test_core.files
if f[1] is None])
def test_upload_badFilename(package, root, testapp):
resp = testapp.post("/", params={':action': 'file_upload'},
upload_files=[('content', package, b'')],
expect_errors=1)
resp = testapp.post(
"/",
params={':action': 'file_upload'},
upload_files=[('content', package, b'')],
expect_errors=1
)
assert resp.status == '400 Bad Request'
assert "Bad filename: %s" % package in resp.text
@ -430,20 +454,23 @@ def test_upload_badFilename(package, root, testapp):
def test_remove_pkg_missingNaveVersion(name, version, root, testapp):
msg = "Missing 'name'/'version' fields: name=%s, version=%s"
params = {':action': 'remove_pkg', 'name': name, 'version': version}
params = dict((k, v) for k,v in params.items() if v is not None)
params = dict((k, v) for k, v in params.items() if v is not None)
resp = testapp.post("/", expect_errors=1, params=params)
assert resp.status == '400 Bad Request'
assert msg %(name, version) in hp.unescape(resp.text)
assert msg % (name, version) in hp.unescape(resp.text)
def test_remove_pkg_notFound(root, testapp):
resp = testapp.post("/", expect_errors=1,
params={
':action': 'remove_pkg',
'name': 'foo',
'version': '123',
})
resp = testapp.post(
"/",
expect_errors=1,
params={
':action': 'remove_pkg',
'name': 'foo',
'version': '123',
}
)
assert resp.status == '404 Not Found'
assert "foo (123) not found" in hp.unescape(resp.text)

720
tests/test_config.py Normal file

@ -0,0 +1,720 @@
"""Test the ArgumentParser and associated functions."""
import argparse
import logging
from os import getcwd
from os.path import exists, expanduser
try:
from unittest.mock import Mock
except ImportError: # py2
from mock import Mock
import pytest
from pkg_resources import resource_filename
from pypiserver import config
from pypiserver import const
from .doubles import GenericNamespace
class StubAction(object):
"""Quick stub for argparse actions."""
def __init__(self, option_strings):
"""Set stub attributes."""
self.help = 'help'
self.default = 'default'
self.nargs = '*'
self.option_strings = option_strings
@pytest.mark.parametrize('options, expected_help', (
(['--foo'], 'help (default: %(default)s)'),
(['cmd'], 'help (default: %(default)s)'),
(['--disable-fallback'], 'help (default: False)'),
))
def test_argument_formatter(options, expected_help):
"""Test the custom formatter class.
In general, it should always just return help (default: %(default)s)
except when the option_strings contain --disable-fallback, our special
case.
"""
action = StubAction(options)
assert config._HelpFormatter('prog')._get_help_string(action) == (
expected_help
)
class TestCustomParsers(object):
@pytest.mark.parametrize('arg, exp', (
('update download list', ['update', 'download', 'list']),
('update, download, list', ['update', 'download', 'list']),
('update', ['update']),
('update,', ['update']),
('update, ', ['update']),
('update , ', ['update']),
('update ', ['update']),
('update , download', ['update', 'download']),
('.', []),
))
def test_auth_parse_success(self, arg, exp):
"""Test parsing auth strings from the commandline."""
assert config._CustomParsers.auth(arg) == exp
def test_auth_parse_disallowed_item(self):
"""Test that including a non-whitelisted action throws."""
with pytest.raises(ValueError):
config._CustomParsers.auth('download update foo')
def test_roots_parse_abspath(self):
"""Test the parsing of root directories returns absolute paths."""
assert config._CustomParsers.roots(
['./foo']
) == ['{}/foo'.format(getcwd())]
def test_roots_parse_home(self):
"""Test that parsing of root directories expands the user home."""
assert config._CustomParsers.roots(
['~/foo']
) == ([expanduser('~/foo')])
def test_roots_parse_both(self):
"""Test that root directories are both expanded and absolute-ed."""
assert config._CustomParsers.roots(
['~/foo/..']
) == [expanduser('~')]
@pytest.mark.parametrize('verbosity, exp', (
(0, logging.WARNING),
(1, logging.INFO),
(2, logging.DEBUG),
(3, logging.NOTSET),
(5, logging.NOTSET),
(100000, logging.NOTSET),
(-1, logging.NOTSET),
))
def test_verbosity_parse(self, verbosity, exp):
"""Test converting a number of -v's into a log level."""
assert config._CustomParsers.verbosity(verbosity) == exp
class TestDeprecatedParser(object):
"""Tests for the deprecated parser."""
@pytest.fixture()
def parser(self):
"""Return a deprecated parser."""
return config.Config(parser_type='pypi-server').get_parser()
def test_version_exits(self, parser):
"""Test that asking for the version exits the program."""
with pytest.raises(SystemExit):
parser.parse_args(['--version'])
@pytest.mark.parametrize('args, exp', (
([], logging.WARNING),
(['-v'], logging.INFO),
(['-vv'], logging.DEBUG),
(['-vvv'], logging.NOTSET),
(['-vvvvvvv'], logging.NOTSET),
(['-v', '--verbose'], logging.DEBUG),
(['--verbose', '--verbose'], logging.DEBUG),
(['-v', '-v'], logging.DEBUG),
))
def test_specifying_verbosity(self, parser, args, exp):
"""Test that verbosity is set correctly for -v arguments."""
assert parser.parse_args(args).verbosity == exp
@pytest.mark.parametrize('args, exp', (
([], list(map(expanduser, config._Defaults.roots))), # type: ignore
(['/foo'], ['/foo']),
(['/foo', '~/bar'], ['/foo', expanduser('~/bar')]),
))
def test_roots(self, parser, args, exp):
"""Test specifying package roots."""
assert parser.parse_args(args).roots == exp
@pytest.mark.parametrize('args, exp', (
([], config._Defaults.host),
(['-i', '1.1.1.1'], '1.1.1.1'),
(['--interface', '1.1.1.1'], '1.1.1.1'),
))
def test_interface(self, parser, args, exp):
"""Test specifying a server interface."""
assert parser.parse_args(args).host == exp
@pytest.mark.parametrize('args, exp', (
([], config._Defaults.port),
(['-p', '999'], 999),
(['--port', '1234'], 1234),
))
def test_port(self, parser, args, exp):
"""Test specifying a server port."""
assert parser.parse_args(args).port == exp
@pytest.mark.parametrize('args, exp', (
([], False),
(['-o'], True),
(['--overwrite'], True),
))
def test_overwrite(self, parser, args, exp):
"""Test the overwrite flag."""
assert parser.parse_args(args).overwrite == exp
@pytest.mark.parametrize('args, exp', (
([], config._Defaults.fallback_url),
(['--fallback-url', 'http://www.google.com'], 'http://www.google.com'),
))
def test_fallback_url(self, parser, args, exp):
"""Test specifying a fallback URL."""
assert parser.parse_args(args).fallback_url == exp
@pytest.mark.parametrize('args, exp', (
([], config._Defaults.redirect_to_fallback),
(['--disable-fallback'], not config._Defaults.redirect_to_fallback),
))
def test_disable_fallback(self, parser, args, exp):
"""Test disabling the fallback."""
assert parser.parse_args(args).redirect_to_fallback is exp
@pytest.mark.parametrize('args, exp', (
([], config._Defaults.server),
(['--server', 'paste'], 'paste'),
))
def test_specify_server(self, parser, args, exp):
"""Test specifying a server."""
assert parser.parse_args(args).server == exp
def test_specify_server_bad_arg(self, parser):
"""Test specifying an unsupported server."""
with pytest.raises(SystemExit):
parser.parse_args(['--server', 'foobar'])
@pytest.mark.parametrize('args, exp', (
([], config._Defaults.hash_algo),
(['--hash-algo', 'sha256'], 'sha256'),
(['--hash-algo', 'no'], None),
(['--hash-algo', 'false'], None),
(['--hash-algo', '0'], None),
(['--hash-algo', 'off'], None),
))
def test_specify_hash_algo(self, parser, args, exp):
"""Test specifying a hash algorithm."""
assert parser.parse_args(args).hash_algo == exp
def test_bad_hash_algo(self, parser):
"""Test an unavailable hash algorithm."""
with pytest.raises(SystemExit):
parser.parse_args(['--hash-algo', 'foobar-loo'])
@pytest.mark.parametrize('args, exp', (
([], None),
(['--welcome', 'foo'], 'foo'),
))
def test_specify_welcome_html(self, parser, args, exp):
"""Test specifying a welcome file."""
welcome = parser.parse_args(args).welcome_file
if exp is None:
# Ensure the pkg_resources file path is returned correctly
assert welcome.endswith('welcome.html')
assert exists(welcome)
else:
assert parser.parse_args(args).welcome_file == exp
def test_standalone_welcome(self, monkeypatch):
"""Test that the error raised in the standalone package is handled."""
monkeypatch.setattr(
config.pkg_resources,
'resource_filename',
Mock(side_effect=NotImplementedError)
)
assert config.Config(
parser_type='pypi-server'
).get_parser().parse_args([]).welcome_file == const.STANDALONE_WELCOME
@pytest.mark.parametrize('args, exp', (
([], None),
(['--cache-control', '12'], 12),
))
def test_specify_cache_control(self, parser, args, exp):
"""Test specifying cache retention time."""
assert parser.parse_args(args).cache_control == exp
@pytest.mark.parametrize('args, exp', (
([], ['update']),
(['-a', '.'], []),
(['--authenticate', '.'], []),
(['-a', 'update download'], ['update', 'download']),
(['-a', 'update, download'], ['update', 'download']),
(['-a', 'update,download'], ['update', 'download']),
))
def test_specify_auth(self, parser, args, exp):
"""Test specifying cache retention time."""
assert parser.parse_args(args).authenticate == exp
@pytest.mark.parametrize('args, exp', (
([], None),
(['-P', 'foo'], 'foo'),
(['--passwords', 'foo'], 'foo'),
))
def test_specify_password_file(self, parser, args, exp):
"""Test specifying cache retention time."""
assert parser.parse_args(args).password_file == exp
@pytest.mark.parametrize('attr, args, exp', (
('log_file', [], None),
('log_file', ['--log-file', 'foo'], 'foo'),
('log_frmt', [], config._Defaults.log_fmt),
('log_frmt', ['--log-frmt', 'foo'], 'foo'),
))
def test_log_args(self, parser, attr, args, exp):
"""Test various log args."""
assert getattr(parser.parse_args(args), attr) == exp
@pytest.mark.parametrize('attr, args, exp', (
('log_req_frmt', [], config._Defaults.log_req_fmt),
('log_req_frmt', ['--log-req-frmt', 'foo'], 'foo'),
('log_res_frmt', [], config._Defaults.log_res_fmt),
('log_res_frmt', ['--log-res-frmt', 'foo'], 'foo'),
('log_err_frmt', [], config._Defaults.log_err_fmt),
('log_err_frmt', ['--log-err-frmt', 'foo'], 'foo'),
))
def test_http_log_args(self, parser, attr, args, exp):
"""Test HTTP log args."""
assert getattr(parser.parse_args(args), attr) == exp
@pytest.mark.parametrize('args, exp', (
([], False),
(['-U'], True),
(['--update-packages'], True),
))
def test_update_flag(self, parser, args, exp):
"""Test specifying the update flag."""
assert parser.parse_args(args).update_packages is exp
@pytest.mark.parametrize('args, exp', (
([], False),
(['-x'], True),
(['--execute'], True),
))
def test_execute_flag(self, parser, args, exp):
"""Test specifying the execute flag."""
assert parser.parse_args(args).execute is exp
@pytest.mark.parametrize('args, exp', (
([], False),
(['-u'], True),
(['--unstable'], True),
))
def test_unstable_flag(self, parser, args, exp):
"""Test specifying the execute flag."""
assert parser.parse_args(args).unstable is exp
@pytest.mark.parametrize('args, exp', (
([], None),
(['--download-directory', 'foo'], 'foo'),
))
def test_download_directory(self, parser, args, exp):
"""Test specifying the execute flag."""
assert parser.parse_args(args).download_directory is exp
class TestParser(object):
"""Tests for the parser."""
@pytest.fixture()
def parser(self):
"""Return a deprecated parser."""
return config.Config().get_parser()
# **********************************************************************
# Root Command
# **********************************************************************
def test_version_exits(self, parser):
"""Test that asking for the version exits the program."""
with pytest.raises(SystemExit):
parser.parse_args(['--version'])
@pytest.mark.parametrize('args, exp', (
([], logging.WARNING),
(['-v'], logging.INFO),
(['-vv'], logging.DEBUG),
(['-vvv'], logging.NOTSET),
(['-vvvvvvv'], logging.NOTSET),
(['-v', '--verbose'], logging.DEBUG),
(['--verbose', '--verbose'], logging.DEBUG),
(['-v', '-v'], logging.DEBUG),
))
def test_specifying_verbosity_run(self, parser, args, exp):
"""Test that verbosity is set correctly for -v arguments."""
args.append('run')
assert parser.parse_args(args).verbosity == exp
@pytest.mark.parametrize('args, exp', (
([], logging.WARNING),
(['-v'], logging.INFO),
(['-vv'], logging.DEBUG),
(['-vvv'], logging.NOTSET),
(['-vvvvvvv'], logging.NOTSET),
(['-v', '--verbose'], logging.DEBUG),
(['--verbose', '--verbose'], logging.DEBUG),
(['-v', '-v'], logging.DEBUG),
))
def test_specifying_verbosity_update(self, parser, args, exp):
"""Test that verbosity is set correctly for -v arguments."""
args.append('update')
assert parser.parse_args(args).verbosity == exp
@pytest.mark.parametrize('attr, args, exp', (
('log_file', [], None),
('log_file', ['--log-file', 'foo'], 'foo'),
('log_frmt', [], config._Defaults.log_fmt),
('log_frmt', ['--log-frmt', 'foo'], 'foo'),
))
def test_log_args_run(self, parser, attr, args, exp):
"""Test various log args."""
args.append('run')
assert getattr(parser.parse_args(args), attr) == exp
@pytest.mark.parametrize('attr, args, exp', (
('log_file', [], None),
('log_file', ['--log-file', 'foo'], 'foo'),
('log_frmt', [], config._Defaults.log_fmt),
('log_frmt', ['--log-frmt', 'foo'], 'foo'),
))
def test_log_args_update(self, parser, attr, args, exp):
"""Test various log args."""
args.append('update')
assert getattr(parser.parse_args(args), attr) == exp
# **********************************************************************
# Run Subcommand
# **********************************************************************
def test_raw_run_command(self, parser):
"""Ensure the command name is stored."""
assert parser.parse_args(['run']).command == 'run'
@pytest.mark.parametrize('args, exp', (
([], list(map(expanduser, config._Defaults.roots))), # type: ignore
(['/foo'], ['/foo']),
(['/foo', '~/bar'], ['/foo', expanduser('~/bar')]),
))
def test_roots(self, parser, args, exp):
"""Test specifying package roots."""
args.insert(0, 'run')
assert parser.parse_args(args).roots == exp
@pytest.mark.parametrize('args, exp', (
([], config._Defaults.host),
(['-i', '1.1.1.1'], '1.1.1.1'),
(['--interface', '1.1.1.1'], '1.1.1.1'),
))
def test_interface(self, parser, args, exp):
"""Test specifying a server interface."""
args.insert(0, 'run')
assert parser.parse_args(args).host == exp
@pytest.mark.parametrize('args, exp', (
([], config._Defaults.port),
(['-p', '999'], 999),
(['--port', '1234'], 1234),
))
def test_port(self, parser, args, exp):
"""Test specifying a server port."""
args.insert(0, 'run')
assert parser.parse_args(args).port == exp
@pytest.mark.parametrize('args, exp', (
([], False),
(['-o'], True),
(['--overwrite'], True),
))
def test_overwrite(self, parser, args, exp):
"""Test the overwrite flag."""
args.insert(0, 'run')
assert parser.parse_args(args).overwrite == exp
@pytest.mark.parametrize('args, exp', (
([], config._Defaults.fallback_url),
(['--fallback-url', 'http://www.google.com'], 'http://www.google.com'),
))
def test_fallback_url(self, parser, args, exp):
"""Test specifying a fallback URL."""
args.insert(0, 'run')
assert parser.parse_args(args).fallback_url == exp
@pytest.mark.parametrize('args, exp', (
([], config._Defaults.redirect_to_fallback),
(['--disable-fallback'], not config._Defaults.redirect_to_fallback),
))
def test_disable_fallback(self, parser, args, exp):
"""Test disabling the fallback."""
args.insert(0, 'run')
assert parser.parse_args(args).redirect_to_fallback is exp
@pytest.mark.parametrize('args, exp', (
([], config._Defaults.server),
(['--server', 'paste'], 'paste'),
))
def test_specify_server(self, parser, args, exp):
"""Test specifying a server."""
args.insert(0, 'run')
assert parser.parse_args(args).server == exp
def test_specify_server_bad_arg(self, parser):
"""Test specifying an unsupported server."""
with pytest.raises(SystemExit):
parser.parse_args(['run', '--server', 'foobar'])
@pytest.mark.parametrize('args, exp', (
([], config._Defaults.hash_algo),
(['--hash-algo', 'sha256'], 'sha256'),
(['--hash-algo', 'no'], None),
(['--hash-algo', 'false'], None),
(['--hash-algo', '0'], None),
(['--hash-algo', 'off'], None),
))
def test_specify_hash_algo(self, parser, args, exp):
"""Test specifying a hash algorithm."""
args.insert(0, 'run')
assert parser.parse_args(args).hash_algo == exp
def test_bad_hash_algo(self, parser):
"""Test an unavailable hash algorithm."""
with pytest.raises(SystemExit):
parser.parse_args(['run', '--hash-algo', 'foobar-loo'])
@pytest.mark.parametrize('args, exp', (
([], None),
(['--welcome', 'foo'], 'foo'),
))
def test_specify_welcome_html(self, parser, args, exp):
"""Test specifying a welcome file."""
args.insert(0, 'run')
welcome = parser.parse_args(args).welcome_file
if exp is None:
# Ensure the pkg_resources file path is returned correctly
assert welcome.endswith('welcome.html')
assert exists(welcome)
else:
assert parser.parse_args(args).welcome_file == exp
def test_standalone_welcome(self, monkeypatch):
"""Test that the error raised in the standalone package is handled."""
monkeypatch.setattr(
config.pkg_resources,
'resource_filename',
Mock(side_effect=NotImplementedError)
)
assert config.Config().get_parser().parse_args(
['run']
).welcome_file == const.STANDALONE_WELCOME
@pytest.mark.parametrize('args, exp', (
([], None),
(['--cache-control', '12'], 12),
))
def test_specify_cache_control(self, parser, args, exp):
"""Test specifying cache retention time."""
args.insert(0, 'run')
assert parser.parse_args(args).cache_control == exp
@pytest.mark.parametrize('args, exp', (
([], ['update']),
(['-a', '.'], []),
(['--authenticate', '.'], []),
(['-a', 'update download'], ['update', 'download']),
(['-a', 'update, download'], ['update', 'download']),
(['-a', 'update,download'], ['update', 'download']),
))
def test_specify_auth(self, monkeypatch, parser, args, exp):
"""Test specifying authed actions.
We need to patch the plugins list to ensure it includes passlib,
(really just something other than "no-auth"), otherwise the
``-a`` option is not provided.
"""
args.insert(0, 'run')
conf = config.Config()
passlib = GenericNamespace(
plugin_name='passlib',
plugin_help='foo',
update_parser=lambda x: None,
)
no_auth = GenericNamespace(
plugin_name='noauth',
plugin_help='foo',
update_parser=lambda x: None
)
monkeypatch.setattr(
conf,
'_plugins',
{'authenticators': {'passlib': passlib, 'no-auth': no_auth}}
)
assert conf.get_parser().parse_args(args).authenticate == exp
@pytest.mark.parametrize('args, exp', (
([], 'passlib'),
(['--auth-backend', 'no-auth'], 'no-auth')
))
def test_auth_backend(self, parser, args, exp):
"""Test specifying an auth backend."""
args.insert(0, 'run')
assert parser.parse_args(args).auth_backend == exp
def test_auth_backend_no_passlib(self, monkeypatch):
"""Ensure that we fallback to no-auth.
If passlib is not available and the ``passlib`` plugin
cannot be loaded, we should fall back to the ``no-auth``
dummy authenticator.
"""
conf = config.Config()
monkeypatch.setitem(
conf._plugins,
'authenticators',
{
'no-auth': GenericNamespace(
plugin_name='a',
plugin_help='a',
update_parser=lambda *x, **y: None,
)
}
)
parser = conf.get_parser()
assert parser.parse_args(['run']).auth_backend == 'no-auth'
def test_auth_backend_bad_value(self, parser):
"""Test that only loaded plugins may be specified."""
with pytest.raises(SystemExit):
parser.parse_args(['--auth-backend', 'foobar'])
@pytest.mark.parametrize('args, exp', (
([], None),
(['-P', 'foo'], 'foo'),
(['--password-file', 'foo'], 'foo'),
))
def test_specify_password_file(self, parser, args, exp):
"""Test specifying a password file."""
args.insert(0, 'run')
assert parser.parse_args(args).password_file == exp
@pytest.mark.parametrize('attr, args, exp', (
('log_req_frmt', [], config._Defaults.log_req_fmt),
('log_req_frmt', ['--log-req-frmt', 'foo'], 'foo'),
('log_res_frmt', [], config._Defaults.log_res_fmt),
('log_res_frmt', ['--log-res-frmt', 'foo'], 'foo'),
('log_err_frmt', [], config._Defaults.log_err_fmt),
('log_err_frmt', ['--log-err-frmt', 'foo'], 'foo'),
))
def test_http_log_args(self, parser, attr, args, exp):
"""Test HTTP log args."""
args.insert(0, 'run')
assert getattr(parser.parse_args(args), attr) == exp
# **********************************************************************
# Update Subcommand
# **********************************************************************
def test_raw_update(self, parser):
"""Test that the update subcommand is stored properly."""
assert parser.parse_args(['update']).command == 'update'
@pytest.mark.parametrize('args, exp', (
([], list(map(expanduser, config._Defaults.roots))), # type: ignore
(['/foo'], ['/foo']),
(['/foo', '~/bar'], ['/foo', expanduser('~/bar')]),
))
def test_update_roots(self, parser, args, exp):
"""Test specifying package roots."""
args.insert(0, 'update')
assert parser.parse_args(args).roots == exp
@pytest.mark.parametrize('args, exp', (
([], False),
(['-x'], True),
(['--execute'], True),
))
def test_execute_flag(self, parser, args, exp):
"""Test specifying the execute flag."""
args.insert(0, 'update')
assert parser.parse_args(args).execute is exp
@pytest.mark.parametrize('args, exp', (
([], False),
(['--pre'], True),
))
def test_prerelease_flag(self, parser, args, exp):
"""Test specifying the execute flag."""
args.insert(0, 'update')
assert parser.parse_args(args).pre is exp
@pytest.mark.parametrize('args, exp', (
([], None),
(['--download-directory', 'foo'], 'foo'),
))
def test_download_directory(self, parser, args, exp):
"""Test specifying the execute flag."""
args.insert(0, 'update')
assert parser.parse_args(args).download_directory is exp
class TestReadyMades(object):
"""Test generating ready-made configs."""
def test_get_default(self):
"""Test getting the default config."""
conf = config.Config().get_default()
assert any(d in conf for d in vars(config._Defaults))
for default, value in vars(config._Defaults).items():
if default in conf:
if default == 'roots':
assert getattr(conf, default) == (
[expanduser(v) for v in value]
)
elif default == 'authenticate':
assert getattr(conf, default) == (
[a for a in value.split()]
)
elif default == 'welcome_file':
assert getattr(conf, default) == (
resource_filename('pypiserver', value)
)
else:
assert getattr(conf, default) == value
def test_get_default_specify_subcommand(self):
"""Test getting default args for a non-default subcommand."""
conf = config.Config().get_default(subcommand='update')
exp_defaults = (
('execute', False),
('pre', False),
('download_directory', None)
)
for default, value in exp_defaults:
assert getattr(conf, default) is value
def test_get_parsed(self, monkeypatch):
"""Test getting a Namespace from commandline args."""
monkeypatch.setattr(
argparse._sys,
'argv',
['pypiserver', 'run', '--interface', '1.2.3.4']
)
conf = config.Config().get_parsed()
assert conf.host == '1.2.3.4'
def test_from_kwargs(self):
"""Test getting a default config updated with provided kwargs."""
conf = config.Config().from_kwargs(
port=9999,
foo='foo',
)
assert conf.port == 9999
assert conf.foo == 'foo'

@ -1,6 +1,7 @@
#! /usr/bin/env py.test
# -*- coding: utf-8 -*-
# TODO: write more tests for core!
import logging
import os
@ -8,9 +9,10 @@ import pytest
from pypiserver import __main__, core
from .doubles import GenericNamespace
## Enable logging to detect any problems with it
##
# Enable logging to detect any problems with it
__main__.init_logging(level=logging.NOTSET)
@ -57,18 +59,21 @@ files = [
("package-name-0.0.1.alpha.1.win-amd64-py3.2.exe", "package-name", "0.0.1.alpha.1"),
]
def _capitalize_ext(fpath):
f, e = os.path.splitext(fpath)
if e != '.whl':
e = e.upper()
return f + e
@pytest.mark.parametrize(("filename", "pkgname", "version"), files)
def test_guess_pkgname_and_version(filename, pkgname, version):
exp = (pkgname, version)
assert core.guess_pkgname_and_version(filename) == exp
assert core.guess_pkgname_and_version(_capitalize_ext(filename)) == exp
@pytest.mark.parametrize(("filename", "pkgname", "version"), files)
def test_guess_pkgname_and_version_asc(filename, pkgname, version):
exp = (pkgname, version)
@ -81,12 +86,65 @@ def test_listdir_bad_name(tmpdir):
res = list(core.listdir(tmpdir.strpath))
assert res == []
hashes = [
('sha256', 'e3b0c44298fc1c149afbf4c8996fb924'), # empty-sha256
('md5', 'd41d8cd98f00b204e9800998ecf8427e'), # empty-md5
('sha256', 'e3b0c44298fc1c149afbf4c8996fb924'), # empty-sha256
('md5', 'd41d8cd98f00b204e9800998ecf8427e'), # empty-md5
]
@pytest.mark.parametrize(("algo", "digest"), hashes)
def test_hashfile(tmpdir, algo, digest):
f = tmpdir.join("empty")
f.ensure()
assert core.digest_file(f.strpath, algo) == digest
def test_load_plugins():
"""Test loading plugins.
We should at least be able to get the ones included with the full
passlib install.
"""
plugins = core.load_plugins()
assert 'passlib' in plugins['authenticators']
def test_load_plugin_group():
"""Test loading a single plugin group.
This test is not quite definitive at the time of authorship since
there's only one plugin (therefore the output will be the same as
for ``load_plugins()`` with no arguments). However, as soon as
a second plugin type is added, it'll become more meaningful.
"""
auth_plugins = core.load_plugins('authenticators')
assert 'passlib' in auth_plugins['authenticators']
def test_load_plugin_bad_group():
"""Test that trying to load a bad group raises an error."""
with pytest.raises(ValueError):
# hopefully this is never a legit plugin type
core.load_plugins('fhgwgad')
def test_load_plugins_bad_and_good_group():
"""Test that the bad group is detected even among a good one."""
with pytest.raises(ValueError):
core.load_plugins('authenticators', 'wheelchair_assassins')
def test_add_plugins_to_config_load(monkeypatch):
"""Test that load_plugins() is called for no provided plugins."""
monkeypatch.setattr(core, 'load_plugins', lambda *x: 'plugin_stub')
config = GenericNamespace()
core.add_plugins_to_config(config)
assert config.plugins == 'plugin_stub' # pylint: disable=no-member
def test_add_plugins_to_config_no_load():
"""Test adding passed plugins to a config."""
config = GenericNamespace()
core.add_plugins_to_config(config, plugins='plugins!')
assert config.plugins == 'plugins!' # pylint: disable=no-member

@ -4,11 +4,18 @@ import pytest
import re
from pypiserver import version as my_ver
@pytest.fixture()
def readme():
return open('README.rst', 'rt').read()
def test_READMEversion(readme):
m = re.compile(r'^\s*:Version:\s*(.+)\s*$', re.MULTILINE).search(readme)
m = re.compile(
r'^\s*:Version:\s*(.+)\s*$',
re.MULTILINE
).search(readme)
assert m, "Could not find version on README!"
assert m.group(1) == my_ver, 'Updaed version(%s) on README!' % m.group(1)
assert m.group(1) == my_ver, (
'Incorrect version (%s) on README!' % m.group(1)
)

@ -1,43 +1,16 @@
"""
Test module for . . .
"""
# Standard library imports
"""Test module for pypiserver.__init__"""
from __future__ import (absolute_import, division,
print_function, unicode_literals)
import logging
from os.path import abspath, dirname, join, realpath
from sys import path
# Third party imports
import pytest
# Local imports
logger = logging.getLogger(__name__)
test_dir = realpath(dirname(__file__))
src_dir = abspath(join(test_dir, '..'))
path.append(src_dir)
print(path)
import pypiserver
logger = logging.getLogger(__name__)
@pytest.mark.parametrize('conf_options', [
{},
{'root': '~/stable_packages'},
{'root': '~/unstable_packages', 'authenticated': 'upload',
'passwords': '~/htpasswd'},
# Verify that the strip parser works properly.
{'authenticated': str('upload')},
])
def test_paste_app_factory(conf_options, monkeypatch):
"""Test the paste_app_factory method"""
monkeypatch.setattr('pypiserver.core.configure',
lambda **x: (x, [x.keys()]))
pypiserver.paste_app_factory({}, **conf_options)
def test_app_factory(monkeypatch):
monkeypatch.setattr('pypiserver.core.configure',
lambda **x: (x, [x.keys()]))
assert pypiserver.app() is not pypiserver.app()
def test_app_factory(monkeypatch, tmpdir):
"""Test creating an app."""
conf = pypiserver.config.Config(
parser_type='pypi-server'
).get_parser().parse_args([str(tmpdir)])
assert pypiserver.app(conf) is not pypiserver.app(conf)

@ -1,12 +1,20 @@
#! /usr/bin/env py.test
"""Test the __main__ module."""
import argparse
import logging
import os
import sys
import warnings
import sys, os, pytest, logging
from pypiserver import __main__
try:
from unittest import mock
except ImportError:
import mock
import pytest
from pypiserver import __main__
class main_wrapper(object):
@ -16,7 +24,8 @@ class main_wrapper(object):
def __call__(self, argv):
sys.stdout.write("Running %s\n" % (argv,))
__main__.main(["pypi-server"] + argv)
with warnings.catch_warnings():
__main__.main(argv)
return self.run_kwargs
@ -41,130 +50,156 @@ def main(request, monkeypatch):
return main
def test_default_pkgdir(main):
main([])
assert os.path.normpath(main.pkgdir) == os.path.normpath(os.path.expanduser("~/packages"))
class TestMain(object):
"""Test the main() method."""
@pytest.fixture(autouse=True)
def patch_warner(self, monkeypatch):
"""Don't bother emitting deprecation warnings."""
monkeypatch.setattr(__main__, '_warn_deprecation', lambda: None)
@pytest.fixture(autouse=True)
def patch_argv(self, monkeypatch):
"""Set argv so it looks like pypi-server started the script."""
new_argv = list(sys.argv)
new_argv[0] = 'pypi-server'
monkeypatch.setattr(__main__.sys, 'argv', new_argv)
def test_default_pkgdir(self, main):
main([])
assert os.path.normpath(main.pkgdir) == (
os.path.normpath(os.path.expanduser("~/packages"))
)
def test_noargs(self, main):
assert main([]) == {'host': "0.0.0.0", 'port': 8080, 'server': "auto"}
def test_port(self, main):
expected = dict(host="0.0.0.0", port=8081, server="auto")
assert main(["--port=8081"]) == expected
assert main(["--port", "8081"]) == expected
assert main(["-p", "8081"]) == expected
def test_server(self, main):
assert main(["--server=paste"])["server"] == "paste"
assert main(["--server", "cherrypy"])["server"] == "cherrypy"
@pytest.mark.skipif(True, reason='deprecated')
def test_root(self, main):
main(["--root", "."])
assert main.app.module.packages.root == os.path.abspath(".")
assert main.pkgdir == os.path.abspath(".")
@pytest.mark.skipif(True, reason='deprecated')
def test_root_r(self, main):
main(["-r", "."])
assert main.app.module.packages.root == os.path.abspath(".")
assert main.pkgdir == os.path.abspath(".")
def test_fallback_url(self, main):
main(["--fallback-url", "https://pypi.mirror/simple"])
assert main.app.config.fallback_url == "https://pypi.mirror/simple"
def test_fallback_url_default(self, main):
main([])
assert main.app.config.fallback_url == "https://pypi.org/simple"
def test_hash_algo_default(self, main):
main([])
assert main.app.config.hash_algo == 'md5'
def test_hash_algo(self, main):
main(['--hash-algo=sha256'])
assert main.app.config.hash_algo == 'sha256'
def test_hash_algo_off(self, main):
main(['--hash-algo=off'])
assert main.app.config.hash_algo is None
main(['--hash-algo=0'])
assert main.app.config.hash_algo is None
main(['--hash-algo=no'])
assert main.app.config.hash_algo is None
main(['--hash-algo=false'])
assert main.app.config.hash_algo is None
def test_hash_algo_BAD(self, main):
with pytest.raises(SystemExit):
main(['--hash-algo', 'BAD'])
def test_logging(self, main, tmpdir):
logfile = tmpdir.mkdir("logs").join('test.log')
main(["-v", "--log-file", logfile.strpath])
assert logfile.check(), logfile
def test_logging_verbosity(self, main):
main([])
assert logging.getLogger().level == logging.WARN
main(["-v"])
assert logging.getLogger().level == logging.INFO
main(["-v", "-v"])
assert logging.getLogger().level == logging.DEBUG
main(["-v", "-v", "-v"])
assert logging.getLogger().level == logging.NOTSET
def test_welcome_file(self, main):
sample_msg_file = os.path.join(
os.path.dirname(__file__),
"sample_msg.html"
)
main(["--welcome", sample_msg_file])
assert "Hello pypiserver tester!" in main.app.config.welcome_msg
def test_welcome_file_default(self, main):
main([])
assert "Welcome to pypiserver!" in main.app.config.welcome_msg
def test_password_without_auth_list(self, main, monkeypatch):
sysexit = mock.MagicMock(side_effect=ValueError('BINGO'))
monkeypatch.setattr('sys.exit', sysexit)
with pytest.raises(ValueError) as ex:
main(["-P", "pswd-file", "-a", ""])
assert ex.value.args[0] == 'BINGO'
with pytest.raises(ValueError) as ex:
main(["-a", "."])
assert ex.value.args[0] == 'BINGO'
with pytest.raises(ValueError) as ex:
main(["-a", ""])
assert ex.value.args[0] == 'BINGO'
with pytest.raises(ValueError) as ex:
main(["-P", "."])
assert ex.value.args[0] == 'BINGO'
def test_password_alone(self, main, monkeypatch):
monkeypatch.setitem(sys.modules, 'passlib', mock.MagicMock())
monkeypatch.setitem(sys.modules, 'passlib.apache', mock.MagicMock())
main(["-P", "pswd-file"])
assert main.app.config.authenticate == ['update']
def test_dot_password_without_auth_list(self, main, monkeypatch):
main(["-P", ".", "-a", ""])
assert main.app.config.authenticate == []
main(["-P", ".", "-a", "."])
assert main.app.config.authenticate == []
def test_noargs(main):
assert main([]) == {'host': "0.0.0.0", 'port': 8080, 'server': "auto"}
class TestPypiserverDeprecation(object):
"""Test the deprecation of the old pypi-server command.
Note that these tests should be removed when the pypi-server
command is removed.
"""
def test_port(main):
expected = dict(host="0.0.0.0", port=8081, server="auto")
assert main(["--port=8081"]) == expected
assert main(["--port", "8081"]) == expected
assert main(["-p", "8081"]) == expected
@pytest.fixture(autouse=True)
def patch_run(self, monkeypatch):
"""Monkeypatch argv and the _run_app_from_config method."""
monkeypatch.setattr(argparse._sys, 'argv', ['pypi-server'])
monkeypatch.setattr(__main__, '_run_app_from_config', lambda c: None)
def test_server(main):
assert main(["--server=paste"])["server"] == "paste"
assert main(["--server", "cherrypy"])["server"] == "cherrypy"
def test_root(main):
main(["--root", "."])
assert main.app.module.packages.root == os.path.abspath(".")
assert main.pkgdir == os.path.abspath(".")
def test_root_r(main):
main(["-r", "."])
assert main.app.module.packages.root == os.path.abspath(".")
assert main.pkgdir == os.path.abspath(".")
# def test_root_multiple(main):
# pytest.raises(SystemExit, main, [".", "."])
# pytest.raises(SystemExit, main, ["-r", ".", "."])
def test_fallback_url(main):
main(["--fallback-url", "https://pypi.mirror/simple"])
assert main.app.module.config.fallback_url == "https://pypi.mirror/simple"
def test_fallback_url_default(main):
main([])
assert main.app.module.config.fallback_url == "https://pypi.org/simple"
def test_hash_algo_default(main):
main([])
assert main.app.module.config.hash_algo == 'md5'
def test_hash_algo(main):
main(['--hash-algo=sha256'])
assert main.app.module.config.hash_algo == 'sha256'
def test_hash_algo_off(main):
main(['--hash-algo=off'])
assert main.app.module.config.hash_algo is None
main(['--hash-algo=0'])
assert main.app.module.config.hash_algo is None
main(['--hash-algo=no'])
assert main.app.module.config.hash_algo is None
main(['--hash-algo=false'])
assert main.app.module.config.hash_algo is None
def test_hash_algo_BAD(main):
with pytest.raises(SystemExit) as excinfo:
main(['--hash-algo BAD'])
#assert excinfo.value.message == 'some info' main(['--hash-algo BAD'])
print(excinfo)
def test_logging(main, tmpdir):
logfile = tmpdir.mkdir("logs").join('test.log')
main(["-v", "--log-file", logfile.strpath])
assert logfile.check(), logfile
def test_logging_verbosity(main):
main([])
assert logging.getLogger().level == logging.WARN
main(["-v"])
assert logging.getLogger().level == logging.INFO
main(["-v", "-v"])
assert logging.getLogger().level == logging.DEBUG
main(["-v", "-v", "-v"])
assert logging.getLogger().level == logging.NOTSET
def test_welcome_file(main):
sample_msg_file = os.path.join(os.path.dirname(__file__), "sample_msg.html")
main(["--welcome", sample_msg_file])
assert "Hello pypiserver tester!" in main.app.module.config.welcome_msg
def test_welcome_file_default(main):
main([])
assert "Welcome to pypiserver!" in main.app.module.config.welcome_msg
def test_password_without_auth_list(main, monkeypatch):
sysexit = mock.MagicMock(side_effect=ValueError('BINGO'))
monkeypatch.setattr('sys.exit', sysexit)
with pytest.raises(ValueError) as ex:
main(["-P", "pswd-file", "-a", ""])
assert ex.value.args[0] == 'BINGO'
with pytest.raises(ValueError) as ex:
main(["-a", "."])
assert ex.value.args[0] == 'BINGO'
with pytest.raises(ValueError) as ex:
main(["-a", ""])
assert ex.value.args[0] == 'BINGO'
with pytest.raises(ValueError) as ex:
main(["-P", "."])
assert ex.value.args[0] == 'BINGO'
def test_password_alone(main, monkeypatch):
monkeypatch.setitem(sys.modules, 'passlib', mock.MagicMock())
monkeypatch.setitem(sys.modules, 'passlib.apache', mock.MagicMock())
main(["-P", "pswd-file"])
assert main.app.module.config.authenticated == ['update']
def test_dot_password_without_auth_list(main, monkeypatch):
main(["-P", ".", "-a", ""])
assert main.app.module.config.authenticated == []
main(["-P", ".", "-a", "."])
assert main.app.module.config.authenticated == []
def test_warns(self):
"""Test that a deprecation warning is thrown."""
warnings.simplefilter('always', category=DeprecationWarning)
with pytest.warns(DeprecationWarning):
__main__.main()

26
tests/test_paste.py Normal file

@ -0,0 +1,26 @@
"""Test the paste-compatible app factory."""
import pytest
from pypiserver import paste
from pypiserver.core import load_plugins
def configure_fake(config):
"""Fake for the configure() function."""
config.plugins = load_plugins()
return config, []
@pytest.mark.parametrize('conf_options', [
{},
{'root': '~/stable_packages'},
{'root': '~/unstable_packages', 'authenticated': 'upload',
'passwords': '~/htpasswd'},
# Verify that the strip parser works properly.
{'authenticated': str('upload')},
])
def test_paste_app_factory(conf_options, monkeypatch):
"""Test the paste_app_factory method."""
monkeypatch.setattr('pypiserver.core.configure', configure_fake)
paste.paste_app_factory({}, **conf_options)

@ -6,8 +6,10 @@ deps=-r{toxinidir}/requirements/dev.pip
whitelist_externals=
/bin/sh
commands=
# Allow a custom setup command to be specified via env vars,
# which is used in the `pre_twine` test below.
/bin/sh -c "{env:PYPISERVER_SETUP_CMD:true}"
pytest []
pytest {posargs}
sitepackages=False
[pytest]