implement NewCache

Also:
- move common code to ChunksMixin
- always use ._txn_active (not .txn_active)

Some tests are still failing.
Thomas Waldmann 1 year ago
parent
commit
e2a1999c59
2 changed files with 298 additions and 174 deletions
  1. src/borg/cache.py (+296 −172)
  2. src/borg/helpers/parseformat.py (+2 −2)
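
For orientation, a minimal, illustrative sketch (not part of the commit) of the class layout this change produces; the class names and responsibilities are taken from the diff below, with all bodies elided:

    class CacheStatsMixin: ...   # defined earlier in cache.py
    class FilesCacheMixin: ...   # defined earlier in cache.py

    class ChunksMixin:
        # Shared chunks-index logic: add_chunk, seen_chunk, chunk_incref,
        # chunk_decref and _load_chunks_from_repo. It relies on the host class
        # providing self._txn_active, self.chunks and self.repository.
        ...

    class LocalCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
        ...  # persistent chunks index and persistent files cache

    class NewCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
        ...  # chunks index rebuilt from the repository, persistent files cache

    class AdHocCache(CacheStatsMixin, ChunksMixin):
        ...  # chunks index rebuilt from the repository, no files cache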

src/borg/cache.py (+296 −172)

@@ -420,6 +420,17 @@ class Cache:
                 cache_mode=cache_mode,
             )

+        def newcache():
+            return NewCache(
+                manifest=manifest,
+                path=path,
+                warn_if_unencrypted=warn_if_unencrypted,
+                progress=progress,
+                iec=iec,
+                lock_wait=lock_wait,
+                cache_mode=cache_mode,
+            )
+
         def adhoc():
             return AdHocCache(manifest=manifest, lock_wait=lock_wait, iec=iec)

@@ -660,7 +671,127 @@ class FilesCacheMixin:
         )


-class LocalCache(CacheStatsMixin, FilesCacheMixin):
+class ChunksMixin:
+    """
+    Chunks index related code for misc. Cache implementations.
+    """
+
+    def chunk_incref(self, id, size, stats):
+        assert isinstance(size, int) and size > 0
+        if not self._txn_active:
+            self.begin_txn()
+        count, _size = self.chunks.incref(id)
+        stats.update(size, False)
+        return ChunkListEntry(id, size)
+
+    def chunk_decref(self, id, size, stats, wait=True):
+        assert isinstance(size, int) and size > 0
+        if not self._txn_active:
+            self.begin_txn()
+        count, _size = self.chunks.decref(id)
+        if count == 0:
+            del self.chunks[id]
+            self.repository.delete(id, wait=wait)
+            stats.update(-size, True)
+        else:
+            stats.update(-size, False)
+
+    def seen_chunk(self, id, size=None):
+        if not self._txn_active:
+            self.begin_txn()
+        entry = self.chunks.get(id, ChunkIndexEntry(0, None))
+        if entry.refcount and size is not None:
+            assert isinstance(entry.size, int)
+            if entry.size:
+                # LocalCache: has existing size information and uses *size* to make an effort at detecting collisions.
+                if size != entry.size:
+                    # we already have a chunk with that id, but different size.
+                    # this is either a hash collision (unlikely) or corruption or a bug.
+                    raise Exception(
+                        "chunk has same id [%r], but different size (stored: %d new: %d)!" % (id, entry.size, size)
+                    )
+            else:
+                # NewCache / AdHocCache:
+                # Here *size* is used to update the chunk's size information, which will be zero for existing chunks.
+                self.chunks[id] = entry._replace(size=size)
+        return entry.refcount
+
+    def add_chunk(
+        self,
+        id,
+        meta,
+        data,
+        *,
+        stats,
+        wait=True,
+        compress=True,
+        size=None,
+        ctype=None,
+        clevel=None,
+        ro_type=ROBJ_FILE_STREAM,
+    ):
+        assert ro_type is not None
+        if not self._txn_active:
+            self.begin_txn()
+        if size is None:
+            if compress:
+                size = len(data)  # data is still uncompressed
+            else:
+                raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
+        refcount = self.seen_chunk(id, size)
+        if refcount:
+            return self.chunk_incref(id, size, stats)
+        cdata = self.repo_objs.format(
+            id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
+        )
+        self.repository.put(id, cdata, wait=wait)
+        self.chunks.add(id, 1, size)
+        stats.update(size, not refcount)
+        return ChunkListEntry(id, size)
+
+    def _load_chunks_from_repo(self):
+        # Explicitly set the initial usable hash table capacity to avoid performance issues
+        # due to hash table "resonance".
+        # Since we're creating an archive, add 10 % from the start.
+        num_chunks = len(self.repository)
+        chunks = ChunkIndex(usable=num_chunks * 1.1)
+        pi = ProgressIndicatorPercent(
+            total=num_chunks, msg="Downloading chunk list... %3.0f%%", msgid="cache.download_chunks"
+        )
+        t0 = perf_counter()
+        num_requests = 0
+        marker = None
+        while True:
+            result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
+            num_requests += 1
+            if not result:
+                break
+            pi.show(increase=len(result))
+            marker = result[-1]
+            # All chunks from the repository have a refcount of MAX_VALUE, which is sticky,
+            # therefore we can't/won't delete them. Chunks we added ourselves in this transaction
+            # (e.g. checkpoint archives) are tracked correctly.
+            init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
+            for id_ in result:
+                chunks[id_] = init_entry
+        assert len(chunks) == num_chunks
+        # LocalCache does not contain the manifest, either.
+        del chunks[self.manifest.MANIFEST_ID]
+        duration = perf_counter() - t0 or 0.01
+        pi.finish()
+        logger.debug(
+            "Cache: downloaded %d chunk IDs in %.2f s (%d requests), ~%s/s",
+            num_chunks,
+            duration,
+            num_requests,
+            format_file_size(num_chunks * 34 / duration),
+        )
+        # Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.
+        # Protocol overhead is neglected in this calculation.
+        return chunks
+
+
+class LocalCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
     """
     """
     Persistent, local (client-side) cache.
     Persistent, local (client-side) cache.
     """
     """
@@ -691,7 +822,7 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
         self.repo_objs = manifest.repo_objs
         self.progress = progress
         self.timestamp = None
-        self.txn_active = False
+        self._txn_active = False
         self.do_cache = os.environ.get("BORG_USE_CHUNKS_ARCHIVE", "yes").lower() in ["yes", "1", "true"]

         self.path = cache_dir(self.repository, path)
@@ -776,12 +907,12 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
         except FileNotFoundError:
             self._create_empty_files_cache(txn_dir)
         os.replace(txn_dir, os.path.join(self.path, "txn.active"))
-        self.txn_active = True
+        self._txn_active = True
         pi.finish()

     def commit(self):
         """Commit transaction"""
-        if not self.txn_active:
+        if not self._txn_active:
             return
         self.security_manager.save(self.manifest, self.key)
         pi = ProgressIndicatorMessage(msgid="cache.commit")
@@ -797,7 +928,7 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
         self.cache_config.save(self.manifest, self.key)
         os.replace(os.path.join(self.path, "txn.active"), os.path.join(self.path, "txn.tmp"))
         shutil.rmtree(os.path.join(self.path, "txn.tmp"))
-        self.txn_active = False
+        self._txn_active = False
         pi.finish()

     def rollback(self):
@@ -815,7 +946,7 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
             os.replace(txn_dir, txn_tmp)
             if os.path.exists(txn_tmp):
                 shutil.rmtree(txn_tmp)
-        self.txn_active = False
+        self._txn_active = False
         self._do_open()

     def sync(self):
@@ -1067,75 +1198,171 @@ class LocalCache(CacheStatsMixin, FilesCacheMixin):
         self.cache_config.ignored_features.update(repo_features - my_features)
         self.cache_config.mandatory_features.update(repo_features & my_features)

-    def add_chunk(
+
+class NewCache(CacheStatsMixin, FilesCacheMixin, ChunksMixin):
+    """
+    Like AdHocCache, but with a files cache.
+    """
+
+    def __init__(
         self,
-        id,
-        meta,
-        data,
-        *,
-        stats,
-        wait=True,
-        compress=True,
-        size=None,
-        ctype=None,
-        clevel=None,
-        ro_type=ROBJ_FILE_STREAM,
+        manifest,
+        path=None,
+        warn_if_unencrypted=True,
+        progress=False,
+        lock_wait=None,
+        cache_mode=FILES_CACHE_MODE_DISABLED,
+        iec=False,
     ):
-        assert ro_type is not None
-        if not self.txn_active:
-            self.begin_txn()
-        if size is None:
-            if compress:
-                size = len(data)  # data is still uncompressed
-            else:
-                raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
-        refcount = self.seen_chunk(id, size)
-        if refcount:
-            return self.chunk_incref(id, size, stats)
-        if size is None:
-            raise ValueError("when giving compressed data for a new chunk, the uncompressed size must be given also")
-        cdata = self.repo_objs.format(
-            id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
-        )
-        self.repository.put(id, cdata, wait=wait)
-        self.chunks.add(id, 1, size)
-        stats.update(size, not refcount)
-        return ChunkListEntry(id, size)
+        """
+        :param warn_if_unencrypted: print warning if accessing unknown unencrypted repository
+        :param lock_wait: timeout for lock acquisition (int [s] or None [wait forever])
+        :param cache_mode: what shall be compared in the file stat infos vs. cached stat infos comparison
+        """
+        CacheStatsMixin.__init__(self, iec=iec)
+        FilesCacheMixin.__init__(self, cache_mode)
+        assert isinstance(manifest, Manifest)
+        self.manifest = manifest
+        self.repository = manifest.repository
+        self.key = manifest.key
+        self.repo_objs = manifest.repo_objs
+        self.progress = progress
+        self.timestamp = None
+        self._txn_active = False

-    def seen_chunk(self, id, size=None):
-        refcount, stored_size = self.chunks.get(id, ChunkIndexEntry(0, None))
-        if size is not None and stored_size is not None and size != stored_size:
-            # we already have a chunk with that id, but different size.
-            # this is either a hash collision (unlikely) or corruption or a bug.
-            raise Exception(
-                "chunk has same id [%r], but different size (stored: %d new: %d)!" % (id, stored_size, size)
-            )
-        return refcount
+        self.path = cache_dir(self.repository, path)
+        self.security_manager = SecurityManager(self.repository)
+        self.cache_config = CacheConfig(self.repository, self.path, lock_wait)

-    def chunk_incref(self, id, size, stats):
-        assert isinstance(size, int) and size > 0
-        if not self.txn_active:
-            self.begin_txn()
-        count, _size = self.chunks.incref(id)
-        assert size == _size
-        stats.update(size, False)
-        return ChunkListEntry(id, size)
+        # Warn user before sending data to a never seen before unencrypted repository
+        if not os.path.exists(self.path):
+            self.security_manager.assert_access_unknown(warn_if_unencrypted, manifest, self.key)
+            self.create()

-    def chunk_decref(self, id, size, stats, wait=True):
-        assert isinstance(size, int) and size > 0
-        if not self.txn_active:
-            self.begin_txn()
-        count, _size = self.chunks.decref(id)
-        assert size == 1 or size == _size  # don't check if caller gave fake size 1
-        if count == 0:
-            del self.chunks[id]
-            self.repository.delete(id, wait=wait)
-            stats.update(-size, True)
-        else:
-            stats.update(-size, False)
+        self.open()
+        try:
+            self.security_manager.assert_secure(manifest, self.key, cache_config=self.cache_config)

+            if not self.check_cache_compatibility():
+                self.wipe_cache()

-class AdHocCache(CacheStatsMixin):
+            self.update_compatibility()
+        except:  # noqa
+            self.close()
+            raise
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+    def create(self):
+        """Create a new empty cache at `self.path`"""
+        os.makedirs(self.path)
+        with open(os.path.join(self.path, "README"), "w") as fd:
+            fd.write(CACHE_README)
+        self.cache_config.create()
+        self._create_empty_files_cache(self.path)
+
+    def _do_open(self):
+        self.cache_config.load()
+        self.chunks = self._load_chunks_from_repo()
+        self._read_files_cache()
+
+    def open(self):
+        if not os.path.isdir(self.path):
+            raise Exception("%s does not look like a Borg cache" % self.path)
+        self.cache_config.open()
+        self.rollback()
+
+    def close(self):
+        if self.cache_config is not None:
+            self.cache_config.close()
+            self.cache_config = None
+
+    def begin_txn(self):
+        # Initialize transaction snapshot
+        pi = ProgressIndicatorMessage(msgid="cache.begin_transaction")
+        txn_dir = os.path.join(self.path, "txn.tmp")
+        os.mkdir(txn_dir)
+        pi.output("Initializing cache transaction: Reading config")
+        shutil.copy(os.path.join(self.path, "config"), txn_dir)
+        pi.output("Initializing cache transaction: Reading files")
+        try:
+            shutil.copy(os.path.join(self.path, self.files_cache_name()), txn_dir)
+        except FileNotFoundError:
+            self._create_empty_files_cache(txn_dir)
+        os.replace(txn_dir, os.path.join(self.path, "txn.active"))
+        pi.finish()
+        self._txn_active = True
+
+    def commit(self):
+        if not self._txn_active:
+            return
+        self.security_manager.save(self.manifest, self.key)
+        pi = ProgressIndicatorMessage(msgid="cache.commit")
+        if self.files is not None:
+            pi.output("Saving files cache")
+            integrity_data = self._write_files_cache()
+            self.cache_config.integrity[self.files_cache_name()] = integrity_data
+        pi.output("Saving cache config")
+        self.cache_config.save(self.manifest, self.key)
+        os.replace(os.path.join(self.path, "txn.active"), os.path.join(self.path, "txn.tmp"))
+        shutil.rmtree(os.path.join(self.path, "txn.tmp"))
+        self._txn_active = False
+        pi.finish()
+
+    def rollback(self):
+        # Remove partial transaction
+        if os.path.exists(os.path.join(self.path, "txn.tmp")):
+            shutil.rmtree(os.path.join(self.path, "txn.tmp"))
+        # Roll back active transaction
+        txn_dir = os.path.join(self.path, "txn.active")
+        if os.path.exists(txn_dir):
+            shutil.copy(os.path.join(txn_dir, "config"), self.path)
+            shutil.copy(os.path.join(txn_dir, self.discover_files_cache_name(txn_dir)), self.path)
+            txn_tmp = os.path.join(self.path, "txn.tmp")
+            os.replace(txn_dir, txn_tmp)
+            if os.path.exists(txn_tmp):
+                shutil.rmtree(txn_tmp)
+        self._txn_active = False
+        self._do_open()
+
+    def check_cache_compatibility(self):
+        my_features = Manifest.SUPPORTED_REPO_FEATURES
+        if self.cache_config.ignored_features & my_features:
+            # The cache might not contain references of chunks that need a feature that is mandatory for some operation
+            # and which this version supports. To avoid corruption while executing that operation, force a rebuild.
+            return False
+        if not self.cache_config.mandatory_features <= my_features:
+            # The cache was built with consideration to at least one feature that this version does not understand.
+            # This client might misinterpret the cache. Thus force a rebuild.
+            return False
+        return True
+
+    def wipe_cache(self):
+        logger.warning("Discarding incompatible cache and forcing a cache rebuild")
+        self.chunks = ChunkIndex()
+        self._create_empty_files_cache(self.path)
+        self.cache_config.manifest_id = ""
+        self.cache_config._config.set("cache", "manifest", "")
+
+        self.cache_config.ignored_features = set()
+        self.cache_config.mandatory_features = set()
+
+    def update_compatibility(self):
+        operation_to_features_map = self.manifest.get_all_mandatory_features()
+        my_features = Manifest.SUPPORTED_REPO_FEATURES
+        repo_features = set()
+        for operation, features in operation_to_features_map.items():
+            repo_features.update(features)
+
+        self.cache_config.ignored_features.update(repo_features - my_features)
+        self.cache_config.mandatory_features.update(repo_features & my_features)
+
+
+class AdHocCache(CacheStatsMixin, ChunksMixin):
     """
     """
     Ad-hoc, non-persistent cache.
     Ad-hoc, non-persistent cache.
 
 
@@ -1183,72 +1410,6 @@ Chunk index:    {0.total_unique_chunks:20d}             unknown"""
     def memorize_file(self, hashed_path, path_hash, st, chunks):
         pass

-    def add_chunk(
-        self,
-        id,
-        meta,
-        data,
-        *,
-        stats,
-        wait=True,
-        compress=True,
-        size=None,
-        ctype=None,
-        clevel=None,
-        ro_type=ROBJ_FILE_STREAM,
-    ):
-        assert ro_type is not None
-        if not self._txn_active:
-            self.begin_txn()
-        if size is None:
-            if compress:
-                size = len(data)  # data is still uncompressed
-            else:
-                raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
-        refcount = self.seen_chunk(id, size)
-        if refcount:
-            return self.chunk_incref(id, size, stats)
-        cdata = self.repo_objs.format(
-            id, meta, data, compress=compress, size=size, ctype=ctype, clevel=clevel, ro_type=ro_type
-        )
-        self.repository.put(id, cdata, wait=wait)
-        self.chunks.add(id, 1, size)
-        stats.update(size, not refcount)
-        return ChunkListEntry(id, size)
-
-    def seen_chunk(self, id, size=None):
-        if not self._txn_active:
-            self.begin_txn()
-        entry = self.chunks.get(id, ChunkIndexEntry(0, None))
-        if entry.refcount and size and not entry.size:
-            # The LocalCache has existing size information and uses *size* to make an effort at detecting collisions.
-            # This is of course not possible for the AdHocCache.
-            # Here *size* is used to update the chunk's size information, which will be zero for existing chunks.
-            self.chunks[id] = entry._replace(size=size)
-        return entry.refcount
-
-    def chunk_incref(self, id, size, stats):
-        assert isinstance(size, int) and size > 0
-        if not self._txn_active:
-            self.begin_txn()
-        count, _size = self.chunks.incref(id)
-        assert size == _size
-        stats.update(size, False)
-        return ChunkListEntry(id, size)
-
-    def chunk_decref(self, id, size, stats, wait=True):
-        assert isinstance(size, int) and size > 0
-        if not self._txn_active:
-            self.begin_txn()
-        count, _size = self.chunks.decref(id)
-        assert size == 1 or size == _size  # don't check if caller gave fake size 1
-        if count == 0:
-            del self.chunks[id]
-            self.repository.delete(id, wait=wait)
-            stats.update(-size, True)
-        else:
-            stats.update(-size, False)
-
     def commit(self):
         if not self._txn_active:
             return
@@ -1261,41 +1422,4 @@ Chunk index:    {0.total_unique_chunks:20d}             unknown"""

     def begin_txn(self):
         self._txn_active = True
-        # Explicitly set the initial usable hash table capacity to avoid performance issues
-        # due to hash table "resonance".
-        # Since we're creating an archive, add 10 % from the start.
-        num_chunks = len(self.repository)
-        self.chunks = ChunkIndex(usable=num_chunks * 1.1)
-        pi = ProgressIndicatorPercent(
-            total=num_chunks, msg="Downloading chunk list... %3.0f%%", msgid="cache.download_chunks"
-        )
-        t0 = perf_counter()
-        num_requests = 0
-        marker = None
-        while True:
-            result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
-            num_requests += 1
-            if not result:
-                break
-            pi.show(increase=len(result))
-            marker = result[-1]
-            # All chunks from the repository have a refcount of MAX_VALUE, which is sticky,
-            # therefore we can't/won't delete them. Chunks we added ourselves in this transaction
-            # (e.g. checkpoint archives) are tracked correctly.
-            init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
-            for id_ in result:
-                self.chunks[id_] = init_entry
-        assert len(self.chunks) == num_chunks
-        # LocalCache does not contain the manifest, either.
-        del self.chunks[self.manifest.MANIFEST_ID]
-        duration = perf_counter() - t0 or 0.01
-        pi.finish()
-        logger.debug(
-            "AdHocCache: downloaded %d chunk IDs in %.2f s (%d requests), ~%s/s",
-            num_chunks,
-            duration,
-            num_requests,
-            format_file_size(num_chunks * 34 / duration),
-        )
-        # Chunk IDs in a list are encoded in 34 bytes: 1 byte msgpack header, 1 byte length, 32 ID bytes.
-        # Protocol overhead is neglected in this calculation.
+        self.chunks = self._load_chunks_from_repo()
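
As a hypothetical usage sketch (not part of the commit), NewCache is driven like LocalCache, including the context-manager support defined via __enter__/__exit__ above; `manifest`, `chunk_id`, `meta`, `data` and `stats` are placeholders supplied by an archiver run:

    from borg.cache import NewCache

    def store_one_chunk(manifest, chunk_id, meta, data, stats):
        # Hypothetical helper showing the call pattern only.
        with NewCache(manifest=manifest, progress=False) as cache:
            entry = cache.add_chunk(chunk_id, meta, data, stats=stats)  # from ChunksMixin
            cache.commit()  # saves the files cache and config, ends the transaction
            return entry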

src/borg/helpers/parseformat.py (+2 −2)

@@ -1184,13 +1184,13 @@ class BorgJsonEncoder(json.JSONEncoder):
         from ..repository import Repository
         from ..remote import RemoteRepository
         from ..archive import Archive
-        from ..cache import LocalCache, AdHocCache
+        from ..cache import LocalCache, AdHocCache, NewCache

         if isinstance(o, Repository) or isinstance(o, RemoteRepository):
             return {"id": bin_to_hex(o.id), "location": o._location.canonical_path()}
         if isinstance(o, Archive):
             return o.info()
-        if isinstance(o, LocalCache):
+        if isinstance(o, (LocalCache, NewCache)):
             return {"path": o.path, "stats": o.stats()}
             return {"path": o.path, "stats": o.stats()}
         if isinstance(o, AdHocCache):
         if isinstance(o, AdHocCache):
             return {"stats": o.stats()}
             return {"stats": o.stats()}