Jelajahi Sumber

Merge pull request #8403 from ThomasWaldmann/cache-chunkindex

chunks index caching
TW 8 bulan lalu
induk
melakukan
7d02fe2b8f
3 mengubah file dengan 157 tambahan dan 66 penghapusan
  1. 14 2
      src/borg/archive.py
  2. 71 61
      src/borg/archiver/compact_cmd.py
  3. 72 3
      src/borg/cache.py

+ 14 - 2
src/borg/archive.py

@@ -50,7 +50,7 @@ from .patterns import PathPrefixPattern, FnmatchPattern, IECommand
 from .item import Item, ArchiveItem, ItemDiff
 from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname
 from .remote import RemoteRepository, cache_if_remote
-from .repository import Repository, NoManifestError
+from .repository import Repository, NoManifestError, StoreObjectNotFound
 from .repoobj import RepoObj
 
 has_link = hasattr(os, "link")
@@ -1644,7 +1644,7 @@ class ArchiveChecker:
         self.check_all = not any((first, last, match, older, newer, oldest, newest))
         self.repair = repair
         self.repository = repository
-        self.chunks = build_chunkindex_from_repo(self.repository)
+        self.chunks = build_chunkindex_from_repo(self.repository, disable_caches=True, cache_immediately=not repair)
         self.key = self.make_key(repository)
         self.repo_objs = RepoObj(self.key)
         if verify_data:
@@ -2100,6 +2100,18 @@ class ArchiveChecker:
 
     def finish(self):
         if self.repair:
+            logger.info("Deleting chunks cache in repository - next repository access will cause a rebuild.")
+            # we may have deleted chunks, invalidate/remove the chunks index cache!
+            try:
+                self.repository.store_delete("cache/chunks_hash")
+            except (Repository.ObjectNotFound, StoreObjectNotFound):
+                # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+                pass
+            try:
+                self.repository.store_delete("cache/chunks")
+            except (Repository.ObjectNotFound, StoreObjectNotFound):
+                # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+                pass
             logger.info("Writing Manifest.")
             self.manifest.write()
 

+ 71 - 61
src/borg/archiver/compact_cmd.py

@@ -1,9 +1,11 @@
 import argparse
-from typing import Tuple, Dict
+from typing import Tuple, Set
 
 from ._common import with_repository
 from ..archive import Archive
+from ..cache import write_chunkindex_to_repo_cache
 from ..constants import *  # NOQA
+from ..hashindex import ChunkIndex, ChunkIndexEntry
 from ..helpers import set_ec, EXIT_WARNING, EXIT_ERROR, format_file_size, bin_to_hex
 from ..helpers import ProgressIndicatorPercent
 from ..manifest import Manifest
@@ -20,34 +22,33 @@ class ArchiveGarbageCollector:
         self.repository = repository
         assert isinstance(repository, (Repository, RemoteRepository))
         self.manifest = manifest
-        self.repository_chunks = None  # what we have in the repository, id -> stored_size
-        self.used_chunks = None  # what archives currently reference
-        self.wanted_chunks = None  # chunks that would be nice to have for next borg check --repair
+        self.chunks = None  # a ChunkIndex, here used for: id -> (is_used, stored_size)
         self.total_files = None  # overall number of source files written to all archives in this repo
         self.total_size = None  # overall size of source file content data written to all archives
         self.archives_count = None  # number of archives
 
     @property
     def repository_size(self):
-        if self.repository_chunks is None:
+        if self.chunks is None:
             return None
-        return sum(self.repository_chunks.values())  # sum of stored sizes
+        return sum(entry.size for id, entry in self.chunks.iteritems())  # sum of stored sizes
 
     def garbage_collect(self):
         """Removes unused chunks from a repository."""
         logger.info("Starting compaction / garbage collection...")
         logger.info("Getting object IDs present in the repository...")
-        self.repository_chunks = self.get_repository_chunks()
+        self.chunks = self.get_repository_chunks()
         logger.info("Computing object IDs used by archives...")
-        (self.used_chunks, self.wanted_chunks, self.total_files, self.total_size, self.archives_count) = (
+        (self.missing_chunks, self.reappeared_chunks, self.total_files, self.total_size, self.archives_count) = (
             self.analyze_archives()
         )
         self.report_and_delete()
+        self.save_chunk_index()
         logger.info("Finished compaction / garbage collection...")
 
-    def get_repository_chunks(self) -> Dict[bytes, int]:
+    def get_repository_chunks(self) -> ChunkIndex:
         """Build a dict id -> size of all chunks present in the repository"""
-        repository_chunks = {}
+        chunks = ChunkIndex()
         marker = None
         while True:
             result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
@@ -55,13 +56,41 @@ class ArchiveGarbageCollector:
                 break
             marker = result[-1][0]
             for id, stored_size in result:
-                repository_chunks[id] = stored_size
-        return repository_chunks
-
-    def analyze_archives(self) -> Tuple[Dict[bytes, int], Dict[bytes, int], int, int, int]:
+                # we add this id to the chunks index, using refcount == 0, because
+                # we do not know yet whether it is actually referenced from some archives.
+                # we "abuse" the size field here. usually there is the plaintext size,
+                # but we use it for the size of the stored object here.
+                chunks[id] = ChunkIndexEntry(refcount=0, size=stored_size)
+        return chunks
+
+    def save_chunk_index(self):
+        # first clean up:
+        for id, entry in self.chunks.iteritems():
+            # we already deleted the unused chunks, so everything left must be used:
+            assert entry.refcount == ChunkIndex.MAX_VALUE
+            # as we put the wrong size in there, we need to clean up the size:
+            self.chunks[id] = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
+        # now self.chunks is an uptodate ChunkIndex, usable for general borg usage!
+        write_chunkindex_to_repo_cache(self.repository, self.chunks, compact=True, clear=True, force_write=True)
+        self.chunks = None  # nothing there (cleared!)
+
+    def analyze_archives(self) -> Tuple[Set, Set, int, int, int]:
         """Iterate over all items in all archives, create the dicts id -> size of all used/wanted chunks."""
-        used_chunks = {}  # chunks referenced by item.chunks
-        wanted_chunks = {}  # additional "wanted" chunks seen in item.chunks_healthy
+
+        def use_it(id, *, wanted=False):
+            entry = self.chunks.get(id)
+            if entry is not None:
+                # the chunk is in the repo, mark it used by setting refcount to max.
+                self.chunks[id] = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=entry.size)
+                if wanted:
+                    # chunk id is from chunks_healthy list: a lost chunk has re-appeared!
+                    reappeared_chunks.add(id)
+            else:
+                # we do NOT have this chunk in the repository!
+                missing_chunks.add(id)
+
+        missing_chunks: set[bytes] = set()
+        reappeared_chunks: set[bytes] = set()
         archive_infos = self.manifest.archives.list(sort_by=["ts"])
         num_archives = len(archive_infos)
         pi = ProgressIndicatorPercent(
@@ -73,79 +102,60 @@ class ArchiveGarbageCollector:
             logger.info(f"Analyzing archive {info.name} {info.ts} {bin_to_hex(info.id)} ({i + 1}/{num_archives})")
             archive = Archive(self.manifest, info.id)
             # archive metadata size unknown, but usually small/irrelevant:
-            used_chunks[archive.id] = 0
+            use_it(archive.id)
             for id in archive.metadata.item_ptrs:
-                used_chunks[id] = 0
+                use_it(id)
             for id in archive.metadata.items:
-                used_chunks[id] = 0
+                use_it(id)
             # archive items content data:
             for item in archive.iter_items():
                 total_files += 1  # every fs object counts, not just regular files
                 if "chunks" in item:
                     for id, size in item.chunks:
                         total_size += size  # original, uncompressed file content size
-                        used_chunks[id] = size
+                        use_it(id)
                     if "chunks_healthy" in item:
                         # we also consider the chunks_healthy chunks as referenced - do not throw away
                         # anything that borg check --repair might still need.
                         for id, size in item.chunks_healthy:
-                            if id not in used_chunks:
-                                wanted_chunks[id] = size
+                            use_it(id, wanted=True)
         pi.finish()
-        return used_chunks, wanted_chunks, total_files, total_size, num_archives
+        return missing_chunks, reappeared_chunks, total_files, total_size, num_archives
 
     def report_and_delete(self):
         run_repair = " Run borg check --repair!"
 
-        missing_new = set(self.used_chunks) - set(self.repository_chunks)
-        if missing_new:
-            logger.error(f"Repository has {len(missing_new)} new missing objects." + run_repair)
+        if self.missing_chunks:
+            logger.error(f"Repository has {len(self.missing_chunks)} missing objects." + run_repair)
             set_ec(EXIT_ERROR)
 
-        missing_known = set(self.wanted_chunks) - set(self.repository_chunks)
-        if missing_known:
-            logger.warning(f"Repository has {len(missing_known)} known missing objects.")
-            set_ec(EXIT_WARNING)
-
-        missing_found = set(self.wanted_chunks) & set(self.repository_chunks)
-        if missing_found:
-            logger.warning(f"{len(missing_found)} previously missing objects re-appeared!" + run_repair)
+        if self.reappeared_chunks:
+            logger.warning(f"{len(self.reappeared_chunks)} previously missing objects re-appeared!" + run_repair)
             set_ec(EXIT_WARNING)
 
         repo_size_before = self.repository_size
-        referenced_chunks = set(self.used_chunks) | set(self.wanted_chunks)
-        unused = set(self.repository_chunks) - referenced_chunks
-        logger.info(f"Repository has {len(unused)} objects to delete.")
-        if unused:
-            logger.info(f"Deleting {len(unused)} unused objects...")
-            pi = ProgressIndicatorPercent(
-                total=len(unused), msg="Deleting unused objects %3.1f%%", step=0.1, msgid="compact.report_and_delete"
-            )
-            for i, id in enumerate(unused):
-                pi.show(i)
-                self.repository.delete(id)
-                del self.repository_chunks[id]
-            pi.finish()
+        logger.info("Determining unused objects...")
+        unused = set()
+        for id, entry in self.chunks.iteritems():
+            if entry.refcount == 0:
+                unused.add(id)
+        logger.info(f"Deleting {len(unused)} unused objects...")
+        pi = ProgressIndicatorPercent(
+            total=len(unused), msg="Deleting unused objects %3.1f%%", step=0.1, msgid="compact.report_and_delete"
+        )
+        for i, id in enumerate(unused):
+            pi.show(i)
+            self.repository.delete(id)
+            del self.chunks[id]
+        pi.finish()
         repo_size_after = self.repository_size
 
-        count = len(self.repository_chunks)
+        count = len(self.chunks)
         logger.info(f"Overall statistics, considering all {self.archives_count} archives in this repository:")
         logger.info(
             f"Source data size was {format_file_size(self.total_size, precision=0)} in {self.total_files} files."
         )
-        dsize = 0
-        for id in self.repository_chunks:
-            if id in self.used_chunks:
-                dsize += self.used_chunks[id]
-            elif id in self.wanted_chunks:
-                dsize += self.wanted_chunks[id]
-            else:
-                raise KeyError(bin_to_hex(id))
-        logger.info(f"Repository size is {format_file_size(self.repository_size, precision=0)} in {count} objects.")
-        if self.total_size != 0:
-            logger.info(f"Space reduction factor due to deduplication: {dsize / self.total_size:.3f}")
-        if dsize != 0:
-            logger.info(f"Space reduction factor due to compression: {self.repository_size / dsize:.3f}")
+        logger.info(f"Repository size is {format_file_size(repo_size_after, precision=0)} in {count} objects.")
         logger.info(f"Compaction saved {format_file_size(repo_size_before - repo_size_after, precision=0)}.")
 
 

+ 72 - 3
src/borg/cache.py

@@ -1,4 +1,5 @@
 import configparser
+import io
 import os
 import shutil
 import stat
@@ -30,7 +31,7 @@ from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError
 from .manifest import Manifest
 from .platform import SaveFile
 from .remote import RemoteRepository
-from .repository import LIST_SCAN_LIMIT, Repository
+from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound
 
 # chunks is a list of ChunkListEntry
 FileCacheEntry = namedtuple("FileCacheEntry", "age inode size ctime mtime chunks")
@@ -618,7 +619,64 @@ class FilesCacheMixin:
         )
 
 
-def build_chunkindex_from_repo(repository):
+def load_chunks_hash(repository) -> bytes:
+    try:
+        hash = repository.store_load("cache/chunks_hash")
+        logger.debug(f"cache/chunks_hash is '{bin_to_hex(hash)}'.")
+    except (Repository.ObjectNotFound, StoreObjectNotFound):
+        # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+        hash = b""
+        logger.debug("cache/chunks_hash missing!")
+    return hash
+
+
+def write_chunkindex_to_repo_cache(repository, chunks, *, compact=False, clear=False, force_write=False):
+    cached_hash = load_chunks_hash(repository)
+    if compact:
+        # if we don't need the in-memory chunks index anymore:
+        chunks.compact()  # vacuum the hash table
+    with io.BytesIO() as f:
+        chunks.write(f)
+        data = f.getvalue()
+    if clear:
+        # if we don't need the in-memory chunks index anymore:
+        chunks.clear()  # free memory, immediately
+    new_hash = xxh64(data)
+    if force_write or new_hash != cached_hash:
+        # when an updated chunks index is stored into the cache, we also store its hash into the cache.
+        # when a client is loading the chunks index from a cache, it has to compare its xxh64
+        # hash against cache/chunks_hash in the repository. if it is the same, the cache
+        # is valid. If it is different, the cache is either corrupted or out of date and
+        # has to be discarded.
+        # when some functionality is DELETING chunks from the repository, it has to either update
+        # both cache/chunks and cache/chunks_hash (like borg compact does) or it has to delete both,
+        # so that all clients will discard any client-local chunks index caches.
+        logger.debug(f"caching chunks index {bin_to_hex(new_hash)} in repository...")
+        repository.store_store("cache/chunks", data)
+        repository.store_store("cache/chunks_hash", new_hash)
+    return new_hash
+
+
+def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=False):
+    chunks = None
+    # first, try to load a pre-computed and centrally cached chunks index:
+    if not disable_caches:
+        wanted_hash = load_chunks_hash(repository)
+        logger.debug(f"trying to get cached chunk index (id {bin_to_hex(wanted_hash)}) from the repo...")
+        try:
+            chunks_data = repository.store_load("cache/chunks")
+        except (Repository.ObjectNotFound, StoreObjectNotFound):
+            # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+            logger.debug("cache/chunks not found in the repository.")
+        else:
+            if xxh64(chunks_data) == wanted_hash:
+                logger.debug("cache/chunks is valid.")
+                with io.BytesIO(chunks_data) as f:
+                    chunks = ChunkIndex.read(f)
+                return chunks
+            else:
+                logger.debug("cache/chunks is invalid.")
+    # if we didn't get anything from the cache, compute the ChunkIndex the slow way:
     logger.debug("querying the chunk IDs list from the repo...")
     chunks = ChunkIndex()
     t0 = perf_counter()
@@ -646,6 +704,9 @@ def build_chunkindex_from_repo(repository):
     # Protocol overhead is neglected in this calculation.
     speed = format_file_size(num_chunks * 34 / duration)
     logger.debug(f"queried {num_chunks} chunk IDs in {duration} s ({num_requests} requests), ~{speed}/s")
+    if cache_immediately:
+        # immediately update cache/chunks, so we only rarely have to do it the slow way:
+        write_chunkindex_to_repo_cache(repository, chunks, compact=False, clear=False, force_write=True)
     return chunks
 
 
@@ -660,7 +721,7 @@ class ChunksMixin:
     @property
     def chunks(self):
         if self._chunks is None:
-            self._chunks = build_chunkindex_from_repo(self.repository)
+            self._chunks = build_chunkindex_from_repo(self.repository, cache_immediately=True)
         return self._chunks
 
     def seen_chunk(self, id, size=None):
@@ -709,6 +770,11 @@ class ChunksMixin:
         stats.update(size, not exists)
         return ChunkListEntry(id, size)
 
+    def _write_chunks_cache(self, chunks):
+        # this is called from .close, so we can clear/compact here:
+        write_chunkindex_to_repo_cache(self.repository, self._chunks, compact=True, clear=True)
+        self._chunks = None  # nothing there (cleared!)
+
 
 class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):
     """
@@ -794,6 +860,9 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):
             pi.output("Saving files cache")
             integrity_data = self._write_files_cache(self._files)
             self.cache_config.integrity[self.files_cache_name()] = integrity_data
+        if self._chunks is not None:
+            pi.output("Saving chunks cache")
+            self._write_chunks_cache(self._chunks)  # cache/chunks in repo has a different integrity mechanism
         pi.output("Saving cache config")
         self.cache_config.save(self.manifest)
         self.cache_config.close()