diff --git a/src/fscacher/cache.py b/src/fscacher/cache.py index 7845c10..67e9020 100644 --- a/src/fscacher/cache.py +++ b/src/fscacher/cache.py @@ -1,5 +1,5 @@ from collections import deque, namedtuple -from functools import wraps +from functools import partial, wraps from inspect import Parameter, signature import logging import os @@ -73,7 +73,10 @@ def memoize(self, f): return f return self._memory.cache(f) - def memoize_path(self, f): + def memoize_path(self, f=None, *, attrname=None): + if f is None: + return partial(self.memoize_path, attrname=attrname) + # we need to actually decorate a function fingerprint_kwarg = "_cache_fingerprint" @@ -99,6 +102,8 @@ def fingerprinted(path, *args, **kwargs): def fingerprinter(path, *args, **kwargs): # we need to dereference symlinks and use that path in the function # call signature + if attrname is not None: + path = getattr(path, attrname) path_orig = path path = op.realpath(path) if path != path_orig: diff --git a/src/fscacher/tests/test_cache.py b/src/fscacher/tests/test_cache.py index b96b906..5330ca0 100644 --- a/src/fscacher/tests/test_cache.py +++ b/src/fscacher/tests/test_cache.py @@ -388,3 +388,94 @@ def git(*args): assert len(calls) == 1 assert memoread(tmp_path / "subdir" / "text.txt") == content assert len(calls) == 1 + + +def test_memoize_path_attrname(cache, tmp_path): + calls = [] + + class Foo: + def __init__(self, name, path): + self.name = name + self.path = path + self.extra = 0 + + @cache.memoize_path(attrname="path") + def memoread(self, arg, kwarg=None): + calls.append([self.path, arg, kwarg]) + with open(self.path) as f: + return f.read() + + def check_new_memoread(obj, arg, content, expect_new=False): + ncalls = len(calls) + assert obj.memoread(arg) == content + assert len(calls) == ncalls + 1 + assert obj.memoread(arg) == content + assert len(calls) == ncalls + 1 + int(expect_new) + + path = str(tmp_path / "file.dat") + foo = Foo("foo", path) + + with pytest.raises(IOError): + foo.memoread(0) + # and again + with pytest.raises(IOError): + foo.memoread(0) + assert len(calls) == 2 + + with open(path, "w") as f: + f.write("content") + + t0 = time.time() + try: + # unless this computer is too slow -- there should be less than + # cache._min_dtime between our creating the file and testing, + # so we would force a direct read: + check_new_memoread(foo, 0, "content", True) + except AssertionError: # pragma: no cover + # if computer is indeed slow (happens on shared CIs) we might fail + # because distance is too short + if time.time() - t0 < cache._min_dtime: + raise # if we were quick but still failed -- legit + assert calls[-1] == [path, 0, None] + + # but if we sleep - should memoize + time.sleep(cache._min_dtime * 1.1) + check_new_memoread(foo, 1, "content") + + # and if we modify the file -- a new read + time.sleep(cache._min_dtime * 1.1) + with open(path, "w") as f: + f.write("Content") + ncalls = len(calls) + assert foo.memoread(1) == "Content" + assert len(calls) == ncalls + 1 + + # If we modify the instance, a new read + foo.extra = 1 + ncalls = len(calls) + assert foo.memoread(1) == "Content" + assert len(calls) == ncalls + 1 + + # Another, unequal instance gets its own read + bar = Foo("bar", path) + ncalls = len(calls) + assert bar.memoread(1) == "Content" + assert len(calls) == ncalls + 1 + assert bar.memoread(1) == "Content" + assert len(calls) == ncalls + 1 + + # Another instance pointing to a different path gets its own read + path2 = str(tmp_path / "path2.txt") + with open(path2, "w") as fp: + print("Different content", file=fp) + baz = Foo("foo", path2) + ncalls = len(calls) + assert baz.memoread(1) == "Different content\n" + assert len(calls) == ncalls + 1 + assert baz.memoread(1) == "Different content\n" + assert len(calls) == ncalls + 1 + + assert foo.memoread(1) == "Content" + assert len(calls) == ncalls + 1 + assert bar.memoread(1) == "Content" + assert len(calls) == ncalls + 1