Cleanup code to python 3.6 (#342)

* Cleanup setup.py

* remove explicit inheritance from object

* convert most string interpolations to f-strings

Co-authored-by: Pelle Koster <pelle.koster@nginfra.nl>
This commit is contained in:
PelleK 2020-10-08 03:45:51 +02:00 committed by GitHub
parent b44edb61ce
commit d886bc2eba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 125 additions and 193 deletions

@ -11,7 +11,7 @@ __summary__ = "A minimal PyPI server for use with pip/easy_install."
__uri__ = "https://github.com/pypiserver/pypiserver"
class Configuration(object):
class Configuration:
"""
.. see:: config-options: :func:`pypiserver.configure()`
"""
@ -20,11 +20,11 @@ class Configuration(object):
vars(self).update(kwds)
def __repr__(self, *args, **kwargs):
return "Configuration(**%s)" % vars(self)
return f"Configuration(**{vars(self)})"
def __str__(self, *args, **kwargs):
return "Configuration:\n%s" % "\n".join(
"%20s = %s" % (k, v) for k, v in sorted(vars(self).items())
return "Configuration:\n" + "\n".join(
f"{k:>20} = {v}" for k, v in sorted(vars(self).items())
)
def update(self, props):

@ -216,7 +216,7 @@ def main(argv=None):
)
except getopt.GetoptError:
err = sys.exc_info()[1]
sys.exit("usage error: %s" % (err,))
sys.exit(f"usage error: {err}")
for k, v in opts:
if k in ("-p", "--port"):
@ -224,7 +224,7 @@ def main(argv=None):
c.port = int(v)
except Exception:
err = sys.exc_info()[1]
sys.exit("Invalid port(%r) due to: %s" % (v, err))
sys.exit(f"Invalid port({v!r}) due to: {err}")
elif k in ("-a", "--authenticate"):
c.authenticated = [
a.lower() for a in re.split("[, ]+", v.strip(" ,")) if a
@ -235,8 +235,11 @@ def main(argv=None):
actions = ("list", "download", "update")
for a in c.authenticated:
if a not in actions:
errmsg = "Action '%s' for option `%s` not one of %s!"
sys.exit(errmsg % (a, k, actions))
errmsg = (
f"Action '{a}' for option `{k}`"
f" not one of {actions}!"
)
sys.exit(errmsg)
elif k in ("-i", "--interface"):
c.host = v
elif k in ("-r", "--root"):
@ -250,7 +253,7 @@ def main(argv=None):
elif k == "--welcome":
c.welcome_file = v
elif k == "--version":
print("pypiserver %s\n" % pypiserver.__version__)
print(f"pypiserver {pypiserver.__version__}\n")
return
elif k == "-U":
command = "update"
@ -294,8 +297,10 @@ def main(argv=None):
or c.authenticated
and c.password_file == "."
):
auth_err = "When auth-ops-list is empty (-a=.), password-file (-P=%r) must also be empty ('.')!"
sys.exit(auth_err % c.password_file)
sys.exit(
"When auth-ops-list is empty (-a=.), password-file"
f" (-P={c.password_file!r}) must also be empty ('.')!"
)
if len(roots) == 0:
roots.append(os.path.expanduser("~/packages"))
@ -314,8 +319,8 @@ def main(argv=None):
valid_streams = {"none": None, "stderr": sys.stderr, "stdout": sys.stdout}
if c.log_stream not in valid_streams:
sys.exit(
"invalid log stream %s. choose one of %s"
% (c.log_stream, ", ".join(valid_streams.keys()))
f"Invalid log stream {c.log_stream}."
f" Choose one of {', '.join(valid_streams.keys())}"
)
init_logging(
@ -349,8 +354,8 @@ def main(argv=None):
if c.server not in bottle.server_names:
sys.exit(
"unknown server %r. choose one of %s"
% (c.server, ", ".join(bottle.server_names.keys()))
f"Unknown server {c.server}."
f" Choose one of {', '.join(bottle.server_names.keys())}"
)
bottle.debug(c.verbosity > 1)

@ -41,7 +41,7 @@ config = None
app = Bottle()
class auth(object):
class auth:
"""decorator to apply authentication if specified for the decorated method & action"""
def __init__(self, action):
@ -145,8 +145,8 @@ def remove_pkg():
name = request.forms.get("name")
version = request.forms.get("version")
if not name or not version:
msg = "Missing 'name'/'version' fields: name=%s, version=%s"
raise HTTPError(400, msg % (name, version))
msg = f"Missing 'name'/'version' fields: name={name}, version={version}"
raise HTTPError(400, msg)
pkgs = list(
filter(
lambda pkg: pkg.pkgname == name and pkg.version == version,
@ -154,7 +154,7 @@ def remove_pkg():
)
)
if len(pkgs) == 0:
raise HTTPError(404, "%s (%s) not found" % (name, version))
raise HTTPError(404, f"{name} ({version}) not found")
for pkg in pkgs:
os.unlink(pkg.fn)
@ -170,11 +170,11 @@ def file_upload():
raise HTTPError(400, "Missing 'content' file-field!")
if (
ufiles.sig
and "%s.asc" % ufiles.pkg.raw_filename != ufiles.sig.raw_filename
and f"{ufiles.pkg.raw_filename}.asc" != ufiles.sig.raw_filename
):
raise HTTPError(
400,
"Unrelated signature %r for package %r!" % (ufiles.sig, ufiles.pkg),
f"Unrelated signature {ufiles.sig!r} for package {ufiles.pkg!r}!",
)
for uf in ufiles:
@ -184,19 +184,17 @@ def file_upload():
not is_valid_pkg_filename(uf.raw_filename)
or core.guess_pkgname_and_version(uf.raw_filename) is None
):
raise HTTPError(400, "Bad filename: %s" % uf.raw_filename)
raise HTTPError(400, f"Bad filename: {uf.raw_filename}")
if not config.overwrite and core.exists(packages.root, uf.raw_filename):
log.warn(
"Cannot upload %r since it already exists! \n"
" You may start server with `--overwrite` option. ",
uf.raw_filename,
log.warning(
f"Cannot upload {uf.raw_filename!r} since it already exists! \n"
" You may start server with `--overwrite` option. "
)
raise HTTPError(
409,
"Package %r already exists!\n"
" You may start server with `--overwrite` option."
% uf.raw_filename,
f"Package {uf.raw_filename!r} already exists!\n"
" You may start server with `--overwrite` option.",
)
core.store(packages.root, uf.raw_filename, uf.save)
@ -204,7 +202,7 @@ def file_upload():
user = request.auth[0]
else:
user = "anon"
log.info("User %r stored %r.", user, uf.raw_filename)
log.info(f"User {user!r} stored {uf.raw_filename!r}.")
@app.post("/")
@ -216,7 +214,7 @@ def update():
raise HTTPError(400, "Missing ':action' field!")
if action in ("verify", "submit"):
log.warning("Ignored ':action': %s", action)
log.warning(f"Ignored ':action': {action}")
elif action == "doc_upload":
doc_upload()
elif action == "remove_pkg":
@ -224,7 +222,7 @@ def update():
elif action == "file_upload":
file_upload()
else:
raise HTTPError(400, "Unsupported ':action' field: %s" % action)
raise HTTPError(400, f"Unsupported ':action' field: {action}")
return ""
@ -247,7 +245,7 @@ def handle_rpc():
.childNodes[0]
.wholeText.strip()
)
log.info("Processing RPC2 request for '%s'", methodname)
log.info(f"Processing RPC2 request for '{methodname}'")
if methodname == "search":
value = (
parser.getElementsByTagName("string")[0]
@ -308,18 +306,14 @@ def simple(prefix=""):
)
if not files:
if config.redirect_to_fallback:
return redirect(
"%s/%s/" % (config.fallback_url.rstrip("/"), prefix)
)
return HTTPError(404, "Not Found (%s does not exist)\n\n" % normalized)
return redirect(f"{config.fallback_url.rstrip('/')}/{prefix}/")
return HTTPError(404, f"Not Found ({normalized} does not exist)\n\n")
fp = request.custom_fullpath
links = [
(
os.path.basename(f.relfn),
urljoin(
fp, "../../packages/%s" % f.fname_and_hash(config.hash_algo)
),
urljoin(fp, f"../../packages/{f.fname_and_hash(config.hash_algo)}"),
)
for f in files
]
@ -381,11 +375,11 @@ def server_static(filename):
)
if config.cache_control:
response.set_header(
"Cache-Control", "public, max-age=%s" % config.cache_control
"Cache-Control", f"public, max-age={config.cache_control}"
)
return response
return HTTPError(404, "Not Found (%s does not exist)\n\n" % filename)
return HTTPError(404, f"Not Found ({filename} does not exist)\n\n")
@app.route("/:prefix")

@ -9,7 +9,7 @@ from watchdog.observers import Observer
import threading
class CacheManager(object):
class CacheManager:
"""
A naive cache implementation for listdir and digest_file
@ -87,7 +87,7 @@ class CacheManager(object):
self.observer.schedule(_EventHandler(self, root), root, recursive=True)
class _EventHandler(object):
class _EventHandler:
def __init__(self, cache, root):
self.cache = cache
self.root = root

@ -18,7 +18,7 @@ except ImportError: # PY2
import pkg_resources
from . import Configuration
from pypiserver import Configuration
log = logging.getLogger(__name__)
@ -29,7 +29,7 @@ def configure(**kwds):
:return: a 2-tuple (Configure, package-list)
"""
c = Configuration(**kwds)
log.info("+++Pypiserver invoked with: %s", c)
log.info(f"+++Pypiserver invoked with: {c}")
if c.root is None:
c.root = os.path.expanduser("~/packages")
@ -40,8 +40,7 @@ def configure(**kwds):
os.listdir(r)
except OSError:
err = sys.exc_info()[1]
msg = "Error: while trying to list root(%s): %s"
sys.exit(msg % (r, err))
sys.exit(f"Error: while trying to list root({r}): {err}")
packages = lambda: itertools.chain(*[listdir(r) for r in roots])
packages.root = roots[0]
@ -71,7 +70,7 @@ def configure(**kwds):
c.welcome_msg = fd.read()
except Exception:
log.warning(
"Could not load welcome-file(%s)!", c.welcome_file, exc_info=1
f"Could not load welcome-file({c.welcome_file})!", exc_info=True
)
if c.fallback_url is None:
@ -84,9 +83,9 @@ def configure(**kwds):
halgos = ["md5", "sha1", "sha224", "sha256", "sha384", "sha512"]
if c.hash_algo not in halgos:
sys.exit("Hash-algorithm %s not one of: %s" % (c.hash_algo, halgos))
sys.exit(f"Hash-algorithm {c.hash_algo} not one of: {halgos}")
log.info("+++Pypiserver started with: %s", c)
log.info(f"+++Pypiserver started with: {c}")
return c, packages
@ -205,7 +204,7 @@ def is_allowed_path(path_part):
return not (p.startswith(".") or "/." in p)
class PkgFile(object):
class PkgFile:
__slots__ = [
"fn",
@ -234,11 +233,11 @@ class PkgFile(object):
self.replaces = replaces
def __repr__(self):
return "%s(%s)" % (
return "{}({})".format(
self.__class__.__name__,
", ".join(
[
"%s=%r" % (k, getattr(self, k, "AttributeError"))
f"{k}={getattr(self, k, 'AttributeError')!r}"
for k in sorted(self.__slots__)
]
),
@ -247,10 +246,9 @@ class PkgFile(object):
def fname_and_hash(self, hash_algo):
if not hasattr(self, "_fname_and_hash"):
if hash_algo:
self._fname_and_hash = "%s#%s=%s" % (
self.relfn_unix,
hash_algo,
digest_file(self.fn, hash_algo),
self._fname_and_hash = (
f"{self.relfn_unix}#{hash_algo}="
f"{digest_file(self.fn, hash_algo)}"
)
else:
self._fname_and_hash = self.relfn_unix
@ -296,8 +294,8 @@ def read_lines(filename):
]
except Exception:
log.error(
'Failed to read package blacklist file "%s". '
"Aborting server startup, please fix this." % filename
f'Failed to read package blacklist file "{filename}". '
"Aborting server startup, please fix this."
)
raise

@ -12,51 +12,11 @@ import pip
from . import core
if sys.version_info >= (3, 0):
from xmlrpc.client import Server
def make_pypi_client(url):
return Server(url)
from xmlrpc.client import Server
else:
from xmlrpclib import Transport # @UnresolvedImport
from xmlrpclib import ServerProxy
import httplib # @UnresolvedImport
import urllib
class ProxiedTransport(Transport):
def set_proxy(self, proxy):
self.proxy = proxy
def make_connection(self, host):
self.realhost = host
if sys.hexversion < 0x02070000:
_http_connection = httplib.HTTP
else:
_http_connection = httplib.HTTPConnection
return _http_connection(self.proxy)
def send_request(self, connection, handler, request_body):
connection.putrequest(
"POST", "http://%s%s" % (self.realhost, handler)
)
def send_host(self, connection, host):
connection.putheader("Host", self.realhost)
def make_pypi_client(url):
http_proxy_url = urllib.getproxies().get("http", "")
if http_proxy_url:
http_proxy_spec = urllib.splithost(
urllib.splittype(http_proxy_url)[1]
)[0]
transport = ProxiedTransport()
transport.set_proxy(http_proxy_spec)
else:
transport = None
return ServerProxy(url, transport=transport)
def make_pypi_client(url):
return Server(url)
def is_stable_version(pversion):
@ -109,7 +69,7 @@ def find_updates(pkgset, stable_only=True):
latest_pkgs = frozenset(filter_latest_pkgs(pkgset))
sys.stdout.write(
"checking %s packages for newer version\n" % len(latest_pkgs),
f"checking {len(latest_pkgs)} packages for newer version\n"
)
need_update = set()
@ -138,14 +98,13 @@ def find_updates(pkgset, stable_only=True):
if no_releases:
sys.stdout.write(
"no releases found on pypi for %s\n\n"
% (", ".join(sorted(no_releases)),)
f"no releases found on pypi for {', '.join(sorted(no_releases))}\n\n"
)
return need_update
class PipCmd(object):
class PipCmd:
"""Methods for generating pip commands."""
@staticmethod

@ -1,14 +1,9 @@
#! /usr/bin/env python
#! /usr/bin/env python3
import sys
from pathlib import Path
from setuptools import setup
if sys.version_info >= (3, 0):
exec("def do_exec(co, loc): exec(co, loc)\n")
else:
exec("def do_exec(co, loc): exec co in loc\n")
tests_require = [
"pytest>=2.3",
"tox",
@ -17,33 +12,31 @@ tests_require = [
"passlib>=1.6",
"webtest",
]
if sys.version_info == (2, 7):
tests_require.append("mock")
setup_requires = ["setuptools", "setuptools-git >= 0.3"]
if sys.version_info >= (3, 5):
setup_requires.append("wheel >= 0.25.0") # earlier wheels fail in 3.5
else:
setup_requires.append("wheel")
setup_requires = ["setuptools", "setuptools-git >= 0.3", "wheel >= 0.25.0"]
def read_file(rel_path: str):
return Path(__file__).parent.joinpath(rel_path).read_text()
def get_version():
d = {}
locals_ = {}
try:
do_exec(open("pypiserver/__init__.py").read(), d) # @UndefinedVariable
exec(read_file("pypiserver/__init__.py"), locals_)
except (ImportError, RuntimeError):
pass
return d["__version__"]
return locals_["__version__"]
setup(
name="pypiserver",
description="A minimal PyPI server for use with pip/easy_install.",
long_description=open("README.rst").read(),
long_description=read_file("README.rst"),
version=get_version(),
packages=["pypiserver"],
package_data={"pypiserver": ["welcome.html"]},
python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*",
python_requires=">=3.6",
setup_requires=setup_requires,
extras_require={"passlib": ["passlib>=1.6"], "cache": ["watchdog"]},
tests_require=tests_require,
@ -64,10 +57,7 @@ setup(
"Operating System :: Microsoft :: Windows",
"Operating System :: POSIX",
"Operating System :: OS Independent",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",

@ -1,7 +1,7 @@
"""Test doubles."""
class Namespace(object):
class Namespace:
"""Simple namespace."""
def __init__(self, **kwargs):

@ -428,7 +428,7 @@ def test_cache_control_set(root):
root.join("foo_bar-1.0.tar.gz").write("")
resp = app_with_cache.get("/packages/foo_bar-1.0.tar.gz")
assert "Cache-Control" in resp.headers
assert resp.headers["Cache-Control"] == "public, max-age=%s" % AGE
assert resp.headers["Cache-Control"] == f"public, max-age={AGE}"
def test_upload_noAction(root, testapp):
@ -467,14 +467,14 @@ def test_upload_with_signature(package, root, testapp):
params={":action": "file_upload"},
upload_files=[
("content", package, b""),
("gpg_signature", "%s.asc" % package, b""),
("gpg_signature", f"{package}.asc", b""),
],
)
assert resp.status_int == 200
uploaded_pkgs = [f.basename.lower() for f in root.listdir()]
assert len(uploaded_pkgs) == 2
assert package.lower() in uploaded_pkgs
assert "%s.asc" % package.lower() in uploaded_pkgs
assert f"{package.lower()}.asc" in uploaded_pkgs
@pytest.mark.parametrize(
@ -488,7 +488,7 @@ def test_upload_badFilename(package, root, testapp):
expect_errors=1,
)
assert resp.status == "400 Bad Request"
assert "Bad filename: %s" % package in resp.text
assert f"Bad filename: {package}" in resp.text
@pytest.mark.parametrize(
@ -628,13 +628,13 @@ class TestRemovePkg:
],
)
def test_remove_pkg_missingNaveVersion(self, name, version, root, testapp):
msg = "Missing 'name'/'version' fields: name=%s, version=%s"
msg = f"Missing 'name'/'version' fields: name={name}, version={version}"
params = {":action": "remove_pkg", "name": name, "version": version}
params = dict((k, v) for k, v in params.items() if v is not None)
resp = testapp.post("/", expect_errors=1, params=params)
assert resp.status == "400 Bad Request"
assert msg % (name, version) in unescape(resp.text)
assert msg in unescape(resp.text)
def test_remove_pkg_notFound(self, root, testapp):
resp = testapp.post(

@ -100,7 +100,7 @@ def test_guess_pkgname_and_version(filename, pkgname, version):
@pytest.mark.parametrize(("filename", "pkgname", "version"), files)
def test_guess_pkgname_and_version_asc(filename, pkgname, version):
exp = (pkgname, version)
filename = "%s.asc" % filename
filename = f"{filename}.asc"
assert core.guess_pkgname_and_version(filename) == exp

@ -1,4 +1,5 @@
#! /usr/bin/env py.test
from pathlib import Path
import pytest
import re
@ -7,10 +8,10 @@ from pypiserver import version as my_ver
@pytest.fixture()
def readme():
return open("README.rst", "rt").read()
return Path(__file__).parents[1].joinpath("README.rst").read_text()
def test_READMEversion(readme):
m = re.compile(r"^\s*:Version:\s*(.+)\s*$", re.MULTILINE).search(readme)
assert m, "Could not find version on README!"
assert m.group(1) == my_ver, "Updaed version(%s) on README!" % m.group(1)
assert m.group(1) == my_ver, f"Updated version({m.group(1)}) on README!"

@ -9,24 +9,24 @@ except ImportError:
import mock
class main_wrapper(object):
class main_wrapper:
def __init__(self):
self.run_kwargs = None
self.pkgdir = None
def __call__(self, argv):
sys.stdout.write("Running %s\n" % (argv,))
sys.stdout.write(f"Running {argv}\n")
__main__.main(["pypi-server"] + argv)
return self.run_kwargs
@pytest.fixture()
def main(request, monkeypatch):
def main(monkeypatch):
main = main_wrapper()
def run(**kwargs):
sys.stdout.write("RUN: %s\n" % kwargs)
sys.stdout.write(f"RUN: {kwargs}\n")
app = kwargs.pop("app")
main.app = app
main.run_kwargs = kwargs

@ -21,6 +21,7 @@ import subprocess
import sys
import tempfile
import time
from pathlib import Path
from shlex import split
from subprocess import Popen
from textwrap import dedent
@ -63,15 +64,8 @@ def _run_server(packdir, port, authed, other_cli=""):
}
pswd_opts = pswd_opt_choices[authed]
cmd = (
"%s -m pypiserver.__main__ -vvv --overwrite -i 127.0.0.1 "
"-p %s %s %s %s"
% (
sys.executable,
port,
pswd_opts,
other_cli,
packdir,
)
f"{sys.executable} -m pypiserver.__main__ -vvv --overwrite -i 127.0.0.1 "
f"-p {port} {pswd_opts} {other_cli} {packdir}"
)
proc = subprocess.Popen(cmd.split(), bufsize=_BUFF_SIZE)
time.sleep(SLEEP_AFTER_SRV)
@ -81,7 +75,7 @@ def _run_server(packdir, port, authed, other_cli=""):
def _kill_server(srv):
print("Killing %s" % (srv,))
print(f"Killing {srv}")
try:
srv.proc.terminate()
time.sleep(1)
@ -109,8 +103,7 @@ def chdir(d):
def _run_python(cmd):
ncmd = "%s %s" % (sys.executable, cmd)
return os.system(ncmd)
return os.system(f"{sys.executable} {cmd}")
@pytest.fixture(scope="module")
@ -179,16 +172,16 @@ def empty_packdir(tmpdir):
def _build_url(port, user="", pswd=""):
auth = "%s:%s@" % (user, pswd) if user or pswd else ""
return "http://%slocalhost:%s" % (auth, port)
auth = f"{user}:{pswd}@" if user or pswd else ""
return f"http://{auth}localhost:{port}"
def _run_pip(cmd):
ncmd = (
"pip --no-cache-dir --disable-pip-version-check "
"--retries 0 --timeout 5 --no-input %s"
) % cmd
print("PIP: %s" % ncmd)
f"--retries 0 --timeout 5 --no-input {cmd}"
)
print(f"PIP: {ncmd}")
proc = Popen(split(ncmd))
proc.communicate()
return proc.returncode
@ -196,9 +189,7 @@ def _run_pip(cmd):
def _run_pip_install(cmd, port, install_dir, user=None, pswd=None):
url = _build_url(port, user, pswd)
# ncmd = '-vv install --download %s -i %s %s' % (install_dir, url, cmd)
ncmd = "-vv download -d %s -i %s %s" % (install_dir, url, cmd)
return _run_pip(ncmd)
return _run_pip(f"-vv download -d {install_dir} -i {url} {cmd}")
@pytest.fixture
@ -211,21 +202,20 @@ def pypirc_tmpfile(port, user, password):
"""Create a temporary pypirc file."""
fd, filepath = tempfile.mkstemp()
os.close(fd)
with open(filepath, "w") as rcfile:
rcfile.writelines(
"\n".join(
(
"[distutils]",
"index-servers: test",
"" "[test]",
"repository: {}".format(_build_url(port)),
"username: {}".format(user),
"password: {}".format(password),
)
Path(filepath).write_text(
"\n".join(
(
"[distutils]",
"index-servers: test",
"" "[test]",
f"repository: {_build_url(port)}",
f"username: {user}",
f"password: {password}",
)
)
with open(filepath) as rcfile:
print(rcfile.read())
)
print(Path(filepath).read_text())
yield filepath
os.remove(filepath)
@ -322,23 +312,22 @@ def test_setuptoolsUpload_open(empty_packdir, port, project, package, pkg_frmt):
url = _build_url(port, None, None)
with pypirc_file(
dedent(
"""\
f"""\
[distutils]
index-servers: test
[test]
repository: %s
repository: {url}
username: ''
password: ''
"""
% url
)
):
with new_server(empty_packdir, port):
with chdir(project.strpath):
cmd = "setup.py -vvv %s upload -r %s" % (pkg_frmt, url)
cmd = f"setup.py -vvv {pkg_frmt} upload -r {url}"
for i in range(5):
print("++Attempt #%s" % i)
print(f"++Attempt #{i}")
assert _run_python(cmd) == 0
time.sleep(SLEEP_AFTER_SRV)
assert len(empty_packdir.listdir()) == 1
@ -351,26 +340,24 @@ def test_setuptoolsUpload_authed(
url = _build_url(port)
with pypirc_file(
dedent(
"""\
f"""\
[distutils]
index-servers: test
[test]
repository: %s
repository: {url}
username: a
password: a
"""
% url
)
):
with new_server(empty_packdir, port, authed=True):
with chdir(project.strpath):
cmd = (
"setup.py -vvv %s register -r "
"test upload -r test" % pkg_frmt
f"setup.py -vvv {pkg_frmt} register -r test upload -r test"
)
for i in range(5):
print("++Attempt #%s" % i)
print(f"++Attempt #{i}")
assert _run_python(cmd) == 0
time.sleep(SLEEP_AFTER_SRV)
assert len(empty_packdir.listdir()) == 1
@ -384,26 +371,24 @@ def test_setuptools_upload_partial_authed(
url = _build_url(port)
with pypirc_file(
dedent(
"""\
f"""\
[distutils]
index-servers: test
[test]
repository: %s
repository: {url}
username: a
password: a
"""
% url
)
):
with new_server(empty_packdir, port, authed="partial"):
with chdir(project.strpath):
cmd = (
"setup.py -vvv %s register -r test upload -r test"
% pkg_frmt
f"setup.py -vvv {pkg_frmt} register -r test upload -r test"
)
for i in range(5):
print("++Attempt #%s" % i)
print(f"++Attempt #{i}")
assert _run_python(cmd) == 0
time.sleep(SLEEP_AFTER_SRV)
assert len(empty_packdir.listdir()) == 1