Browse Source

reduce memory consumption of files cache, fixes #5756

- refactor packing/unpacking of fc entries into separate functions
- instead of a chunks list entry being a tuple of 256bit id [bytes] and 32bit size [int],
  only store a stable 32bit index into kv array of ChunkIndex (where we also have id and
  size [and refcount]).
- only done in memory, the on-disk format has (id, size) tuples.

memory consumption (N = entry.chunks list element count, X = overhead for rest of entry):
- previously:
  - packed = packb(dict(..., chunks=[(id1, size1), (id2, size2), ...]))
  - packed size ~= X + N * (1 + (34 + 5)) Bytes
- now:
  - packed = packb(dict(..., chunks=[ix1, ix2, ...]))
  - packed size ~= X + N * 5 Bytes
Thomas Waldmann 7 months ago
parent
commit
e053307523
2 changed files with 54 additions and 7 deletions
  1. 48 7
      src/borg/cache.py
  2. 6 0
      src/borg/hashindex.pyx

+ 48 - 7
src/borg/cache.py

@@ -381,6 +381,46 @@ class FilesCacheMixin:
         self._newest_path_hashes = set()
         self.start_backup = start_backup
 
+    def compress_entry(self, entry):
+        """
+        compress a files cache entry:
+
+        - use the ChunkIndex to "compress" the entry's chunks list (256bit key + 32bit size -> 32bit index).
+        - use msgpack to pack the entry (reduce memory usage by packing and having less python objects).
+
+        Note: the result is only valid while the ChunkIndex is in memory!
+        """
+        assert isinstance(self.chunks, ChunkIndex), f"{self.chunks} is not a ChunkIndex"
+        assert isinstance(entry, FileCacheEntry)
+        compressed_chunks = []
+        for id, size in entry.chunks:
+            cie = self.chunks.get(id)
+            assert cie is not None
+            assert cie.refcount > 0
+            assert size == cie.size
+            idx = self.chunks.k_to_idx(id)
+            compressed_chunks.append(idx)
+        entry = entry._replace(chunks=compressed_chunks)
+        return msgpack.packb(entry)
+
+    def decompress_entry(self, entry_packed):
+        """reverse operation of compress_entry"""
+        assert isinstance(self.chunks, ChunkIndex), f"{self.chunks} is not a ChunkIndex"
+        assert isinstance(entry_packed, bytes)
+        entry = msgpack.unpackb(entry_packed)
+        entry = FileCacheEntry(*entry)
+        chunks = []
+        for idx in entry.chunks:
+            assert isinstance(idx, int), f"{idx} is not an int"
+            id = self.chunks.idx_to_k(idx)
+            cie = self.chunks.get(id)
+            assert cie is not None
+            assert cie.refcount > 0
+            assert cie.size > 0
+            chunks.append((id, cie.size))
+        entry = entry._replace(chunks=chunks)
+        return entry
+
     @property
     def files(self):
         if self._files is None:
@@ -440,7 +480,7 @@ class FilesCacheMixin:
                     mtime=int_to_timestamp(mtime_ns),
                     chunks=item.chunks,
                 )
-                files[path_hash] = msgpack.packb(entry)  # takes about 240 Bytes per file
+                files[path_hash] = self.compress_entry(entry)
         # deal with special snapshot / timestamp granularity case, see FAQ:
         for path_hash in self._newest_path_hashes:
             del files[path_hash]
@@ -483,8 +523,8 @@ class FilesCacheMixin:
                     try:
                         for path_hash, item in u:
                             entry = FileCacheEntry(*item)
-                            # in the end, this takes about 240 Bytes per file
-                            files[path_hash] = msgpack.packb(entry._replace(age=entry.age + 1))
+                            entry = entry._replace(age=entry.age + 1)
+                            files[path_hash] = self.compress_entry(entry)
                     except (TypeError, ValueError) as exc:
                         msg = "The files cache seems invalid. [%s]" % str(exc)
                         break
@@ -514,7 +554,7 @@ class FilesCacheMixin:
             age_discarded = 0
             race_discarded = 0
             for path_hash, item in files.items():
-                entry = FileCacheEntry(*msgpack.unpackb(item))
+                entry = FileCacheEntry(*self.decompress_entry(item))
                 if entry.age == 0:  # current entries
                     if max(timestamp_to_int(entry.ctime), timestamp_to_int(entry.mtime)) < discard_after:
                         # Only keep files seen in this backup that old enough not to suffer race conditions relating
@@ -567,7 +607,7 @@ class FilesCacheMixin:
             files_cache_logger.debug("UNKNOWN: no file metadata in cache for: %r", hashed_path)
             return False, None
         # we know the file!
-        entry = FileCacheEntry(*msgpack.unpackb(entry))
+        entry = FileCacheEntry(*self.decompress_entry(entry))
         if "s" in cache_mode and entry.size != st.st_size:
             files_cache_logger.debug("KNOWN-CHANGED: file size has changed: %r", hashed_path)
             return True, None
@@ -590,7 +630,8 @@ class FilesCacheMixin:
         # to avoid everything getting chunked again. to be able to re-enable the
         # V comparison in a future backup run (and avoid chunking everything again at
         # that time), we need to update V in the cache with what we see in the filesystem.
-        self.files[path_hash] = msgpack.packb(entry._replace(inode=st.st_ino, ctime=ctime, mtime=mtime, age=0))
+        entry = entry._replace(inode=st.st_ino, ctime=ctime, mtime=mtime, age=0)
+        self.files[path_hash] = self.compress_entry(entry)
         chunks = [ChunkListEntry(*chunk) for chunk in entry.chunks]  # convert to list of namedtuple
         return True, chunks
 
@@ -611,7 +652,7 @@ class FilesCacheMixin:
             mtime=int_to_timestamp(mtime_ns),
             chunks=chunks,
         )
-        self.files[path_hash] = msgpack.packb(entry)
+        self.files[path_hash] = self.compress_entry(entry)
         self._newest_cmtime = max(self._newest_cmtime or 0, ctime_ns)
         self._newest_cmtime = max(self._newest_cmtime or 0, mtime_ns)
         files_cache_logger.debug(

+ 6 - 0
src/borg/hashindex.pyx

@@ -73,6 +73,12 @@ class ChunkIndex(HTProxyMixin, MutableMapping):
     def stats(self):
         return self.ht.stats
 
+    def k_to_idx(self, key):
+        return self.ht.k_to_idx(key)
+
+    def idx_to_k(self, idx):
+        return self.ht.idx_to_k(idx)
+
 
 FuseVersionsIndexEntry = namedtuple('FuseVersionsIndexEntry', 'version hash')