Run black on codebase (#336)

* run black on codebase

* add black check to travis ci

* add pyproject.toml, revert black on bottle.py

Co-authored-by: Pelle Koster <pelle.koster@nginfra.nl>
This commit is contained in:
PelleK 2020-10-06 04:04:22 +02:00 committed by GitHub
parent 4ab210c82b
commit 8101cf9192
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 811 additions and 551 deletions

View File

@ -19,3 +19,9 @@ script:
branches:
except:
- standalone
jobs:
include:
- python: 3.8
install: pip install -U black
script: black --check .

View File

@ -45,13 +45,13 @@ import docopt
my_dir = osp.dirname(__file__)
VFILE = osp.join(my_dir, '..', 'pypiserver', '__init__.py')
VFILE = osp.join(my_dir, "..", "pypiserver", "__init__.py")
VFILE_regex_v = re.compile(r'version *= *__version__ *= *"([^"]+)"')
VFILE_regex_d = re.compile(r'__updated__ *= *"([^"]+)"')
RFILE = osp.join(my_dir, '..', 'README.rst')
RFILE = osp.join(my_dir, "..", "README.rst")
PYTEST_ARGS = [osp.join('tests', 'test_docs.py')]
PYTEST_ARGS = [osp.join("tests", "test_docs.py")]
class CmdException(Exception):
@ -60,7 +60,7 @@ class CmdException(Exception):
@fnt.lru_cache()
def read_txtfile(fpath):
with open(fpath, 'rt', encoding='utf-8') as fp:
with open(fpath, "rt", encoding="utf-8") as fp:
return fp.read()
@ -75,9 +75,10 @@ def extract_file_regexes(fpath, regexes):
matches = [regex.search(txt) for regex in regexes]
if not all(matches):
raise CmdException("Failed extracting current versions with: %s"
"\n matches: %s" %
(regexes, matches))
raise CmdException(
"Failed extracting current versions with: %s"
"\n matches: %s" % (regexes, matches)
)
return [m.group(1) for m in matches]
@ -96,8 +97,7 @@ def replace_substrings(files, subst_pairs):
def format_syscmd(cmd):
if isinstance(cmd, (list, tuple)):
cmd = ' '.join('"%s"' % s if ' ' in s else s
for s in cmd)
cmd = " ".join('"%s"' % s if " " in s else s for s in cmd)
else:
assert isinstance(cmd, str), cmd
@ -107,7 +107,7 @@ def format_syscmd(cmd):
def strip_ver2_commonprefix(ver1, ver2):
cprefix = osp.commonprefix([ver1, ver2])
if cprefix:
striplen = cprefix.rfind('.')
striplen = cprefix.rfind(".")
if striplen > 0:
striplen += 1
else:
@ -123,7 +123,8 @@ def run_testcases():
retcode = pytest.main(PYTEST_ARGS)
if retcode:
raise CmdException(
"Doc TCs failed(%s), probably version-bumping has failed!" % retcode)
"Doc TCs failed(%s), probably version-bumping has failed!" % retcode
)
def exec_cmd(cmd):
@ -137,14 +138,14 @@ def exec_cmd(cmd):
def do_commit(new_ver, old_ver, dry_run, amend, ver_files):
import pathlib
#new_ver = strip_ver2_commonprefix(old_ver, new_ver)
cmt_msg = 'chore(ver): bump %s-->%s' % (old_ver, new_ver)
# new_ver = strip_ver2_commonprefix(old_ver, new_ver)
cmt_msg = "chore(ver): bump %s-->%s" % (old_ver, new_ver)
ver_files = [pathlib.Path(f).as_posix() for f in ver_files]
git_add = ['git', 'add'] + ver_files
git_cmt = ['git', 'commit', '-m', cmt_msg]
git_add = ["git", "add"] + ver_files
git_cmt = ["git", "commit", "-m", cmt_msg]
if amend:
git_cmt.append('--amend')
git_cmt.append("--amend")
commands = [git_add, git_cmt]
for cmd in commands:
@ -157,9 +158,9 @@ def do_commit(new_ver, old_ver, dry_run, amend, ver_files):
def do_tag(tag, tag_msg, dry_run, force):
cmd = ['git', 'tag', tag, '-s', '-m', tag_msg]
cmd = ["git", "tag", tag, "-s", "-m", tag_msg]
if force:
cmd.append('--force')
cmd.append("--force")
cmd_str = format_syscmd(cmd)
if dry_run:
yield "DRYRUN: %s" % cmd_str
@ -168,15 +169,16 @@ def do_tag(tag, tag_msg, dry_run, force):
exec_cmd(cmd)
def bumpver(new_ver, dry_run=False, force=False, amend=False,
tag_name_or_commit=None):
def bumpver(
new_ver, dry_run=False, force=False, amend=False, tag_name_or_commit=None
):
"""
:param tag_name_or_commit:
if true, do `git commit`, if string, also `git tag` with that as msg.
"""
if amend:
## Restore previous version before extracting it.
cmd = 'git checkout HEAD~ --'.split()
cmd = "git checkout HEAD~ --".split()
cmd.append(VFILE)
cmd.append(RFILE)
exec_cmd(cmd)
@ -199,7 +201,7 @@ def bumpver(new_ver, dry_run=False, force=False, amend=False,
from datetime import datetime
new_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S%z')
new_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S%z")
ver_files = [osp.normpath(f) for f in [VFILE, RFILE]]
subst_pairs = [(old_ver, new_ver), (old_date, new_date)]
@ -208,12 +210,12 @@ def bumpver(new_ver, dry_run=False, force=False, amend=False,
new_txt, fpath, replacements = repl
if not dry_run:
with open(fpath, 'wt', encoding='utf-8') as fp:
with open(fpath, "wt", encoding="utf-8") as fp:
fp.write(new_txt)
yield '%s: ' % fpath
yield "%s: " % fpath
for old, new, nrepl in replacements:
yield ' %i x (%24s --> %s)' % (nrepl, old, new)
yield " %i x (%24s --> %s)" % (nrepl, old, new)
yield "...now launching DocTCs..."
run_testcases()
@ -222,20 +224,21 @@ def bumpver(new_ver, dry_run=False, force=False, amend=False,
yield from do_commit(new_ver, old_ver, dry_run, amend, ver_files)
if isinstance(tag_name_or_commit, str):
tag = 'v%s' % new_ver
tag = "v%s" % new_ver
yield from do_tag(tag, tag_name_or_commit, dry_run, force)
def main(*args):
opts = docopt.docopt(__doc__, argv=args)
new_ver = opts['<new-ver>']
new_ver = opts["<new-ver>"]
assert not new_ver or new_ver[0] != 'v', (
"Version '%s' must NOT start with `v`!" % new_ver)
assert not new_ver or new_ver[0] != "v", (
"Version '%s' must NOT start with `v`!" % new_ver
)
commit = opts['--commit']
tag = opts['--tag']
commit = opts["--commit"]
tag = opts["--tag"]
if tag:
tag_name_or_commit = tag
elif commit:
@ -244,11 +247,13 @@ def main(*args):
tag_name_or_commit = None
try:
for i in bumpver(new_ver,
opts['--dry-run'],
opts['--force'],
opts['--amend'],
tag_name_or_commit):
for i in bumpver(
new_ver,
opts["--dry-run"],
opts["--force"],
opts["--amend"],
tag_name_or_commit,
):
print(i)
except CmdException as ex:
sys.exit(str(ex))
@ -256,5 +261,5 @@ def main(*args):
raise ex
if __name__ == '__main__':
if __name__ == "__main__":
main(*sys.argv[1:])

View File

@ -23,7 +23,7 @@ from optparse import OptionParser
tmpeggs = tempfile.mkdtemp()
usage = '''\
usage = """\
[DESIRED PYTHON FOR BUILDOUT] bootstrap.py [options]
Bootstraps a buildout-based project.
@ -33,25 +33,34 @@ Python that you want bin/buildout to use.
Note that by using --setup-source and --download-base to point to
local resources, you can keep this script from going over the network.
'''
"""
parser = OptionParser(usage=usage)
parser.add_option("-v", "--version", help="use a specific zc.buildout version")
parser.add_option("-t", "--accept-buildout-test-releases",
dest='accept_buildout_test_releases',
action="store_true", default=False,
help=("Normally, if you do not specify a --version, the "
"bootstrap script and buildout gets the newest "
"*final* versions of zc.buildout and its recipes and "
"extensions for you. If you use this flag, "
"bootstrap and buildout will get the newest releases "
"even if they are alphas or betas."))
parser.add_option("-c", "--config-file",
help=("Specify the path to the buildout configuration "
"file to be used."))
parser.add_option("-f", "--find-links",
help="Specify a URL to search for buildout releases")
parser.add_option(
"-t",
"--accept-buildout-test-releases",
dest="accept_buildout_test_releases",
action="store_true",
default=False,
help=(
"Normally, if you do not specify a --version, the "
"bootstrap script and buildout gets the newest "
"*final* versions of zc.buildout and its recipes and "
"extensions for you. If you use this flag, "
"bootstrap and buildout will get the newest releases "
"even if they are alphas or betas."
),
)
parser.add_option(
"-c",
"--config-file",
help=("Specify the path to the buildout configuration " "file to be used."),
)
parser.add_option(
"-f", "--find-links", help="Specify a URL to search for buildout releases"
)
options, args = parser.parse_args()
@ -62,7 +71,8 @@ options, args = parser.parse_args()
to_reload = False
try:
import pkg_resources, setuptools
if not hasattr(pkg_resources, '_distribute'):
if not hasattr(pkg_resources, "_distribute"):
to_reload = True
raise ImportError
except ImportError:
@ -73,13 +83,14 @@ except ImportError:
except ImportError:
from urllib2 import urlopen
exec(urlopen('http://python-distribute.org/distribute_setup.py').read(), ez)
exec(urlopen("http://python-distribute.org/distribute_setup.py").read(), ez)
setup_args = dict(to_dir=tmpeggs, download_delay=0, no_fake=True)
ez['use_setuptools'](**setup_args)
ez["use_setuptools"](**setup_args)
if to_reload:
reload(pkg_resources)
import pkg_resources
# This does not (always?) update the default working set. We will
# do it.
for path in sys.path:
@ -89,37 +100,47 @@ except ImportError:
######################################################################
# Install buildout
ws = pkg_resources.working_set
ws = pkg_resources.working_set
cmd = [sys.executable, '-c',
'from setuptools.command.easy_install import main; main()',
'-mZqNxd', tmpeggs]
cmd = [
sys.executable,
"-c",
"from setuptools.command.easy_install import main; main()",
"-mZqNxd",
tmpeggs,
]
find_links = os.environ.get(
'bootstrap-testing-find-links',
options.find_links or
('http://downloads.buildout.org/'
if options.accept_buildout_test_releases else None)
)
"bootstrap-testing-find-links",
options.find_links
or (
"http://downloads.buildout.org/"
if options.accept_buildout_test_releases
else None
),
)
if find_links:
cmd.extend(['-f', find_links])
cmd.extend(["-f", find_links])
distribute_path = ws.find(
pkg_resources.Requirement.parse('distribute')).location
pkg_resources.Requirement.parse("distribute")
).location
requirement = 'zc.buildout'
requirement = "zc.buildout"
version = options.version
if version is None and not options.accept_buildout_test_releases:
# Figure out the most recent final version of zc.buildout.
import setuptools.package_index
_final_parts = '*final-', '*final'
_final_parts = "*final-", "*final"
def _final_version(parsed_version):
for part in parsed_version:
if (part[:1] == '*') and (part not in _final_parts):
if (part[:1] == "*") and (part not in _final_parts):
return False
return True
index = setuptools.package_index.PackageIndex(
search_path=[distribute_path])
index = setuptools.package_index.PackageIndex(search_path=[distribute_path])
if find_links:
index.add_find_links((find_links,))
req = pkg_resources.Requirement.parse(requirement)
@ -138,14 +159,13 @@ if version is None and not options.accept_buildout_test_releases:
best.sort()
version = best[-1].version
if version:
requirement = '=='.join((requirement, version))
requirement = "==".join((requirement, version))
cmd.append(requirement)
import subprocess
if subprocess.call(cmd, env=dict(os.environ, PYTHONPATH=distribute_path)) != 0:
raise Exception(
"Failed to execute command:\n%s",
repr(cmd)[1:-1])
raise Exception("Failed to execute command:\n%s", repr(cmd)[1:-1])
######################################################################
# Import and run buildout
@ -154,12 +174,12 @@ ws.add_entry(tmpeggs)
ws.require(requirement)
import zc.buildout.buildout
if not [a for a in args if '=' not in a]:
args.append('bootstrap')
if not [a for a in args if "=" not in a]:
args.append("bootstrap")
# if -c was provided, we push it back into args for buildout' main function
if options.config_file is not None:
args[0:0] = ['-c', options.config_file]
args[0:0] = ["-c", options.config_file]
zc.buildout.buildout.main(args)
shutil.rmtree(tmpeggs)

View File

@ -3,7 +3,7 @@ import re as _re
import sys
version = __version__ = "1.4.0"
__version_info__ = tuple(_re.split('[.-]', __version__))
__version_info__ = tuple(_re.split("[.-]", __version__))
__updated__ = "2020-10-03 17:45:07"
__title__ = "pypiserver"
@ -20,11 +20,12 @@ class Configuration(object):
vars(self).update(kwds)
def __repr__(self, *args, **kwargs):
return 'Configuration(**%s)' % vars(self)
return "Configuration(**%s)" % vars(self)
def __str__(self, *args, **kwargs):
return 'Configuration:\n%s' % '\n'.join('%20s = %s' % (k, v)
for k, v in sorted(vars(self).items()))
return "Configuration:\n%s" % "\n".join(
"%20s = %s" % (k, v) for k, v in sorted(vars(self).items())
)
def update(self, props):
d = props if isinstance(props, dict) else vars(props)
@ -35,27 +36,28 @@ DEFAULT_SERVER = "auto"
def default_config(
root=None,
host="0.0.0.0",
port=8080,
server=DEFAULT_SERVER,
redirect_to_fallback=True,
fallback_url=None,
authenticated=['update'],
password_file=None,
overwrite=False,
hash_algo='md5',
verbosity=1,
log_file=None,
log_stream="stderr",
log_frmt="%(asctime)s|%(name)s|%(levelname)s|%(thread)d|%(message)s",
log_req_frmt="%(bottle.request)s",
log_res_frmt="%(status)s",
log_err_frmt="%(body)s: %(exception)s \n%(traceback)s",
welcome_file=None,
cache_control=None,
auther=None,
VERSION=__version__):
root=None,
host="0.0.0.0",
port=8080,
server=DEFAULT_SERVER,
redirect_to_fallback=True,
fallback_url=None,
authenticated=["update"],
password_file=None,
overwrite=False,
hash_algo="md5",
verbosity=1,
log_file=None,
log_stream="stderr",
log_frmt="%(asctime)s|%(name)s|%(levelname)s|%(thread)d|%(message)s",
log_req_frmt="%(bottle.request)s",
log_res_frmt="%(status)s",
log_err_frmt="%(body)s: %(exception)s \n%(traceback)s",
welcome_file=None,
cache_control=None,
auther=None,
VERSION=__version__,
):
"""
Fetch default-opts with overridden kwds, capable of starting-up pypiserver.
@ -126,19 +128,19 @@ def app(**kwds):
from . import core
_app = __import__("_app", globals(), locals(), ["."], 1)
sys.modules.pop('pypiserver._app', None)
sys.modules.pop("pypiserver._app", None)
kwds = default_config(**kwds)
config, packages = core.configure(**kwds)
_app.config = config
_app.packages = packages
_app.app.module = _app # HACK for testing.
_app.app.module = _app # HACK for testing.
return _app.app
def str2bool(s, default):
if s is not None and s != '':
if s is not None and s != "":
return s.lower() not in ("no", "off", "0", "false")
return default
@ -164,7 +166,7 @@ def paste_app_factory(global_config, **local_conf):
if value is not None:
conf[attr] = int(value)
def upd_conf_with_list_item(conf, attr, sdict, sep=' ', parse=_str_strip):
def upd_conf_with_list_item(conf, attr, sdict, sep=" ", parse=_str_strip):
values = sdict.pop(attr, None)
if values:
conf[attr] = list(filter(None, map(parse, values.split(sep))))
@ -177,21 +179,21 @@ def paste_app_factory(global_config, **local_conf):
c = default_config()
upd_conf_with_bool_item(c, 'overwrite', local_conf)
upd_conf_with_bool_item(c, 'redirect_to_fallback', local_conf)
upd_conf_with_list_item(c, 'authenticated', local_conf, sep=' ')
upd_conf_with_list_item(c, 'root', local_conf, sep='\n', parse=_make_root)
upd_conf_with_int_item(c, 'verbosity', local_conf)
upd_conf_with_bool_item(c, "overwrite", local_conf)
upd_conf_with_bool_item(c, "redirect_to_fallback", local_conf)
upd_conf_with_list_item(c, "authenticated", local_conf, sep=" ")
upd_conf_with_list_item(c, "root", local_conf, sep="\n", parse=_make_root)
upd_conf_with_int_item(c, "verbosity", local_conf)
str_items = [
'fallback_url',
'hash_algo',
'log_err_frmt',
'log_file',
'log_frmt',
'log_req_frmt',
'log_res_frmt',
'password_file',
'welcome_file'
"fallback_url",
"hash_algo",
"log_err_frmt",
"log_file",
"log_frmt",
"log_req_frmt",
"log_res_frmt",
"password_file",
"welcome_file",
]
for str_item in str_items:
upd_conf_with_str_item(c, str_item, local_conf)
@ -203,9 +205,9 @@ def paste_app_factory(global_config, **local_conf):
def _logwrite(logger, level, msg):
if msg:
line_endings = ['\r\n', '\n\r', '\n']
line_endings = ["\r\n", "\n\r", "\n"]
for le in line_endings:
if msg.endswith(le):
msg = msg[:-len(le)]
msg = msg[: -len(le)]
if msg:
logger.log(level, msg)

View File

@ -13,10 +13,16 @@ import textwrap
import functools as ft
log = logging.getLogger('pypiserver.main')
log = logging.getLogger("pypiserver.main")
def init_logging(level=logging.NOTSET, frmt=None, filename=None, stream=sys.stderr, logger=None):
def init_logging(
level=logging.NOTSET,
frmt=None,
filename=None,
stream=sys.stderr,
logger=None,
):
logger = logger or logging.getLogger()
logger.setLevel(level)
@ -31,8 +37,10 @@ def init_logging(level=logging.NOTSET, frmt=None, filename=None, stream=sys.stde
handler.setFormatter(formatter)
logger.addHandler(handler)
def usage():
return textwrap.dedent("""\
return textwrap.dedent(
"""\
pypi-server [OPTIONS] [PACKAGES_DIRECTORY...]
start PyPI compatible package server serving packages from
PACKAGES_DIRECTORY. If PACKAGES_DIRECTORY is not given on the
@ -159,7 +167,8 @@ def usage():
containing arbitrary code.
Visit https://pypi.org/project/pypiserver/ for more information.
""")
"""
)
def main(argv=None):
@ -178,29 +187,33 @@ def main(argv=None):
update_blacklist_file = None
try:
opts, roots = getopt.getopt(argv[1:], "i:p:a:r:d:P:Uuvxoh", [
"interface=",
"passwords=",
"authenticate=",
"port=",
"root=",
"server=",
"fallback-url=",
"disable-fallback",
"overwrite",
"hash-algo=",
"blacklist-file=",
"log-file=",
"log-stream=",
"log-frmt=",
"log-req-frmt=",
"log-res-frmt=",
"log-err-frmt=",
"welcome=",
"cache-control=",
"version",
"help"
])
opts, roots = getopt.getopt(
argv[1:],
"i:p:a:r:d:P:Uuvxoh",
[
"interface=",
"passwords=",
"authenticate=",
"port=",
"root=",
"server=",
"fallback-url=",
"disable-fallback",
"overwrite",
"hash-algo=",
"blacklist-file=",
"log-file=",
"log-stream=",
"log-frmt=",
"log-req-frmt=",
"log-res-frmt=",
"log-err-frmt=",
"welcome=",
"cache-control=",
"version",
"help",
],
)
except getopt.GetoptError:
err = sys.exc_info()[1]
sys.exit("usage error: %s" % (err,))
@ -213,10 +226,10 @@ def main(argv=None):
err = sys.exc_info()[1]
sys.exit("Invalid port(%r) due to: %s" % (v, err))
elif k in ("-a", "--authenticate"):
c.authenticated = [a.lower()
for a in re.split("[, ]+", v.strip(" ,"))
if a]
if c.authenticated == ['.']:
c.authenticated = [
a.lower() for a in re.split("[, ]+", v.strip(" ,")) if a
]
if c.authenticated == ["."]:
c.authenticated = []
else:
actions = ("list", "download", "update")
@ -275,57 +288,75 @@ def main(argv=None):
print(usage())
sys.exit(0)
if (not c.authenticated and c.password_file != '.' or
c.authenticated and c.password_file == '.'):
if (
not c.authenticated
and c.password_file != "."
or c.authenticated
and c.password_file == "."
):
auth_err = "When auth-ops-list is empty (-a=.), password-file (-P=%r) must also be empty ('.')!"
sys.exit(auth_err % c.password_file)
if len(roots) == 0:
roots.append(os.path.expanduser("~/packages"))
roots=[os.path.abspath(x) for x in roots]
roots = [os.path.abspath(x) for x in roots]
c.root = roots
verbose_levels=[
logging.WARNING, logging.INFO, logging.DEBUG, logging.NOTSET]
log_level=list(zip(verbose_levels, range(c.verbosity)))[-1][0]
verbose_levels = [
logging.WARNING,
logging.INFO,
logging.DEBUG,
logging.NOTSET,
]
log_level = list(zip(verbose_levels, range(c.verbosity)))[-1][0]
valid_streams = {"none": None, "stderr": sys.stderr, "stdout": sys.stdout}
if c.log_stream not in valid_streams:
sys.exit("invalid log stream %s. choose one of %s" % (
c.log_stream, ", ".join(valid_streams.keys())))
sys.exit(
"invalid log stream %s. choose one of %s"
% (c.log_stream, ", ".join(valid_streams.keys()))
)
init_logging(
level=log_level,
filename=c.log_file,
frmt=c.log_frmt,
stream=valid_streams[c.log_stream]
stream=valid_streams[c.log_stream],
)
if command == "update":
from pypiserver.manage import update_all_packages
update_all_packages(
roots, update_directory,
dry_run=update_dry_run, stable_only=update_stable_only,
blacklist_file=update_blacklist_file
roots,
update_directory,
dry_run=update_dry_run,
stable_only=update_stable_only,
blacklist_file=update_blacklist_file,
)
return
# Fixes #49:
# The gevent server adapter needs to patch some
# modules BEFORE importing bottle!
if c.server and c.server.startswith('gevent'):
if c.server and c.server.startswith("gevent"):
import gevent.monkey # @UnresolvedImport
gevent.monkey.patch_all()
from pypiserver import bottle
if c.server not in bottle.server_names:
sys.exit("unknown server %r. choose one of %s" % (
c.server, ", ".join(bottle.server_names.keys())))
sys.exit(
"unknown server %r. choose one of %s"
% (c.server, ", ".join(bottle.server_names.keys()))
)
bottle.debug(c.verbosity > 1)
bottle._stderr = ft.partial(pypiserver._logwrite,
logging.getLogger(bottle.__name__), logging.INFO)
bottle._stderr = ft.partial(
pypiserver._logwrite, logging.getLogger(bottle.__name__), logging.INFO
)
app = pypiserver.app(**vars(c))
bottle.run(app=app, host=c.host, port=c.port, server=c.server)

View File

@ -174,7 +174,7 @@ def file_upload():
):
raise HTTPError(
400,
"Unrelated signature %r for package %r!" % (ufiles.sig, ufiles.pkg)
"Unrelated signature %r for package %r!" % (ufiles.sig, ufiles.pkg),
)
for uf in ufiles:

View File

@ -8,20 +8,21 @@ from os.path import dirname
from watchdog.observers import Observer
import threading
class CacheManager(object):
"""
A naive cache implementation for listdir and digest_file
A naive cache implementation for listdir and digest_file
The listdir_cache is just a giant list of PkgFile objects, and
for simplicity it is invalidated anytime a modification occurs
within the directory it represents. If we were smarter about
the way that the listdir data structure were created/stored,
then we could do more granular invalidation. In practice, this
is good enough for now.
The listdir_cache is just a giant list of PkgFile objects, and
for simplicity it is invalidated anytime a modification occurs
within the directory it represents. If we were smarter about
the way that the listdir data structure were created/stored,
then we could do more granular invalidation. In practice, this
is good enough for now.
The digest_cache exists on a per-file basis, because computing
hashes on large files can get expensive, and it's very easy to
invalidate specific filenames.
The digest_cache exists on a per-file basis, because computing
hashes on large files can get expensive, and it's very easy to
invalidate specific filenames.
"""
def __init__(self):
@ -85,8 +86,8 @@ class CacheManager(object):
self.watched.add(root)
self.observer.schedule(_EventHandler(self, root), root, recursive=True)
class _EventHandler(object):
class _EventHandler(object):
def __init__(self, cache, root):
self.cache = cache
self.root = root
@ -106,7 +107,7 @@ class _EventHandler(object):
# Digests are more expensive: invalidate specific paths
paths = []
if event.event_type == 'moved':
if event.event_type == "moved":
paths.append(event.src_path)
paths.append(event.dest_path)
else:
@ -117,4 +118,5 @@ class _EventHandler(object):
for path in paths:
subcache.pop(path, None)
cache_manager = CacheManager()

View File

@ -32,7 +32,7 @@ def configure(**kwds):
log.info("+++Pypiserver invoked with: %s", c)
if c.root is None:
c. root = os.path.expanduser("~/packages")
c.root = os.path.expanduser("~/packages")
roots = c.root if isinstance(c.root, (list, tuple)) else [c.root]
roots = [os.path.abspath(r) for r in roots]
for r in roots:
@ -49,8 +49,9 @@ def configure(**kwds):
if not c.authenticated:
c.authenticated = []
if not callable(c.auther):
if c.password_file and c.password_file != '.':
if c.password_file and c.password_file != ".":
from passlib.apache import HtpasswdFile
htPsswdFile = HtpasswdFile(c.password_file)
else:
c.password_file = htPsswdFile = None
@ -61,13 +62,17 @@ def configure(**kwds):
if not c.welcome_file:
c.welcome_file = "welcome.html"
c.welcome_msg = pkg_resources.resource_string( # @UndefinedVariable
__name__, "welcome.html").decode("utf-8") # @UndefinedVariable
__name__, "welcome.html"
).decode(
"utf-8"
) # @UndefinedVariable
else:
with io.open(c.welcome_file, 'r', encoding='utf-8') as fd:
with io.open(c.welcome_file, "r", encoding="utf-8") as fd:
c.welcome_msg = fd.read()
except Exception:
log.warning(
"Could not load welcome-file(%s)!", c.welcome_file, exc_info=1)
"Could not load welcome-file(%s)!", c.welcome_file, exc_info=1
)
if c.fallback_url is None:
c.fallback_url = "https://pypi.org/simple"
@ -76,10 +81,10 @@ def configure(**kwds):
try:
halgos = hashlib.algorithms_available
except AttributeError:
halgos = ['md5', 'sha1', 'sha224', 'sha256', 'sha384', 'sha512']
halgos = ["md5", "sha1", "sha224", "sha256", "sha384", "sha512"]
if c.hash_algo not in halgos:
sys.exit('Hash-algorithm %s not one of: %s' % (c.hash_algo, halgos))
sys.exit("Hash-algorithm %s not one of: %s" % (c.hash_algo, halgos))
log.info("+++Pypiserver started with: %s", c)
@ -100,32 +105,34 @@ mimetypes.add_type("text/plain", ".asc")
# ### Next 2 functions adapted from :mod:`distribute.pkg_resources`.
#
component_re = re.compile(r'(\d+ | [a-z]+ | \.| -)', re.I | re.VERBOSE)
replace = {'pre': 'c', 'preview': 'c', '-': 'final-', 'rc': 'c', 'dev': '@'}.get
component_re = re.compile(r"(\d+ | [a-z]+ | \.| -)", re.I | re.VERBOSE)
replace = {"pre": "c", "preview": "c", "-": "final-", "rc": "c", "dev": "@"}.get
def _parse_version_parts(s):
for part in component_re.split(s):
part = replace(part, part)
if part in ['', '.']:
if part in ["", "."]:
continue
if part[:1] in '0123456789':
if part[:1] in "0123456789":
yield part.zfill(8) # pad for numeric comparison
else:
yield '*' + part
yield "*" + part
yield '*final' # ensure that alpha/beta/candidate are before final
yield "*final" # ensure that alpha/beta/candidate are before final
def parse_version(s):
parts = []
for part in _parse_version_parts(s.lower()):
if part.startswith('*'):
if part.startswith("*"):
# remove trailing zeros from each series of numeric parts
while parts and parts[-1] == '00000000':
while parts and parts[-1] == "00000000":
parts.pop()
parts.append(part)
return tuple(parts)
#
#### -- End of distribute's code.
@ -133,16 +140,18 @@ def parse_version(s):
_archive_suffix_rx = re.compile(
r"(\.zip|\.tar\.gz|\.tgz|\.tar\.bz2|-py[23]\.\d-.*|"
r"\.win-amd64-py[23]\.\d\..*|\.win32-py[23]\.\d\..*|\.egg)$",
re.I)
re.I,
)
wheel_file_re = re.compile(
r"""^(?P<namever>(?P<name>.+?)-(?P<ver>\d.*?))
((-(?P<build>\d.*?))?-(?P<pyver>.+?)-(?P<abi>.+?)-(?P<plat>.+?)
\.whl|\.dist-info)$""",
re.VERBOSE)
_pkgname_re = re.compile(r'-\d+[a-z_.!+]', re.I)
re.VERBOSE,
)
_pkgname_re = re.compile(r"-\d+[a-z_.!+]", re.I)
_pkgname_parts_re = re.compile(
r"[\.\-](?=cp\d|py\d|macosx|linux|sunos|solaris|irix|aix|cygwin|win)",
re.I)
r"[\.\-](?=cp\d|py\d|macosx|linux|sunos|solaris|irix|aix|cygwin|win)", re.I
)
def _guess_pkgname_and_version_wheel(basename):
@ -166,16 +175,16 @@ def guess_pkgname_and_version(path):
return _guess_pkgname_and_version_wheel(path)
if not _archive_suffix_rx.search(path):
return
path = _archive_suffix_rx.sub('', path)
if '-' not in path:
pkgname, version = path, ''
elif path.count('-') == 1:
pkgname, version = path.split('-', 1)
elif '.' not in path:
pkgname, version = path.rsplit('-', 1)
path = _archive_suffix_rx.sub("", path)
if "-" not in path:
pkgname, version = path, ""
elif path.count("-") == 1:
pkgname, version = path.split("-", 1)
elif "." not in path:
pkgname, version = path.rsplit("-", 1)
else:
pkgname = _pkgname_re.split(path)[0]
ver_spec = path[len(pkgname) + 1:]
ver_spec = path[len(pkgname) + 1 :]
parts = _pkgname_parts_re.split(ver_spec)
version = parts[0]
return pkgname, version
@ -198,15 +207,22 @@ def is_allowed_path(path_part):
class PkgFile(object):
__slots__ = ['fn', 'root', '_fname_and_hash',
'relfn', 'relfn_unix',
'pkgname_norm',
'pkgname',
'version',
'parsed_version',
'replaces']
__slots__ = [
"fn",
"root",
"_fname_and_hash",
"relfn",
"relfn_unix",
"pkgname_norm",
"pkgname",
"version",
"parsed_version",
"replaces",
]
def __init__(self, pkgname, version, fn=None, root=None, relfn=None, replaces=None):
def __init__(
self, pkgname, version, fn=None, root=None, relfn=None, replaces=None
):
self.pkgname = pkgname
self.pkgname_norm = normalize_pkgname(pkgname)
self.version = version
@ -220,14 +236,21 @@ class PkgFile(object):
def __repr__(self):
return "%s(%s)" % (
self.__class__.__name__,
", ".join(["%s=%r" % (k, getattr(self, k, 'AttributeError'))
for k in sorted(self.__slots__)]))
", ".join(
[
"%s=%r" % (k, getattr(self, k, "AttributeError"))
for k in sorted(self.__slots__)
]
),
)
def fname_and_hash(self, hash_algo):
if not hasattr(self, '_fname_and_hash'):
if not hasattr(self, "_fname_and_hash"):
if hash_algo:
self._fname_and_hash = '%s#%s=%s' % (
self.relfn_unix, hash_algo, digest_file(self.fn, hash_algo)
self._fname_and_hash = "%s#%s=%s" % (
self.relfn_unix,
hash_algo,
digest_file(self.fn, hash_algo),
)
else:
self._fname_and_hash = self.relfn_unix
@ -248,10 +271,13 @@ def _listdir(root):
continue
pkgname, version = res
if pkgname:
yield PkgFile(pkgname=pkgname,
version=version,
fn=fn, root=root,
relfn=fn[len(root) + 1:])
yield PkgFile(
pkgname=pkgname,
version=version,
fn=fn,
root=root,
relfn=fn[len(root) + 1 :],
)
def read_lines(filename):
@ -264,14 +290,15 @@ def read_lines(filename):
try:
with open(filename) as f:
lines = [
line
for line in (ln.strip() for ln in f.readlines())
if line and not line.startswith('#')
line
for line in (ln.strip() for ln in f.readlines())
if line and not line.startswith("#")
]
except Exception:
log.error('Failed to read package blacklist file "%s". '
'Aborting server startup, please fix this.'
% filename)
log.error(
'Failed to read package blacklist file "%s". '
"Aborting server startup, please fix this." % filename
)
raise
return lines
@ -310,7 +337,7 @@ def get_bad_url_redirect_path(request, prefix):
p = request.custom_fullpath
if p.endswith("/"):
p = p[:-1]
p = p.rsplit('/', 1)[0]
p = p.rsplit("/", 1)[0]
prefix = quote(prefix)
p += "/simple/{}/".format(prefix)
return p
@ -325,10 +352,10 @@ def _digest_file(fpath, hash_algo):
From http://stackoverflow.com/a/21565932/548792
"""
blocksize = 2**16
blocksize = 2 ** 16
digester = getattr(hashlib, hash_algo)()
with open(fpath, 'rb') as f:
for block in iter(lambda: f.read(blocksize), b''):
with open(fpath, "rb") as f:
for block in iter(lambda: f.read(blocksize), b""):
digester.update(block)
return digester.hexdigest()
@ -344,6 +371,7 @@ try:
# fpath must be absolute path
return cache_manager.digest_file(fpath, hash_algo, _digest_file)
except ImportError:
listdir = _listdir
digest_file = _digest_file

View File

@ -17,6 +17,8 @@ if sys.version_info >= (3, 0):
def make_pypi_client(url):
return Server(url)
else:
from xmlrpclib import Transport # @UnresolvedImport
from xmlrpclib import ServerProxy
@ -24,7 +26,6 @@ else:
import urllib
class ProxiedTransport(Transport):
def set_proxy(self, proxy):
self.proxy = proxy
@ -38,17 +39,19 @@ else:
def send_request(self, connection, handler, request_body):
connection.putrequest(
"POST", 'http://%s%s' % (self.realhost, handler))
"POST", "http://%s%s" % (self.realhost, handler)
)
def send_host(self, connection, host):
connection.putheader('Host', self.realhost)
connection.putheader("Host", self.realhost)
def make_pypi_client(url):
http_proxy_url = urllib.getproxies().get("http", "")
if http_proxy_url:
http_proxy_spec = urllib.splithost(
urllib.splittype(http_proxy_url)[1])[0]
urllib.splittype(http_proxy_url)[1]
)[0]
transport = ProxiedTransport()
transport.set_proxy(http_proxy_spec)
else:
@ -92,9 +95,7 @@ def build_releases(pkg, versions):
for x in versions:
parsed_version = core.parse_version(x)
if parsed_version > pkg.parsed_version:
yield core.PkgFile(pkgname=pkg.pkgname,
version=x,
replaces=pkg)
yield core.PkgFile(pkgname=pkg.pkgname, version=x, replaces=pkg)
def find_updates(pkgset, stable_only=True):
@ -108,7 +109,8 @@ def find_updates(pkgset, stable_only=True):
latest_pkgs = frozenset(filter_latest_pkgs(pkgset))
sys.stdout.write(
"checking %s packages for newer version\n" % len(latest_pkgs),)
"checking %s packages for newer version\n" % len(latest_pkgs),
)
need_update = set()
pypi = make_pypi_client("https://pypi.org/pypi/")
@ -135,8 +137,10 @@ def find_updates(pkgset, stable_only=True):
write("\n\n")
if no_releases:
sys.stdout.write("no releases found on pypi for %s\n\n" %
(", ".join(sorted(no_releases)),))
sys.stdout.write(
"no releases found on pypi for %s\n\n"
% (", ".join(sorted(no_releases)),)
)
return need_update
@ -148,20 +152,25 @@ class PipCmd(object):
def update_root(pip_version):
"""Yield an appropriate root command depending on pip version."""
# legacy_pip = StrictVersion(pip_version) < StrictVersion('10.0')
legacy_pip = LooseVersion(pip_version) < LooseVersion('10.0')
for part in ('pip', '-q'):
legacy_pip = LooseVersion(pip_version) < LooseVersion("10.0")
for part in ("pip", "-q"):
yield part
yield 'install' if legacy_pip else 'download'
yield "install" if legacy_pip else "download"
@staticmethod
def update(cmd_root, destdir, pkg_name, pkg_version,
index='https://pypi.org/simple'):
def update(
cmd_root,
destdir,
pkg_name,
pkg_version,
index="https://pypi.org/simple",
):
"""Yield an update command for pip."""
for part in cmd_root:
yield part
for part in ('--no-deps', '-i', index, '-d', destdir):
for part in ("--no-deps", "-i", index, "-d", destdir):
yield part
yield '{}=={}'.format(pkg_name, pkg_version)
yield "{}=={}".format(pkg_name, pkg_version)
def update_package(pkg, destdir, dry_run=False):
@ -176,7 +185,7 @@ def update_package(pkg, destdir, dry_run=False):
PipCmd.update_root(pip.__version__),
destdir or os.path.dirname(pkg.replaces.fn),
pkg.pkgname,
pkg.version
pkg.version,
)
)
@ -200,15 +209,22 @@ def update(pkgset, destdir=None, dry_run=False, stable_only=True):
update_package(pkg, destdir, dry_run=dry_run)
def update_all_packages(roots, destdir=None, dry_run=False, stable_only=True, blacklist_file=None):
def update_all_packages(
roots, destdir=None, dry_run=False, stable_only=True, blacklist_file=None
):
all_packages = itertools.chain(*[core.listdir(r) for r in roots])
skip_packages = set()
if blacklist_file:
skip_packages = set(core.read_lines(blacklist_file))
print('Skipping update of blacklisted packages (listed in "{}"): {}'
.format(blacklist_file, ', '.join(sorted(skip_packages))))
print(
'Skipping update of blacklisted packages (listed in "{}"): {}'.format(
blacklist_file, ", ".join(sorted(skip_packages))
)
)
packages = frozenset([pkg for pkg in all_packages if pkg.pkgname not in skip_packages])
packages = frozenset(
[pkg for pkg in all_packages if pkg.pkgname not in skip_packages]
)
update(packages, destdir, dry_run, stable_only)

27
pyproject.toml Normal file
View File

@ -0,0 +1,27 @@
[build-system]
# Minimum requirements for the build system to execute.
requires = ["setuptools", "wheel"] # PEP 508 specifications.
[tool.black]
# Configuration for the Black autoformatter
line-length = 80
target-version = ['py36']
exclude = '''
(
/(
\.direnv
| \.eggs # exclude a few common directories in the
| \.git # root of the project
| \.mypy_cache
| \.tox
| \.venv
| \.vscode
| _build
| build
| dist
| venv
| pypiserver/bottle.py
)
)
'''

View File

@ -1,15 +1,16 @@
## A test-distribution to check if
## A test-distribution to check if
# bottle supports uploading 100's of packages,
# see: https://github.com/pypiserver/pypiserver/issues/82
#
# Has been run once `pip wheel .`, just to generate:
# ./wheelhouse/centodeps-0.0.0-cp34-none-win_amd64.whl
#
#
from setuptools import setup
setup(
name='centodeps',
install_requires=['a==1.0'] * 200,
name="centodeps",
install_requires=["a==1.0"] * 200,
options={
'bdist_wheel': {'universal': True},
"bdist_wheel": {"universal": True},
},
)

View File

@ -126,7 +126,9 @@ def test_root_count(root, testapp):
def test_root_hostname(testapp):
resp = testapp.get("/", headers={"Host": "systemexit.de"})
resp.mustcontain("easy_install --index-url http://systemexit.de/simple/ PACKAGE")
resp.mustcontain(
"easy_install --index-url http://systemexit.de/simple/ PACKAGE"
)
# go("http://systemexit.de/")
@ -307,18 +309,24 @@ def test_simple_index_case(root, testapp):
def test_nonroot_root(testpriv):
resp = testpriv.get("/priv/", headers={"Host": "nonroot"})
resp.mustcontain("easy_install --index-url http://nonroot/priv/simple/ PACKAGE")
resp.mustcontain(
"easy_install --index-url http://nonroot/priv/simple/ PACKAGE"
)
def test_nonroot_root_with_x_forwarded_host(testapp):
resp = testapp.get("/", headers={"X-Forwarded-Host": "forward.ed/priv/"})
resp.mustcontain("easy_install --index-url http://forward.ed/priv/simple/ PACKAGE")
resp.mustcontain(
"easy_install --index-url http://forward.ed/priv/simple/ PACKAGE"
)
resp.mustcontain("""<a href="/priv/packages/">here</a>""")
def test_nonroot_root_with_x_forwarded_host_without_trailing_slash(testapp):
resp = testapp.get("/", headers={"X-Forwarded-Host": "forward.ed/priv"})
resp.mustcontain("easy_install --index-url http://forward.ed/priv/simple/ PACKAGE")
resp.mustcontain(
"easy_install --index-url http://forward.ed/priv/simple/ PACKAGE"
)
resp.mustcontain("""<a href="/priv/packages/">here</a>""")

View File

@ -38,42 +38,69 @@ files = [
("package-20000101.zip", "package", "20000101"),
("flup-123-1.0.3.dev-20110405.tar.gz", "flup-123", "1.0.3.dev-20110405"),
("package-123-1.0.0-alpha.1.zip", "package-123", "1.0.0-alpha.1"),
("package-123-1.3.7+build.11.e0f985a.zip", "package-123", "1.3.7+build.11.e0f985a"),
(
"package-123-1.3.7+build.11.e0f985a.zip",
"package-123",
"1.3.7+build.11.e0f985a",
),
("package-123-v1.1_3-8.1.zip", "package-123-v1.1_3", "8.1"),
("package-123-2013.02.17.dev123.zip", "package-123", "2013.02.17.dev123"),
("package-123-20000101.zip", "package-123", "20000101"),
("pyelasticsearch-0.5-brainbot-1-20130712.zip", "pyelasticsearch", "0.5-brainbot-1-20130712"),
(
"pyelasticsearch-0.5-brainbot-1-20130712.zip",
"pyelasticsearch",
"0.5-brainbot-1-20130712",
),
("pywin32-217-cp27-none-win32.whl", "pywin32", "217"),
("pywin32-217-55-cp27-none-win32.whl", "pywin32", "217-55"),
("pywin32-217.1-cp27-none-win32.whl", "pywin32", "217.1"),
("package.zip", "package", ""),
("package-name-0.0.1.dev0.linux-x86_64.tar.gz", "package-name", "0.0.1.dev0"),
("package-name-0.0.1.dev0.macosx-10.10-intel.tar.gz", "package-name", "0.0.1.dev0"),
("package-name-0.0.1.alpha.1.win-amd64-py3.2.exe", "package-name", "0.0.1.alpha.1"),
("pkg-3!1.0-0.1.tgz", 'pkg', '3!1.0-0.1'), # TO BE FIXED
("pkg-3!1+.0-0.1.tgz", 'pkg', '3!1+.0-0.1'), # TO BE FIXED
("pkg.zip", 'pkg', ''),
("foo/pkg.zip", 'pkg', ''),
("foo/pkg-1b.zip", 'pkg', '1b'),
("package-name-0.0.1.alpha.1.win-amd64-py3.2.exe", "package-name", "0.0.1.alpha.1"),
(
"package-name-0.0.1.dev0.linux-x86_64.tar.gz",
"package-name",
"0.0.1.dev0",
),
(
"package-name-0.0.1.dev0.macosx-10.10-intel.tar.gz",
"package-name",
"0.0.1.dev0",
),
(
"package-name-0.0.1.alpha.1.win-amd64-py3.2.exe",
"package-name",
"0.0.1.alpha.1",
),
("pkg-3!1.0-0.1.tgz", "pkg", "3!1.0-0.1"), # TO BE FIXED
("pkg-3!1+.0-0.1.tgz", "pkg", "3!1+.0-0.1"), # TO BE FIXED
("pkg.zip", "pkg", ""),
("foo/pkg.zip", "pkg", ""),
("foo/pkg-1b.zip", "pkg", "1b"),
(
"package-name-0.0.1.alpha.1.win-amd64-py3.2.exe",
"package-name",
"0.0.1.alpha.1",
),
]
def _capitalize_ext(fpath):
f, e = os.path.splitext(fpath)
if e != '.whl':
if e != ".whl":
e = e.upper()
return f + e
@pytest.mark.parametrize(("filename", "pkgname", "version"), files)
def test_guess_pkgname_and_version(filename, pkgname, version):
exp = (pkgname, version)
assert core.guess_pkgname_and_version(filename) == exp
assert core.guess_pkgname_and_version(_capitalize_ext(filename)) == exp
@pytest.mark.parametrize(("filename", "pkgname", "version"), files)
def test_guess_pkgname_and_version_asc(filename, pkgname, version):
exp = (pkgname, version)
filename = '%s.asc' % filename
filename = "%s.asc" % filename
assert core.guess_pkgname_and_version(filename) == exp
@ -84,27 +111,35 @@ def test_listdir_bad_name(tmpdir):
def test_read_lines(tmpdir):
filename = 'pkg_blacklist'
filename = "pkg_blacklist"
file_contents = (
'# Names of private packages that we don\'t want to upgrade\n'
'\n'
'my_private_pkg \n'
' \t# This is a comment with starting space and tab\n'
' my_other_private_pkg'
"# Names of private packages that we don't want to upgrade\n"
"\n"
"my_private_pkg \n"
" \t# This is a comment with starting space and tab\n"
" my_other_private_pkg"
)
f = tmpdir.join(filename).ensure()
f.write(file_contents)
assert core.read_lines(f.strpath) == ['my_private_pkg', 'my_other_private_pkg']
assert core.read_lines(f.strpath) == [
"my_private_pkg",
"my_other_private_pkg",
]
hashes = (
# empty-sha256
('sha256', 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'),
(
"sha256",
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
),
# empty-md5
('md5', 'd41d8cd98f00b204e9800998ecf8427e'),
("md5", "d41d8cd98f00b204e9800998ecf8427e"),
)
@pytest.mark.parametrize(("algo", "digest"), hashes)
def test_hashfile(tmpdir, algo, digest):
f = tmpdir.join("empty")
@ -117,9 +152,7 @@ def test_fname_and_hash(tmpdir, hash_algo):
"""Ensure we are returning the expected hashes for files."""
f = tmpdir.join("tmpfile")
f.ensure()
pkgfile = core.PkgFile(
"tmp", "1.0.0", f.strpath, f.dirname, f.basename
)
pkgfile = core.PkgFile("tmp", "1.0.0", f.strpath, f.dirname, f.basename)
assert pkgfile.fname_and_hash(hash_algo) == "{}#{}={}".format(
f.basename, hash_algo, str(f.computehash(hashtype=hash_algo))
)
@ -127,16 +160,14 @@ def test_fname_and_hash(tmpdir, hash_algo):
def test_redirect_prefix_encodes_newlines():
"""Ensure raw newlines are url encoded in the generated redirect."""
request = Namespace(
custom_fullpath='/\nSet-Cookie:malicious=1;'
)
prefix = '\nSet-Cookie:malicious=1;'
request = Namespace(custom_fullpath="/\nSet-Cookie:malicious=1;")
prefix = "\nSet-Cookie:malicious=1;"
newpath = core.get_bad_url_redirect_path(request, prefix)
assert '\n' not in newpath
assert "\n" not in newpath
def test_normalize_pkgname_for_url_encodes_newlines():
"""Ensure newlines are url encoded in package names for urls."""
assert '\n' not in core.normalize_pkgname_for_url(
'/\nSet-Cookie:malicious=1;'
assert "\n" not in core.normalize_pkgname_for_url(
"/\nSet-Cookie:malicious=1;"
)

View File

@ -4,11 +4,13 @@ import pytest
import re
from pypiserver import version as my_ver
@pytest.fixture()
def readme():
return open('README.rst', 'rt').read()
return open("README.rst", "rt").read()
def test_READMEversion(readme):
m = re.compile(r'^\s*:Version:\s*(.+)\s*$', re.MULTILINE).search(readme)
m = re.compile(r"^\s*:Version:\s*(.+)\s*$", re.MULTILINE).search(readme)
assert m, "Could not find version on README!"
assert m.group(1) == my_ver, 'Updaed version(%s) on README!' % m.group(1)
assert m.group(1) == my_ver, "Updaed version(%s) on README!" % m.group(1)

View File

@ -2,8 +2,12 @@
Test module for . . .
"""
# Standard library imports
from __future__ import (absolute_import, division,
print_function, unicode_literals)
from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)
import logging
from os.path import abspath, dirname, join, realpath
from sys import path
@ -16,28 +20,37 @@ import pytest
logger = logging.getLogger(__name__)
test_dir = realpath(dirname(__file__))
src_dir = abspath(join(test_dir, '..'))
src_dir = abspath(join(test_dir, ".."))
path.append(src_dir)
print(path)
import pypiserver
@pytest.mark.parametrize('conf_options', [
{},
{'root': '~/stable_packages'},
{'root': '~/unstable_packages', 'authenticated': 'upload',
'passwords': '~/htpasswd'},
# Verify that the strip parser works properly.
{'authenticated': str('upload')},
])
@pytest.mark.parametrize(
"conf_options",
[
{},
{"root": "~/stable_packages"},
{
"root": "~/unstable_packages",
"authenticated": "upload",
"passwords": "~/htpasswd",
},
# Verify that the strip parser works properly.
{"authenticated": str("upload")},
],
)
def test_paste_app_factory(conf_options, monkeypatch):
"""Test the paste_app_factory method"""
monkeypatch.setattr('pypiserver.core.configure',
lambda **x: (x, [x.keys()]))
monkeypatch.setattr(
"pypiserver.core.configure", lambda **x: (x, [x.keys()])
)
pypiserver.paste_app_factory({}, **conf_options)
def test_app_factory(monkeypatch):
monkeypatch.setattr('pypiserver.core.configure',
lambda **x: (x, [x.keys()]))
monkeypatch.setattr(
"pypiserver.core.configure", lambda **x: (x, [x.keys()])
)
assert pypiserver.app() is not pypiserver.app()

View File

@ -2,6 +2,7 @@
import sys, os, pytest, logging
from pypiserver import __main__
try:
from unittest import mock
except ImportError:
@ -9,7 +10,6 @@ except ImportError:
class main_wrapper(object):
def __init__(self):
self.run_kwargs = None
self.pkgdir = None
@ -43,11 +43,13 @@ def main(request, monkeypatch):
def test_default_pkgdir(main):
main([])
assert os.path.normpath(main.pkgdir) == os.path.normpath(os.path.expanduser("~/packages"))
assert os.path.normpath(main.pkgdir) == os.path.normpath(
os.path.expanduser("~/packages")
)
def test_noargs(main):
assert main([]) == {'host': "0.0.0.0", 'port': 8080, 'server': "auto"}
assert main([]) == {"host": "0.0.0.0", "port": 8080, "server": "auto"}
def test_port(main):
@ -91,34 +93,38 @@ def test_fallback_url_default(main):
def test_hash_algo_default(main):
main([])
assert main.app.module.config.hash_algo == 'md5'
assert main.app.module.config.hash_algo == "md5"
def test_hash_algo(main):
main(['--hash-algo=sha256'])
assert main.app.module.config.hash_algo == 'sha256'
main(["--hash-algo=sha256"])
assert main.app.module.config.hash_algo == "sha256"
def test_hash_algo_off(main):
main(['--hash-algo=off'])
main(["--hash-algo=off"])
assert main.app.module.config.hash_algo is None
main(['--hash-algo=0'])
main(["--hash-algo=0"])
assert main.app.module.config.hash_algo is None
main(['--hash-algo=no'])
main(["--hash-algo=no"])
assert main.app.module.config.hash_algo is None
main(['--hash-algo=false'])
main(["--hash-algo=false"])
assert main.app.module.config.hash_algo is None
def test_hash_algo_BAD(main):
with pytest.raises(SystemExit) as excinfo:
main(['--hash-algo BAD'])
#assert excinfo.value.message == 'some info' main(['--hash-algo BAD'])
main(["--hash-algo BAD"])
# assert excinfo.value.message == 'some info' main(['--hash-algo BAD'])
print(excinfo)
def test_logging(main, tmpdir):
logfile = tmpdir.mkdir("logs").join('test.log')
logfile = tmpdir.mkdir("logs").join("test.log")
main(["-v", "--log-file", logfile.strpath])
assert logfile.check(), logfile
def test_logging_verbosity(main):
main([])
assert logging.getLogger().level == logging.WARN
@ -129,12 +135,14 @@ def test_logging_verbosity(main):
main(["-v", "-v", "-v"])
assert logging.getLogger().level == logging.NOTSET
@pytest.mark.parametrize(
"cli_arg, expected_stream",[
"cli_arg, expected_stream",
[
("stderr", sys.stderr),
("stdout", sys.stdout),
("none", None),
]
],
)
@mock.patch.object(__main__, "init_logging")
def test_log_to_stdout(init_logging, main, cli_arg, expected_stream):
@ -144,10 +152,11 @@ def test_log_to_stdout(init_logging, main, cli_arg, expected_stream):
@pytest.fixture
def dummy_logger():
logger = logging.getLogger('test')
logger = logging.getLogger("test")
yield logger
logger.handlers = []
def test_init_logging_with_stream(dummy_logger):
assert not dummy_logger.handlers
@ -155,44 +164,50 @@ def test_init_logging_with_stream(dummy_logger):
assert isinstance(dummy_logger.handlers[0], logging.StreamHandler)
assert dummy_logger.handlers[0].stream is sys.stdout
def test_init_logging_with_none_stream_doesnt_add_stream_handler(dummy_logger):
assert not dummy_logger.handlers
__main__.init_logging(stream=None, logger=dummy_logger)
assert not dummy_logger.handlers
def test_welcome_file(main):
sample_msg_file = os.path.join(os.path.dirname(__file__), "sample_msg.html")
main(["--welcome", sample_msg_file])
assert "Hello pypiserver tester!" in main.app.module.config.welcome_msg
def test_welcome_file_default(main):
main([])
assert "Welcome to pypiserver!" in main.app.module.config.welcome_msg
def test_password_without_auth_list(main, monkeypatch):
sysexit = mock.MagicMock(side_effect=ValueError('BINGO'))
monkeypatch.setattr('sys.exit', sysexit)
sysexit = mock.MagicMock(side_effect=ValueError("BINGO"))
monkeypatch.setattr("sys.exit", sysexit)
with pytest.raises(ValueError) as ex:
main(["-P", "pswd-file", "-a", ""])
assert ex.value.args[0] == 'BINGO'
assert ex.value.args[0] == "BINGO"
with pytest.raises(ValueError) as ex:
main(["-a", "."])
assert ex.value.args[0] == 'BINGO'
assert ex.value.args[0] == "BINGO"
with pytest.raises(ValueError) as ex:
main(["-a", ""])
assert ex.value.args[0] == 'BINGO'
assert ex.value.args[0] == "BINGO"
with pytest.raises(ValueError) as ex:
main(["-P", "."])
assert ex.value.args[0] == 'BINGO'
assert ex.value.args[0] == "BINGO"
def test_password_alone(main, monkeypatch):
monkeypatch.setitem(sys.modules, 'passlib', mock.MagicMock())
monkeypatch.setitem(sys.modules, 'passlib.apache', mock.MagicMock())
monkeypatch.setitem(sys.modules, "passlib", mock.MagicMock())
monkeypatch.setitem(sys.modules, "passlib.apache", mock.MagicMock())
main(["-P", "pswd-file"])
assert main.app.module.config.authenticated == ['update']
assert main.app.module.config.authenticated == ["update"]
def test_dot_password_without_auth_list(main, monkeypatch):
main(["-P", ".", "-a", ""])

View File

@ -36,103 +36,118 @@ def touch_files(root, files):
def pkgfile_from_path(fn):
pkgname, version = guess_pkgname_and_version(fn)
return PkgFile(pkgname=pkgname, version=version,
root=py.path.local(fn).parts()[1].strpath, # noqa pylint: disable=no-member
fn=fn)
return PkgFile(
pkgname=pkgname,
version=version,
root=py.path.local(fn)
.parts()[1]
.strpath, # noqa pylint: disable=no-member
fn=fn,
)
@pytest.mark.parametrize(
("version", "is_stable"),
[("1.0", True),
("0.0.0", True),
("1.1beta1", False),
("1.2.10-123", True),
("5.5.0-DEV", False),
("1.2-rc1", False),
("1.0b1", False)])
[
("1.0", True),
("0.0.0", True),
("1.1beta1", False),
("1.2.10-123", True),
("5.5.0-DEV", False),
("1.2-rc1", False),
("1.0b1", False),
],
)
def test_is_stable_version(version, is_stable):
parsed_version = parse_version(version)
assert is_stable_version(parsed_version) == is_stable
def test_build_releases():
p = pkgfile_from_path('/home/ralf/pypiserver/d/greenlet-0.2.zip')
p = pkgfile_from_path("/home/ralf/pypiserver/d/greenlet-0.2.zip")
expected = dict(parsed_version=('00000000', '00000003', '*final'),
pkgname='greenlet',
replaces=p,
version='0.3.0')
expected = dict(
parsed_version=("00000000", "00000003", "*final"),
pkgname="greenlet",
replaces=p,
version="0.3.0",
)
res, = list(build_releases(p, ["0.3.0"]))
(res,) = list(build_releases(p, ["0.3.0"]))
for k, v in expected.items():
assert getattr(res, k) == v
def test_filter_stable_releases():
p = pkgfile_from_path('/home/ralf/pypiserver/d/greenlet-0.2.zip')
p = pkgfile_from_path("/home/ralf/pypiserver/d/greenlet-0.2.zip")
assert list(filter_stable_releases([p])) == [p]
p2 = pkgfile_from_path('/home/ralf/pypiserver/d/greenlet-0.5rc1.zip')
p2 = pkgfile_from_path("/home/ralf/pypiserver/d/greenlet-0.5rc1.zip")
assert list(filter_stable_releases([p2])) == []
def test_filter_latest_pkgs():
paths = ["/home/ralf/greenlet-0.2.zip",
"/home/ralf/foo/baz-1.0.zip"
"/home/ralf/bar/greenlet-0.3.zip"]
paths = [
"/home/ralf/greenlet-0.2.zip",
"/home/ralf/foo/baz-1.0.zip" "/home/ralf/bar/greenlet-0.3.zip",
]
pkgs = [pkgfile_from_path(x) for x in paths]
assert frozenset(filter_latest_pkgs(pkgs)) == frozenset(pkgs[1:])
def test_filter_latest_pkgs_case_insensitive():
paths = ["/home/ralf/greenlet-0.2.zip",
"/home/ralf/foo/baz-1.0.zip"
"/home/ralf/bar/Greenlet-0.3.zip"]
paths = [
"/home/ralf/greenlet-0.2.zip",
"/home/ralf/foo/baz-1.0.zip" "/home/ralf/bar/Greenlet-0.3.zip",
]
pkgs = [pkgfile_from_path(x) for x in paths]
assert frozenset(filter_latest_pkgs(pkgs)) == frozenset(pkgs[1:])
@pytest.mark.parametrize('pip_ver, cmd_type', (
('10.0.0', 'd'),
('10.0.0rc10', 'd'),
('10.0.0b10', 'd'),
('10.0.0a3', 'd'),
('10.0.0.dev8', 'd'),
('10.0.0.dev8', 'd'),
('18.0', 'd'),
('9.9.8', 'i'),
('9.9.8rc10', 'i'),
('9.9.8b10', 'i'),
('9.9.8a10', 'i'),
('9.9.8.dev10', 'i'),
('9.9', 'i'),
))
@pytest.mark.parametrize(
"pip_ver, cmd_type",
(
("10.0.0", "d"),
("10.0.0rc10", "d"),
("10.0.0b10", "d"),
("10.0.0a3", "d"),
("10.0.0.dev8", "d"),
("10.0.0.dev8", "d"),
("18.0", "d"),
("9.9.8", "i"),
("9.9.8rc10", "i"),
("9.9.8b10", "i"),
("9.9.8a10", "i"),
("9.9.8.dev10", "i"),
("9.9", "i"),
),
)
def test_pip_cmd_root(pip_ver, cmd_type):
"""Verify correct determination of the command root by pip version."""
exp_cmd = (
'pip',
'-q',
'install' if cmd_type == 'i' else 'download',
"pip",
"-q",
"install" if cmd_type == "i" else "download",
)
assert tuple(PipCmd.update_root(pip_ver)) == exp_cmd
def test_pip_cmd_update():
"""Verify the correct determination of a pip command."""
index = 'https://pypi.org/simple'
destdir = 'foo/bar'
pkg_name = 'mypkg'
pkg_version = '12.0'
cmd_root = ('pip', '-q', 'download')
index = "https://pypi.org/simple"
destdir = "foo/bar"
pkg_name = "mypkg"
pkg_version = "12.0"
cmd_root = ("pip", "-q", "download")
exp_cmd = cmd_root + (
'--no-deps',
'-i',
"--no-deps",
"-i",
index,
'-d',
"-d",
destdir,
'{}=={}'.format(pkg_name, pkg_version)
"{}=={}".format(pkg_name, pkg_version),
)
assert exp_cmd == tuple(
PipCmd.update(cmd_root, destdir, pkg_name, pkg_version)
@ -141,16 +156,18 @@ def test_pip_cmd_update():
def test_pip_cmd_update_index_overridden():
"""Verify the correct determination of a pip command."""
index = 'https://pypi.org/complex'
destdir = 'foo/bar'
pkg_name = 'mypkg'
pkg_version = '12.0'
cmd_root = ('pip', '-q', 'download')
index = "https://pypi.org/complex"
destdir = "foo/bar"
pkg_name = "mypkg"
pkg_version = "12.0"
cmd_root = ("pip", "-q", "download")
exp_cmd = cmd_root + (
'--no-deps',
'-i', index,
'-d', destdir,
'{}=={}'.format(pkg_name, pkg_version)
"--no-deps",
"-i",
index,
"-d",
destdir,
"{}=={}".format(pkg_name, pkg_version),
)
assert exp_cmd == tuple(
PipCmd.update(cmd_root, destdir, pkg_name, pkg_version, index=index)
@ -159,52 +176,53 @@ def test_pip_cmd_update_index_overridden():
def test_update_package(monkeypatch):
"""Test generating an update command for a package."""
monkeypatch.setattr(manage, 'call', Mock())
pkg = PkgFile('mypkg', '1.0', replaces=PkgFile('mypkg', '0.9'))
update_package(pkg, '.')
manage.call.assert_called_once_with(( # pylint: disable=no-member
'pip',
'-q',
'download',
'--no-deps',
'-i', 'https://pypi.org/simple',
'-d', '.',
'mypkg==1.0',
))
monkeypatch.setattr(manage, "call", Mock())
pkg = PkgFile("mypkg", "1.0", replaces=PkgFile("mypkg", "0.9"))
update_package(pkg, ".")
manage.call.assert_called_once_with(
( # pylint: disable=no-member
"pip",
"-q",
"download",
"--no-deps",
"-i",
"https://pypi.org/simple",
"-d",
".",
"mypkg==1.0",
)
)
def test_update_package_dry_run(monkeypatch):
"""Test generating an update command for a package."""
monkeypatch.setattr(manage, 'call', Mock())
pkg = PkgFile('mypkg', '1.0', replaces=PkgFile('mypkg', '0.9'))
update_package(pkg, '.', dry_run=True)
monkeypatch.setattr(manage, "call", Mock())
pkg = PkgFile("mypkg", "1.0", replaces=PkgFile("mypkg", "0.9"))
update_package(pkg, ".", dry_run=True)
assert not manage.call.mock_calls # pylint: disable=no-member
def test_update_all_packages(monkeypatch):
"""Test calling update_all_packages()"""
public_pkg_1 = PkgFile('Flask', '1.0')
public_pkg_2 = PkgFile('requests', '1.0')
private_pkg_1 = PkgFile('my_private_pkg', '1.0')
private_pkg_2 = PkgFile('my_other_private_pkg', '1.0')
public_pkg_1 = PkgFile("Flask", "1.0")
public_pkg_2 = PkgFile("requests", "1.0")
private_pkg_1 = PkgFile("my_private_pkg", "1.0")
private_pkg_2 = PkgFile("my_other_private_pkg", "1.0")
roots_mock = {
'/opt/pypi': [
"/opt/pypi": [
public_pkg_1,
private_pkg_1,
],
'/data/pypi': [
public_pkg_2,
private_pkg_2
],
"/data/pypi": [public_pkg_2, private_pkg_2],
}
def core_listdir_mock(directory):
return roots_mock.get(directory, [])
monkeypatch.setattr(manage.core, 'listdir', core_listdir_mock)
monkeypatch.setattr(manage.core, 'read_lines', Mock(return_value=[]))
monkeypatch.setattr(manage, 'update', Mock(return_value=None))
monkeypatch.setattr(manage.core, "listdir", core_listdir_mock)
monkeypatch.setattr(manage.core, "read_lines", Mock(return_value=[]))
monkeypatch.setattr(manage, "update", Mock(return_value=None))
destdir = None
dry_run = False
@ -219,44 +237,45 @@ def test_update_all_packages(monkeypatch):
blacklist_file=blacklist_file,
)
manage.core.read_lines.assert_not_called() # pylint: disable=no-member
manage.update.assert_called_once_with( # pylint: disable=no-member
manage.core.read_lines.assert_not_called() # pylint: disable=no-member
manage.update.assert_called_once_with( # pylint: disable=no-member
frozenset([public_pkg_1, public_pkg_2, private_pkg_1, private_pkg_2]),
destdir,
dry_run,
stable_only
stable_only,
)
def test_update_all_packages_with_blacklist(monkeypatch):
"""Test calling update_all_packages()"""
public_pkg_1 = PkgFile('Flask', '1.0')
public_pkg_2 = PkgFile('requests', '1.0')
private_pkg_1 = PkgFile('my_private_pkg', '1.0')
private_pkg_2 = PkgFile('my_other_private_pkg', '1.0')
public_pkg_1 = PkgFile("Flask", "1.0")
public_pkg_2 = PkgFile("requests", "1.0")
private_pkg_1 = PkgFile("my_private_pkg", "1.0")
private_pkg_2 = PkgFile("my_other_private_pkg", "1.0")
roots_mock = {
'/opt/pypi': [
"/opt/pypi": [
public_pkg_1,
private_pkg_1,
],
'/data/pypi': [
public_pkg_2,
private_pkg_2
],
"/data/pypi": [public_pkg_2, private_pkg_2],
}
def core_listdir_mock(directory):
return roots_mock.get(directory, [])
monkeypatch.setattr(manage.core, 'listdir', core_listdir_mock)
monkeypatch.setattr(manage.core, 'read_lines', Mock(return_value=['my_private_pkg', 'my_other_private_pkg']))
monkeypatch.setattr(manage, 'update', Mock(return_value=None))
monkeypatch.setattr(manage.core, "listdir", core_listdir_mock)
monkeypatch.setattr(
manage.core,
"read_lines",
Mock(return_value=["my_private_pkg", "my_other_private_pkg"]),
)
monkeypatch.setattr(manage, "update", Mock(return_value=None))
destdir = None
dry_run = False
stable_only = True
blacklist_file = '/root/pkg_blacklist'
blacklist_file = "/root/pkg_blacklist"
update_all_packages(
roots=list(roots_mock.keys()),
@ -266,10 +285,9 @@ def test_update_all_packages_with_blacklist(monkeypatch):
blacklist_file=blacklist_file,
)
manage.update.assert_called_once_with( # pylint: disable=no-member
frozenset([public_pkg_1, public_pkg_2]),
destdir,
dry_run,
stable_only
manage.update.assert_called_once_with( # pylint: disable=no-member
frozenset([public_pkg_1, public_pkg_2]), destdir, dry_run, stable_only
)
manage.core.read_lines.assert_called_once_with(blacklist_file) # pylint: disable=no-member
manage.core.read_lines.assert_called_once_with(
blacklist_file
) # pylint: disable=no-member

View File

@ -24,6 +24,7 @@ import time
from shlex import split
from subprocess import Popen
from textwrap import dedent
try:
from urllib.request import urlopen
except ImportError:
@ -38,7 +39,7 @@ import pytest
# ######################################################################
_BUFF_SIZE = 2**16
_BUFF_SIZE = 2 ** 16
_port = 8090
SLEEP_AFTER_SRV = 3 # sec
@ -50,20 +51,21 @@ def port():
return _port
Srv = namedtuple('Srv', ('proc', 'port', 'package'))
Srv = namedtuple("Srv", ("proc", "port", "package"))
def _run_server(packdir, port, authed, other_cli=''):
def _run_server(packdir, port, authed, other_cli=""):
"""Run a server, optionally with partial auth enabled."""
pswd_opt_choices = {
True: "-Ptests/htpasswd.a.a -a update,download",
False: "-P. -a.",
'partial': "-Ptests/htpasswd.a.a -a update",
"partial": "-Ptests/htpasswd.a.a -a update",
}
pswd_opts = pswd_opt_choices[authed]
cmd = (
"%s -m pypiserver.__main__ -vvv --overwrite -i 127.0.0.1 "
"-p %s %s %s %s" % (
"-p %s %s %s %s"
% (
sys.executable,
port,
pswd_opts,
@ -79,7 +81,7 @@ def _run_server(packdir, port, authed, other_cli=''):
def _kill_server(srv):
print('Killing %s' % (srv,))
print("Killing %s" % (srv,))
try:
srv.proc.terminate()
time.sleep(1)
@ -88,9 +90,8 @@ def _kill_server(srv):
@contextlib.contextmanager
def new_server(packdir, port, authed=False, other_cli=''):
srv = _run_server(packdir, port,
authed=authed, other_cli=other_cli)
def new_server(packdir, port, authed=False, other_cli=""):
srv = _run_server(packdir, port, authed=authed, other_cli=other_cli)
try:
yield srv
finally:
@ -108,33 +109,34 @@ def chdir(d):
def _run_python(cmd):
ncmd = '%s %s' % (sys.executable, cmd)
ncmd = "%s %s" % (sys.executable, cmd)
return os.system(ncmd)
@pytest.fixture(scope='module')
@pytest.fixture(scope="module")
def project(request):
def fin():
tmpdir.remove(True)
tmpdir = path.local(tempfile.mkdtemp())
request.addfinalizer(fin)
src_setup_py = path.local().join('tests', 'centodeps-setup.py')
src_setup_py = path.local().join("tests", "centodeps-setup.py")
assert src_setup_py.check()
projdir = tmpdir.join('centodeps')
projdir = tmpdir.join("centodeps")
projdir.mkdir()
dst_setup_py = projdir.join('setup.py')
dst_setup_py = projdir.join("setup.py")
src_setup_py.copy(dst_setup_py)
assert dst_setup_py.check()
return projdir
@pytest.fixture(scope='module')
@pytest.fixture(scope="module")
def package(project, request):
with chdir(project.strpath):
cmd = 'setup.py bdist_wheel'
cmd = "setup.py bdist_wheel"
assert _run_python(cmd) == 0
pkgs = list(project.join('dist').visit('centodeps*.whl'))
pkgs = list(project.join("dist").visit("centodeps*.whl"))
assert len(pkgs) == 1
pkg = path.local(pkgs[0])
assert pkg.check()
@ -142,7 +144,7 @@ def package(project, request):
return pkg
@pytest.fixture(scope='module')
@pytest.fixture(scope="module")
def packdir(package):
return package.dirpath()
@ -150,7 +152,7 @@ def packdir(package):
open_port = 8081
@pytest.fixture(scope='module')
@pytest.fixture(scope="module")
def open_server(packdir, request):
srv = _run_server(packdir, open_port, authed=False)
fin = functools.partial(_kill_server, srv)
@ -162,7 +164,7 @@ def open_server(packdir, request):
protected_port = 8082
@pytest.fixture(scope='module')
@pytest.fixture(scope="module")
def protected_server(packdir, request):
srv = _run_server(packdir, protected_port, authed=True)
fin = functools.partial(_kill_server, srv)
@ -176,9 +178,9 @@ def empty_packdir(tmpdir):
return tmpdir.mkdir("dists")
def _build_url(port, user='', pswd=''):
auth = '%s:%s@' % (user, pswd) if user or pswd else ''
return 'http://%slocalhost:%s' % (auth, port)
def _build_url(port, user="", pswd=""):
auth = "%s:%s@" % (user, pswd) if user or pswd else ""
return "http://%slocalhost:%s" % (auth, port)
def _run_pip(cmd):
@ -186,7 +188,7 @@ def _run_pip(cmd):
"pip --no-cache-dir --disable-pip-version-check "
"--retries 0 --timeout 5 --no-input %s"
) % cmd
print('PIP: %s' % ncmd)
print("PIP: %s" % ncmd)
proc = Popen(split(ncmd))
proc.communicate()
return proc.returncode
@ -195,7 +197,7 @@ def _run_pip(cmd):
def _run_pip_install(cmd, port, install_dir, user=None, pswd=None):
url = _build_url(port, user, pswd)
# ncmd = '-vv install --download %s -i %s %s' % (install_dir, url, cmd)
ncmd = '-vv download -d %s -i %s %s' % (install_dir, url, cmd)
ncmd = "-vv download -d %s -i %s %s" % (install_dir, url, cmd)
return _run_pip(ncmd)
@ -209,17 +211,18 @@ def pypirc_tmpfile(port, user, password):
"""Create a temporary pypirc file."""
fd, filepath = tempfile.mkstemp()
os.close(fd)
with open(filepath, 'w') as rcfile:
with open(filepath, "w") as rcfile:
rcfile.writelines(
'\n'.join((
'[distutils]',
'index-servers: test',
''
'[test]',
'repository: {}'.format(_build_url(port)),
'username: {}'.format(user),
'password: {}'.format(password),
))
"\n".join(
(
"[distutils]",
"index-servers: test",
"" "[test]",
"repository: {}".format(_build_url(port)),
"username: {}".format(user),
"password: {}".format(password),
)
)
)
with open(filepath) as rcfile:
print(rcfile.read())
@ -229,7 +232,7 @@ def pypirc_tmpfile(port, user, password):
@contextlib.contextmanager
def pypirc_file(txt):
pypirc_path = path.local('~/.pypirc', expanduser=1)
pypirc_path = path.local("~/.pypirc", expanduser=1)
old_pypirc = pypirc_path.read() if pypirc_path.check() else None
pypirc_path.write(txt)
try:
@ -241,34 +244,44 @@ def pypirc_file(txt):
pypirc_path.remove()
def twine_upload(packages, repository='test', conf='pypirc',
expect_failure=False):
def twine_upload(
packages, repository="test", conf="pypirc", expect_failure=False
):
"""Call 'twine upload' with appropriate arguments"""
proc = Popen((
'twine',
'upload',
'--repository', repository,
'--config-file', conf,
' '.join(packages),
))
proc = Popen(
(
"twine",
"upload",
"--repository",
repository,
"--config-file",
conf,
" ".join(packages),
)
)
proc.communicate()
if not expect_failure and proc.returncode:
assert False, 'Twine upload failed. See stdout/err'
assert False, "Twine upload failed. See stdout/err"
def twine_register(packages, repository='test', conf='pypirc',
expect_failure=False):
def twine_register(
packages, repository="test", conf="pypirc", expect_failure=False
):
"""Call 'twine register' with appropriate args"""
proc = Popen((
'twine',
'register',
'--repository', repository,
'--config-file', conf,
' '.join(packages)
))
proc = Popen(
(
"twine",
"register",
"--repository",
repository,
"--config-file",
conf,
" ".join(packages),
)
)
proc.communicate()
if not expect_failure and proc.returncode:
assert False, 'Twine register failed. See stdout/err'
assert False, "Twine register failed. See stdout/err"
# ######################################################################
@ -297,16 +310,19 @@ def test_pipInstall_authedFails(protected_server, pipdir):
def test_pipInstall_authedOk(protected_server, package, pipdir):
cmd = "centodeps"
assert _run_pip_install(cmd, protected_server.port, pipdir,
user='a', pswd='a') == 0
assert (
_run_pip_install(cmd, protected_server.port, pipdir, user="a", pswd="a")
== 0
)
assert pipdir.join(package.basename).check()
@pytest.mark.parametrize("pkg_frmt", ['bdist', 'bdist_wheel'])
def test_setuptoolsUpload_open(empty_packdir, port, project, package,
pkg_frmt):
@pytest.mark.parametrize("pkg_frmt", ["bdist", "bdist_wheel"])
def test_setuptoolsUpload_open(empty_packdir, port, project, package, pkg_frmt):
url = _build_url(port, None, None)
with pypirc_file(dedent("""\
with pypirc_file(
dedent(
"""\
[distutils]
index-servers: test
@ -314,22 +330,28 @@ def test_setuptoolsUpload_open(empty_packdir, port, project, package,
repository: %s
username: ''
password: ''
""" % url)):
"""
% url
)
):
with new_server(empty_packdir, port):
with chdir(project.strpath):
cmd = "setup.py -vvv %s upload -r %s" % (pkg_frmt, url)
for i in range(5):
print('++Attempt #%s' % i)
print("++Attempt #%s" % i)
assert _run_python(cmd) == 0
time.sleep(SLEEP_AFTER_SRV)
assert len(empty_packdir.listdir()) == 1
@pytest.mark.parametrize("pkg_frmt", ['bdist', 'bdist_wheel'])
def test_setuptoolsUpload_authed(empty_packdir, port, project, package,
pkg_frmt, monkeypatch):
@pytest.mark.parametrize("pkg_frmt", ["bdist", "bdist_wheel"])
def test_setuptoolsUpload_authed(
empty_packdir, port, project, package, pkg_frmt, monkeypatch
):
url = _build_url(port)
with pypirc_file(dedent("""\
with pypirc_file(
dedent(
"""\
[distutils]
index-servers: test
@ -337,7 +359,10 @@ def test_setuptoolsUpload_authed(empty_packdir, port, project, package,
repository: %s
username: a
password: a
""" % url)):
"""
% url
)
):
with new_server(empty_packdir, port, authed=True):
with chdir(project.strpath):
cmd = (
@ -345,18 +370,21 @@ def test_setuptoolsUpload_authed(empty_packdir, port, project, package,
"test upload -r test" % pkg_frmt
)
for i in range(5):
print('++Attempt #%s' % i)
print("++Attempt #%s" % i)
assert _run_python(cmd) == 0
time.sleep(SLEEP_AFTER_SRV)
assert len(empty_packdir.listdir()) == 1
@pytest.mark.parametrize("pkg_frmt", ['bdist', 'bdist_wheel'])
def test_setuptools_upload_partial_authed(empty_packdir, port, project,
pkg_frmt):
@pytest.mark.parametrize("pkg_frmt", ["bdist", "bdist_wheel"])
def test_setuptools_upload_partial_authed(
empty_packdir, port, project, pkg_frmt
):
"""Test uploading a package with setuptools with partial auth."""
url = _build_url(port)
with pypirc_file(dedent("""\
with pypirc_file(
dedent(
"""\
[distutils]
index-servers: test
@ -364,13 +392,18 @@ def test_setuptools_upload_partial_authed(empty_packdir, port, project,
repository: %s
username: a
password: a
""" % url)):
with new_server(empty_packdir, port, authed='partial'):
"""
% url
)
):
with new_server(empty_packdir, port, authed="partial"):
with chdir(project.strpath):
cmd = ("setup.py -vvv %s register -r test upload -r test" %
pkg_frmt)
cmd = (
"setup.py -vvv %s register -r test upload -r test"
% pkg_frmt
)
for i in range(5):
print('++Attempt #%s' % i)
print("++Attempt #%s" % i)
assert _run_python(cmd) == 0
time.sleep(SLEEP_AFTER_SRV)
assert len(empty_packdir.listdir()) == 1
@ -378,18 +411,18 @@ def test_setuptools_upload_partial_authed(empty_packdir, port, project,
def test_partial_authed_open_download(empty_packdir, port):
"""Validate that partial auth still allows downloads."""
url = _build_url(port) + '/simple'
with new_server(empty_packdir, port, authed='partial'):
url = _build_url(port) + "/simple"
with new_server(empty_packdir, port, authed="partial"):
resp = urlopen(url)
assert resp.getcode() == 200
def test_twine_upload_open(empty_packdir, port, package):
"""Test twine upload with no authentication"""
user, pswd = 'foo', 'bar'
user, pswd = "foo", "bar"
with new_server(empty_packdir, port):
with pypirc_tmpfile(port, user, pswd) as rcfile:
twine_upload([package.strpath], repository='test', conf=rcfile)
twine_upload([package.strpath], repository="test", conf=rcfile)
time.sleep(SLEEP_AFTER_SRV)
assert len(empty_packdir.listdir()) == 1
@ -398,12 +431,12 @@ def test_twine_upload_open(empty_packdir, port, package):
@pytest.mark.parametrize("hash_algo", ("md5", "sha256", "sha512"))
def test_hash_algos(empty_packdir, port, package, pipdir, hash_algo):
"""Test twine upload with no authentication"""
user, pswd = 'foo', 'bar'
user, pswd = "foo", "bar"
with new_server(
empty_packdir, port, other_cli="--hash-algo {}".format(hash_algo)
):
with pypirc_tmpfile(port, user, pswd) as rcfile:
twine_upload([package.strpath], repository='test', conf=rcfile)
twine_upload([package.strpath], repository="test", conf=rcfile)
time.sleep(SLEEP_AFTER_SRV)
assert _run_pip_install("centodeps", port, pipdir) == 0
@ -411,23 +444,25 @@ def test_hash_algos(empty_packdir, port, package, pipdir, hash_algo):
def test_twine_upload_authed(empty_packdir, port, package):
"""Test authenticated twine upload"""
user, pswd = 'a', 'a'
user, pswd = "a", "a"
with new_server(empty_packdir, port, authed=False):
with pypirc_tmpfile(port, user, pswd) as rcfile:
twine_upload([package.strpath], repository='test', conf=rcfile)
twine_upload([package.strpath], repository="test", conf=rcfile)
time.sleep(SLEEP_AFTER_SRV)
assert len(empty_packdir.listdir()) == 1
assert empty_packdir.join(
package.basename).check(), (package.basename, empty_packdir.listdir())
assert empty_packdir.join(package.basename).check(), (
package.basename,
empty_packdir.listdir(),
)
def test_twine_upload_partial_authed(empty_packdir, port, package):
"""Test partially authenticated twine upload"""
user, pswd = 'a', 'a'
with new_server(empty_packdir, port, authed='partial'):
user, pswd = "a", "a"
with new_server(empty_packdir, port, authed="partial"):
with pypirc_tmpfile(port, user, pswd) as rcfile:
twine_upload([package.strpath], repository='test', conf=rcfile)
twine_upload([package.strpath], repository="test", conf=rcfile)
time.sleep(SLEEP_AFTER_SRV)
assert len(empty_packdir.listdir()) == 1
@ -435,13 +470,13 @@ def test_twine_upload_partial_authed(empty_packdir, port, package):
def test_twine_register_open(open_server, package):
"""Test unauthenticated twine registration"""
srv = open_server
with pypirc_tmpfile(srv.port, 'foo', 'bar') as rcfile:
twine_register([package.strpath], repository='test', conf=rcfile)
with pypirc_tmpfile(srv.port, "foo", "bar") as rcfile:
twine_register([package.strpath], repository="test", conf=rcfile)
def test_twine_register_authed_ok(protected_server, package):
"""Test authenticated twine registration"""
srv = protected_server
user, pswd = 'a', 'a'
user, pswd = "a", "a"
with pypirc_tmpfile(srv.port, user, pswd) as rcfile:
twine_register([package.strpath], repository='test', conf=rcfile)
twine_register([package.strpath], repository="test", conf=rcfile)