
refactor files cache code into FilesCacheMixin class

Thomas Waldmann 1 year ago
parent
commit
d466005682
1 changed file with 187 additions and 168 deletions

src/borg/cache.py  (+187, -168)

@@ -248,15 +248,6 @@ def cache_dir(repository, path=None):
     return path or os.path.join(get_cache_dir(), repository.id_str)
 
 
-def files_cache_name():
-    suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "")
-    return "files." + suffix if suffix else "files"
-
-
-def discover_files_cache_name(path):
-    return [fn for fn in os.listdir(path) if fn == "files" or fn.startswith("files.")][0]
-
-
 class CacheConfig:
     def __init__(self, repository, path=None, lock_wait=None):
         self.repository = repository
@@ -493,7 +484,183 @@ Total chunks: {0.total_chunks}
         return self.Summary(**stats)
 
 
-class LocalCache(CacheStatsMixin):
+class FilesCacheMixin:
+    """
+    Massively accelerate processing of unchanged files by caching their chunks list.
+    With that, we can avoid having to read and chunk them to get their chunks list.
+    """
+
+    FILES_CACHE_NAME = "files"
+
+    def __init__(self, cache_mode):
+        self.cache_mode = cache_mode
+        self.files = None
+        self._newest_cmtime = None
+
+    def files_cache_name(self):
+        suffix = os.environ.get("BORG_FILES_CACHE_SUFFIX", "")
+        return self.FILES_CACHE_NAME + "." + suffix if suffix else self.FILES_CACHE_NAME
+
+    def discover_files_cache_name(self, path):
+        return [
+            fn for fn in os.listdir(path) if fn == self.FILES_CACHE_NAME or fn.startswith(self.FILES_CACHE_NAME + ".")
+        ][0]
+
+    def _create_empty_files_cache(self, path):
+        with IntegrityCheckedFile(path=os.path.join(path, self.files_cache_name()), write=True) as fd:
+            pass  # empty file
+        return fd.integrity_data
+
+    def _read_files_cache(self):
+        if "d" in self.cache_mode:  # d(isabled)
+            return
+
+        self.files = {}
+        logger.debug("Reading files cache ...")
+        files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
+        msg = None
+        try:
+            with IntegrityCheckedFile(
+                path=os.path.join(self.path, self.files_cache_name()),
+                write=False,
+                integrity_data=self.cache_config.integrity.get(self.files_cache_name()),
+            ) as fd:
+                u = msgpack.Unpacker(use_list=True)
+                while True:
+                    data = fd.read(64 * 1024)
+                    if not data:
+                        break
+                    u.feed(data)
+                    try:
+                        for path_hash, item in u:
+                            entry = FileCacheEntry(*item)
+                            # in the end, this takes about 240 Bytes per file
+                            self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
+                    except (TypeError, ValueError) as exc:
+                        msg = "The files cache seems invalid. [%s]" % str(exc)
+                        break
+        except OSError as exc:
+            msg = "The files cache can't be read. [%s]" % str(exc)
+        except FileIntegrityError as fie:
+            msg = "The files cache is corrupted. [%s]" % str(fie)
+        if msg is not None:
+            logger.warning(msg)
+            logger.warning("Continuing without files cache - expect lower performance.")
+            self.files = {}
+        files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files))
+
+    def _write_files_cache(self):
+        if self._newest_cmtime is None:
+            # was never set because no files were modified/added
+            self._newest_cmtime = 2**63 - 1  # nanoseconds, good until y2262
+        ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 20))
+        files_cache_logger.debug("FILES-CACHE-SAVE: starting...")
+        with IntegrityCheckedFile(path=os.path.join(self.path, self.files_cache_name()), write=True) as fd:
+            entry_count = 0
+            for path_hash, item in self.files.items():
+                # Only keep files seen in this backup that are older than newest cmtime seen in this backup -
+                # this is to avoid issues with filesystem snapshots and cmtime granularity.
+                # Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
+                entry = FileCacheEntry(*msgpack.unpackb(item))
+                if (
+                    entry.age == 0
+                    and timestamp_to_int(entry.cmtime) < self._newest_cmtime
+                    or entry.age > 0
+                    and entry.age < ttl
+                ):
+                    msgpack.pack((path_hash, entry), fd)
+                    entry_count += 1
+        files_cache_logger.debug("FILES-CACHE-KILL: removed all old entries with age >= TTL [%d]", ttl)
+        files_cache_logger.debug(
+            "FILES-CACHE-KILL: removed all current entries with newest cmtime %d", self._newest_cmtime
+        )
+        files_cache_logger.debug("FILES-CACHE-SAVE: finished, %d remaining entries saved.", entry_count)
+        return fd.integrity_data
+
+    def file_known_and_unchanged(self, hashed_path, path_hash, st):
+        """
+        Check if we know the file that has this path_hash (know == it is in our files cache) and
+        whether it is unchanged (the size/inode number/cmtime is same for stuff we check in this cache_mode).
+
+        :param hashed_path: the file's path as we gave it to hash(hashed_path)
+        :param path_hash: hash(hashed_path), to save some memory in the files cache
+        :param st: the file's stat() result
+        :return: known, chunks (known is True if we have infos about this file in the cache,
+                               chunks is a list[ChunkListEntry] IF the file has not changed, otherwise None).
+        """
+        if not stat.S_ISREG(st.st_mode):
+            return False, None
+        cache_mode = self.cache_mode
+        if "d" in cache_mode:  # d(isabled)
+            files_cache_logger.debug("UNKNOWN: files cache disabled")
+            return False, None
+        # note: r(echunk) does not need the files cache in this method, but the files cache will
+        # be updated and saved to disk to memorize the files. To preserve previous generations in
+        # the cache, this means that it also needs to get loaded from disk first.
+        if "r" in cache_mode:  # r(echunk)
+            files_cache_logger.debug("UNKNOWN: rechunking enforced")
+            return False, None
+        entry = self.files.get(path_hash)
+        if not entry:
+            files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path)
+            return False, None
+        # we know the file!
+        entry = FileCacheEntry(*msgpack.unpackb(entry))
+        if "s" in cache_mode and entry.size != st.st_size:
+            files_cache_logger.debug("KNOWN-CHANGED: file size has changed: %r", hashed_path)
+            return True, None
+        if "i" in cache_mode and entry.inode != st.st_ino:
+            files_cache_logger.debug("KNOWN-CHANGED: file inode number has changed: %r", hashed_path)
+            return True, None
+        if "c" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_ctime_ns:
+            files_cache_logger.debug("KNOWN-CHANGED: file ctime has changed: %r", hashed_path)
+            return True, None
+        elif "m" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_mtime_ns:
+            files_cache_logger.debug("KNOWN-CHANGED: file mtime has changed: %r", hashed_path)
+            return True, None
+        # we ignored the inode number in the comparison above or it is still same.
+        # if it is still the same, replacing it in the tuple doesn't change it.
+        # if we ignored it, a reason for doing that is that files were moved to a new
+        # disk / new fs (so a one-time change of inode number is expected) and we wanted
+        # to avoid everything getting chunked again. to be able to re-enable the inode
+        # number comparison in a future backup run (and avoid chunking everything
+        # again at that time), we need to update the inode number in the cache with what
+        # we see in the filesystem.
+        self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, age=0))
+        chunks = [ChunkListEntry(*chunk) for chunk in entry.chunks]  # convert to list of namedtuple
+        return True, chunks
+
+    def memorize_file(self, hashed_path, path_hash, st, chunks):
+        if not stat.S_ISREG(st.st_mode):
+            return
+        cache_mode = self.cache_mode
+        # note: r(echunk) modes will update the files cache, d(isabled) mode won't
+        if "d" in cache_mode:
+            files_cache_logger.debug("FILES-CACHE-NOUPDATE: files cache disabled")
+            return
+        if "c" in cache_mode:
+            cmtime_type = "ctime"
+            cmtime_ns = safe_ns(st.st_ctime_ns)
+        elif "m" in cache_mode:
+            cmtime_type = "mtime"
+            cmtime_ns = safe_ns(st.st_mtime_ns)
+        else:  # neither 'c' nor 'm' in cache_mode, avoid UnboundLocalError
+            cmtime_type = "ctime"
+            cmtime_ns = safe_ns(st.st_ctime_ns)
+        entry = FileCacheEntry(
+            age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunks=chunks
+        )
+        self.files[path_hash] = msgpack.packb(entry)
+        self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns)
+        files_cache_logger.debug(
+            "FILES-CACHE-UPDATE: put %r [has %s] <- %r",
+            entry._replace(chunks="[%d entries]" % len(entry.chunks)),
+            cmtime_type,
+            hashed_path,
+        )
+
+
+class LocalCache(CacheStatsMixin, FilesCacheMixin):
     """
     Persistent, local (client-side) cache.
     """
@@ -516,13 +683,13 @@ class LocalCache(CacheStatsMixin):
         :param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
         """
         CacheStatsMixin.__init__(self, iec=iec)
+        FilesCacheMixin.__init__(self, cache_mode)
         assert isinstance(manifest, Manifest)
         self.manifest = manifest
         self.repository = manifest.repository
         self.key = manifest.key
         self.repo_objs = manifest.repo_objs
         self.progress = progress
-        self.cache_mode = cache_mode
         self.timestamp = None
         self.txn_active = False
         self.do_cache = os.environ.get("BORG_USE_CHUNKS_ARCHIVE", "yes").lower() in ["yes", "1", "true"]
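
The cache_mode string passed to FilesCacheMixin.__init__ is a set of one-letter flags; the checks in file_known_and_unchanged and memorize_file above give their meanings: "d" disables the files cache, "r" forces rechunking, and "s", "i", "c", "m" select which stat fields (size, inode, ctime, mtime) are compared. A quick illustration (the mode value below is an example, not a mandated default):

    cache_mode = "cis"                      # compare ctime, inode and size
    assert "d" not in cache_mode            # files cache is enabled
    assert "r" not in cache_mode            # no forced rechunking
    compared = [flag for flag in "sicm" if flag in cache_mode]
    print(compared)                         # ['s', 'i', 'c']
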
@@ -571,8 +738,7 @@ class LocalCache(CacheStatsMixin):
         self.cache_config.create()
         ChunkIndex().write(os.path.join(self.path, "chunks"))
         os.makedirs(os.path.join(self.path, "chunks.archive.d"))
-        with SaveFile(os.path.join(self.path, files_cache_name()), binary=True):
-            pass  # empty file
+        self._create_empty_files_cache(self.path)
 
     def _do_open(self):
         self.cache_config.load()
@@ -582,10 +748,7 @@ class LocalCache(CacheStatsMixin):
             integrity_data=self.cache_config.integrity.get("chunks"),
         ) as fd:
             self.chunks = ChunkIndex.read(fd)
-        if "d" in self.cache_mode:  # d(isabled)
-            self.files = None
-        else:
-            self._read_files()
+        self._read_files_cache()
 
     def open(self):
         if not os.path.isdir(self.path):
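
Note how the age field ties _read_files_cache, file_known_and_unchanged/memorize_file, and _write_files_cache together: loading bumps every entry's age by one, a cache hit or update resets it to 0, and saving prunes entries per the condition in _write_files_cache. That keep/drop rule can be paraphrased as a small function (values below are made up for illustration):

    ttl = 20  # BORG_FILES_CACHE_TTL default, per _write_files_cache above

    def keep(age, cmtime, newest_cmtime):
        # age == 0: entry was seen in this backup run; keep it only if its
        # cmtime is strictly older than the newest cmtime seen, to dodge
        # snapshot/cmtime-granularity races.
        # age > 0: entry comes from an earlier run; keep it until the TTL.
        return (age == 0 and cmtime < newest_cmtime) or (0 < age < ttl)

    print(keep(0, 100, 200))   # True  - seen this run, safely old
    print(keep(0, 200, 200))   # False - too close to newest cmtime
    print(keep(5, 100, 200))   # True  - unseen for 5 runs, below TTL
    print(keep(20, 100, 200))  # False - reached TTL, pruned
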
@@ -598,42 +761,6 @@ class LocalCache(CacheStatsMixin):
             self.cache_config.close()
             self.cache_config = None
 
-    def _read_files(self):
-        self.files = {}
-        self._newest_cmtime = None
-        logger.debug("Reading files cache ...")
-        files_cache_logger.debug("FILES-CACHE-LOAD: starting...")
-        msg = None
-        try:
-            with IntegrityCheckedFile(
-                path=os.path.join(self.path, files_cache_name()),
-                write=False,
-                integrity_data=self.cache_config.integrity.get(files_cache_name()),
-            ) as fd:
-                u = msgpack.Unpacker(use_list=True)
-                while True:
-                    data = fd.read(64 * 1024)
-                    if not data:
-                        break
-                    u.feed(data)
-                    try:
-                        for path_hash, item in u:
-                            entry = FileCacheEntry(*item)
-                            # in the end, this takes about 240 Bytes per file
-                            self.files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
-                    except (TypeError, ValueError) as exc:
-                        msg = "The files cache seems invalid. [%s]" % str(exc)
-                        break
-        except OSError as exc:
-            msg = "The files cache can't be read. [%s]" % str(exc)
-        except FileIntegrityError as fie:
-            msg = "The files cache is corrupted. [%s]" % str(fie)
-        if msg is not None:
-            logger.warning(msg)
-            logger.warning("Continuing without files cache - expect lower performance.")
-            self.files = {}
-        files_cache_logger.debug("FILES-CACHE-LOAD: finished, %d entries loaded.", len(self.files))
-
     def begin_txn(self):
         # Initialize transaction snapshot
         pi = ProgressIndicatorMessage(msgid="cache.begin_transaction")
@@ -645,10 +772,9 @@ class LocalCache(CacheStatsMixin):
         shutil.copy(os.path.join(self.path, "chunks"), txn_dir)
         pi.output("Initializing cache transaction: Reading files")
         try:
-            shutil.copy(os.path.join(self.path, files_cache_name()), txn_dir)
+            shutil.copy(os.path.join(self.path, self.files_cache_name()), txn_dir)
         except FileNotFoundError:
-            with SaveFile(os.path.join(txn_dir, files_cache_name()), binary=True):
-                pass  # empty file
+            self._create_empty_files_cache(txn_dir)
         os.replace(txn_dir, os.path.join(self.path, "txn.active"))
         self.txn_active = True
         pi.finish()
@@ -660,33 +786,9 @@ class LocalCache(CacheStatsMixin):
         self.security_manager.save(self.manifest, self.key)
         pi = ProgressIndicatorMessage(msgid="cache.commit")
         if self.files is not None:
-            if self._newest_cmtime is None:
-                # was never set because no files were modified/added
-                self._newest_cmtime = 2**63 - 1  # nanoseconds, good until y2262
-            ttl = int(os.environ.get("BORG_FILES_CACHE_TTL", 20))
             pi.output("Saving files cache")
-            files_cache_logger.debug("FILES-CACHE-SAVE: starting...")
-            with IntegrityCheckedFile(path=os.path.join(self.path, files_cache_name()), write=True) as fd:
-                entry_count = 0
-                for path_hash, item in self.files.items():
-                    # Only keep files seen in this backup that are older than newest cmtime seen in this backup -
-                    # this is to avoid issues with filesystem snapshots and cmtime granularity.
-                    # Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
-                    entry = FileCacheEntry(*msgpack.unpackb(item))
-                    if (
-                        entry.age == 0
-                        and timestamp_to_int(entry.cmtime) < self._newest_cmtime
-                        or entry.age > 0
-                        and entry.age < ttl
-                    ):
-                        msgpack.pack((path_hash, entry), fd)
-                        entry_count += 1
-            files_cache_logger.debug("FILES-CACHE-KILL: removed all old entries with age >= TTL [%d]", ttl)
-            files_cache_logger.debug(
-                "FILES-CACHE-KILL: removed all current entries with newest cmtime %d", self._newest_cmtime
-            )
-            files_cache_logger.debug("FILES-CACHE-SAVE: finished, %d remaining entries saved.", entry_count)
-            self.cache_config.integrity[files_cache_name()] = fd.integrity_data
+            integrity_data = self._write_files_cache()
+            self.cache_config.integrity[self.files_cache_name()] = integrity_data
         pi.output("Saving chunks cache")
         with IntegrityCheckedFile(path=os.path.join(self.path, "chunks"), write=True) as fd:
             self.chunks.write(fd)
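
Every cache file saved here travels with integrity data: IntegrityCheckedFile produces it on write (fd.integrity_data) and verifies it on read, raising FileIntegrityError on mismatch. A round-trip sketch using only the call shapes visible in this diff (cache_path is a hypothetical placeholder):

    path = os.path.join(cache_path, "files")  # cache_path: assumed variable

    # write: integrity data becomes available after the with-block
    with IntegrityCheckedFile(path=path, write=True) as fd:
        fd.write(b"serialized entries")
    integrity_data = fd.integrity_data        # persisted in the cache config

    # read: pass the stored integrity data back in; corruption raises
    try:
        with IntegrityCheckedFile(path=path, write=False, integrity_data=integrity_data) as fd:
            data = fd.read(64 * 1024)
    except FileIntegrityError:
        data = None  # fall back to an empty cache, as _read_files_cache does
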
@@ -708,7 +810,7 @@ class LocalCache(CacheStatsMixin):
         if os.path.exists(txn_dir):
             shutil.copy(os.path.join(txn_dir, "config"), self.path)
             shutil.copy(os.path.join(txn_dir, "chunks"), self.path)
-            shutil.copy(os.path.join(txn_dir, discover_files_cache_name(txn_dir)), self.path)
+            shutil.copy(os.path.join(txn_dir, self.discover_files_cache_name(txn_dir)), self.path)
             txn_tmp = os.path.join(self.path, "txn.tmp")
             os.replace(txn_dir, txn_tmp)
             if os.path.exists(txn_tmp):
@@ -939,9 +1041,8 @@ class LocalCache(CacheStatsMixin):
         with IntegrityCheckedFile(path=os.path.join(self.path, "chunks"), write=True) as fd:
             self.chunks.write(fd)
         self.cache_config.integrity["chunks"] = fd.integrity_data
-        with IntegrityCheckedFile(path=os.path.join(self.path, files_cache_name()), write=True) as fd:
-            pass  # empty file
-        self.cache_config.integrity[files_cache_name()] = fd.integrity_data
+        integrity_data = self._create_empty_files_cache(self.path)
+        self.cache_config.integrity[self.files_cache_name()] = integrity_data
         self.cache_config.manifest_id = ""
         self.cache_config._config.set("cache", "manifest", "")
         if not self.cache_config._config.has_section("integrity"):
@@ -1033,88 +1134,6 @@ class LocalCache(CacheStatsMixin):
         else:
             stats.update(-size, False)
 
-    def file_known_and_unchanged(self, hashed_path, path_hash, st):
-        """
-        Check if we know the file that has this path_hash (know == it is in our files cache) and
-        whether it is unchanged (the size/inode number/cmtime is same for stuff we check in this cache_mode).
-
-        :param hashed_path: the file's path as we gave it to hash(hashed_path)
-        :param path_hash: hash(hashed_path), to save some memory in the files cache
-        :param st: the file's stat() result
-        :return: known, chunks (known is True if we have infos about this file in the cache,
-                               chunks is a list[ChunkListEntry] IF the file has not changed, otherwise None).
-        """
-        if not stat.S_ISREG(st.st_mode):
-            return False, None
-        cache_mode = self.cache_mode
-        if "d" in cache_mode:  # d(isabled)
-            files_cache_logger.debug("UNKNOWN: files cache disabled")
-            return False, None
-        # note: r(echunk) does not need the files cache in this method, but the files cache will
-        # be updated and saved to disk to memorize the files. To preserve previous generations in
-        # the cache, this means that it also needs to get loaded from disk first.
-        if "r" in cache_mode:  # r(echunk)
-            files_cache_logger.debug("UNKNOWN: rechunking enforced")
-            return False, None
-        entry = self.files.get(path_hash)
-        if not entry:
-            files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path)
-            return False, None
-        # we know the file!
-        entry = FileCacheEntry(*msgpack.unpackb(entry))
-        if "s" in cache_mode and entry.size != st.st_size:
-            files_cache_logger.debug("KNOWN-CHANGED: file size has changed: %r", hashed_path)
-            return True, None
-        if "i" in cache_mode and entry.inode != st.st_ino:
-            files_cache_logger.debug("KNOWN-CHANGED: file inode number has changed: %r", hashed_path)
-            return True, None
-        if "c" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_ctime_ns:
-            files_cache_logger.debug("KNOWN-CHANGED: file ctime has changed: %r", hashed_path)
-            return True, None
-        elif "m" in cache_mode and timestamp_to_int(entry.cmtime) != st.st_mtime_ns:
-            files_cache_logger.debug("KNOWN-CHANGED: file mtime has changed: %r", hashed_path)
-            return True, None
-        # we ignored the inode number in the comparison above or it is still same.
-        # if it is still the same, replacing it in the tuple doesn't change it.
-        # if we ignored it, a reason for doing that is that files were moved to a new
-        # disk / new fs (so a one-time change of inode number is expected) and we wanted
-        # to avoid everything getting chunked again. to be able to re-enable the inode
-        # number comparison in a future backup run (and avoid chunking everything
-        # again at that time), we need to update the inode number in the cache with what
-        # we see in the filesystem.
-        self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, age=0))
-        chunks = [ChunkListEntry(*chunk) for chunk in entry.chunks]  # convert to list of namedtuple
-        return True, chunks
-
-    def memorize_file(self, hashed_path, path_hash, st, chunks):
-        if not stat.S_ISREG(st.st_mode):
-            return
-        cache_mode = self.cache_mode
-        # note: r(echunk) modes will update the files cache, d(isabled) mode won't
-        if "d" in cache_mode:
-            files_cache_logger.debug("FILES-CACHE-NOUPDATE: files cache disabled")
-            return
-        if "c" in cache_mode:
-            cmtime_type = "ctime"
-            cmtime_ns = safe_ns(st.st_ctime_ns)
-        elif "m" in cache_mode:
-            cmtime_type = "mtime"
-            cmtime_ns = safe_ns(st.st_mtime_ns)
-        else:  # neither 'c' nor 'm' in cache_mode, avoid UnboundLocalError
-            cmtime_type = "ctime"
-            cmtime_ns = safe_ns(st.st_ctime_ns)
-        entry = FileCacheEntry(
-            age=0, inode=st.st_ino, size=st.st_size, cmtime=int_to_timestamp(cmtime_ns), chunks=chunks
-        )
-        self.files[path_hash] = msgpack.packb(entry)
-        self._newest_cmtime = max(self._newest_cmtime or 0, cmtime_ns)
-        files_cache_logger.debug(
-            "FILES-CACHE-UPDATE: put %r [has %s] <- %r",
-            entry._replace(chunks="[%d entries]" % len(entry.chunks)),
-            cmtime_type,
-            hashed_path,
-        )
-
 
 class AdHocCache(CacheStatsMixin):
     """