Add a separate cache for file digests

- It's harder to make the listdir cache avoid recalculation, because when a file changes you would need to invalidate only the entry for that one file. For file digests, however, the key maps one-to-one to a file path, so it is very simple to invalidate (see the sketch below).
- This cache is especially useful when you serve a lot of really large packages, since hashing them repeatedly is expensive
Dustin Spicuzza 2016-03-04 15:09:16 -05:00
parent b42c8c3840
commit 37d21db134
2 changed files with 108 additions and 29 deletions
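
As a rough standalone sketch of the invalidation trade-off described in the commit message (every name and path here is hypothetical, not part of this commit):

from os.path import dirname

# Hypothetical minimal model of the two caches
listdir_cache = {}  # directory path -> cached list of package entries
digest_cache = {}   # hash_algo -> {absolute file path -> hex digest}

def on_file_changed(path):
    # listdir: there is no per-entry mapping back to `path`, so the
    # whole cached listing for the containing directory is dropped
    listdir_cache.pop(dirname(path), None)
    # digests: the file path itself is the key (one-to-one), so only
    # the changed file's digests are dropped
    for subcache in digest_cache.values():
        subcache.pop(path, None)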

@@ -1,49 +1,120 @@
 #
-# Dumb cache implementation -- requires watchdog to be installed
-# Basically -- cache the results of listdir in memory until something
-# gets modified, then invalidate the whole thing
+# The cache implementation is only used when the watchdog package
+# is installed
 #
 
+from os.path import dirname
+
 from watchdog.observers import Observer
 import threading
 
-class ListdirCache(object):
+class CacheManager(object):
+    '''
+    A naive cache implementation for listdir and digest_file
+
+    The listdir_cache just maps each root directory to a giant list of
+    PkgFile objects, and for simplicity an entry is invalidated any time
+    a modification occurs within the directory it represents. If we were
+    smarter about the way the listdir data structure is created/stored,
+    then we could do more granular invalidation. In practice, this is
+    good enough for now.
+
+    The digest_cache exists on a per-file basis, because computing
+    hashes on large files can get expensive, and it's very easy to
+    invalidate specific filenames.
+    '''
 
     def __init__(self):
-        self.cache = {}
+        # Cache for listdir output
+        self.listdir_cache = {}
+
+        # Cache for hashes: two-level dictionary
+        # -> key: hash_algo, value: dict
+        #    -> key: file path, value: hash
+        # We assume that the hash_algo value will never be erased
+        self.digest_cache = {}
 
         self.observer = Observer()
         self.observer.start()
 
         # Directories being watched
         self.watched = set()
 
-        self.lock = threading.Lock()
+        self.watch_lock = threading.Lock()
+        self.digest_lock = threading.Lock()
+        self.listdir_lock = threading.Lock()
 
-    def get(self, root, fn):
-        with self.lock:
+    def listdir(self, root, impl_fn):
+        with self.listdir_lock:
             try:
-                return self.cache[root]
+                return self.listdir_cache[root]
             except KeyError:
                 # check to see if we're watching
-                if root not in self.watched:
-                    self._watch(root)
+                with self.watch_lock:
+                    if root not in self.watched:
+                        self._watch(root)
 
-                v = list(fn(root))
-                self.cache[root] = v
+                v = list(impl_fn(root))
+                self.listdir_cache[root] = v
                 return v
 
+    def digest_file(self, fpath, hash_algo, impl_fn):
+        with self.digest_lock:
+            try:
+                cache = self.digest_cache[hash_algo]
+            except KeyError:
+                cache = self.digest_cache.setdefault(hash_algo, {})
+
+            try:
+                return cache[fpath]
+            except KeyError:
+                root = dirname(fpath)
+                with self.watch_lock:
+                    if root not in self.watched:
+                        self._watch(root)
+
+                # TODO: move this outside of the lock... but there's not a
+                #       good way to do this without a race condition if the
+                #       file gets modified
+                v = impl_fn(fpath, hash_algo)
+                cache[fpath] = v
+                return v
+
     def _watch(self, root):
        self.watched.add(root)
        self.observer.schedule(_EventHandler(self, root), root, recursive=True)
 
 class _EventHandler(object):
 
-    def __init__(self, lcache, root):
-        self.lcache = lcache
+    def __init__(self, cache, root):
+        self.cache = cache
         self.root = root
 
     def dispatch(self, event):
         '''Called by watchdog observer'''
-        with self.lcache.lock:
-            self.lcache.cache.pop(self.root, None)
+        cache = self.cache
 
-listdir_cache = ListdirCache()
+        # Don't care about directory events
+        if event.is_directory:
+            return
+
+        # Lazy: just invalidate the whole cache
+        with cache.listdir_lock:
+            cache.listdir_cache.pop(self.root, None)
+
+        # Digests are more expensive: invalidate specific paths
+        paths = []
+        if event.event_type == 'moved':
+            paths.append(event.src_path)
+            paths.append(event.dest_path)
+        else:
+            paths.append(event.src_path)
+
+        with cache.digest_lock:
+            for _, subcache in cache.digest_cache.items():
+                for path in paths:
+                    subcache.pop(path, None)
+
+cache_manager = CacheManager()
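
A minimal usage sketch of the class above, assuming watchdog is installed and the module is importable as pypiserver.cache; naive_listdir, naive_digest, and the /srv/packages paths are hypothetical stand-ins, not part of this commit:

import hashlib
import os

from pypiserver.cache import cache_manager

def naive_listdir(root):
    # stand-in for the real _listdir implementation function
    for name in os.listdir(root):
        yield os.path.join(root, name)

def naive_digest(fpath, hash_algo):
    # stand-in for the real _digest_file implementation function
    digester = hashlib.new(hash_algo)
    with open(fpath, 'rb') as f:
        digester.update(f.read())
    return digester.hexdigest()[:32]

pkgs = cache_manager.listdir('/srv/packages', naive_listdir)  # computed, cached
pkgs = cache_manager.listdir('/srv/packages', naive_listdir)  # served from cache
md5 = cache_manager.digest_file('/srv/packages/foo-1.0.tar.gz',
                                'md5', naive_digest)          # per-file entry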

@@ -241,14 +241,6 @@ def _listdir(root):
                           fn=fn, root=root,
                           relfn=fn[len(root) + 1:])
 
-try:
-    from .cache import listdir_cache
-
-    def listdir(root):
-        return listdir_cache.get(root, _listdir)
-except ImportError:
-    listdir = _listdir
-
 def find_packages(pkgs, prefix=""):
     prefix = normalize_pkgname(prefix)
     for x in pkgs:
@@ -289,7 +281,7 @@ def store(root, filename, save_method):
     save_method(dest_fn, overwrite=True)  # Overwrite check earlier.
 
-def digest_file(fpath, hash_algo):
+def _digest_file(fpath, hash_algo):
     """
     Reads and digests a file according to the specified hashing algorithm.
 
@@ -304,3 +296,19 @@ def digest_file(fpath, hash_algo):
         for block in iter(lambda: f.read(blocksize), b''):
             digester.update(block)
     return digester.hexdigest()[:32]
+
+
+try:
+    from .cache import cache_manager
+
+    def listdir(root):
+        # root must be an absolute path
+        return cache_manager.listdir(root, _listdir)
+
+    def digest_file(fpath, hash_algo):
+        # fpath must be an absolute path
+        return cache_manager.digest_file(fpath, hash_algo, _digest_file)
+
+except ImportError:
+    listdir = _listdir
+    digest_file = _digest_file
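
With this wiring, callers keep using listdir and digest_file unchanged; the cache is engaged only when the watchdog import succeeds. A rough sketch of the observable behavior, assuming the second changed file is pypiserver's core module and using a hypothetical package path:

from pypiserver import core

d1 = core.digest_file('/srv/packages/foo-1.0.tar.gz', 'md5')  # computed
d2 = core.digest_file('/srv/packages/foo-1.0.tar.gz', 'md5')  # cache hit
# If the file is then modified on disk, the watchdog observer invokes
# _EventHandler.dispatch, which pops exactly this path from every
# per-algorithm digest sub-cache, so the next call recomputes the hash.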