浏览代码

manifest: no read-modify-write for borgstore archives list

previously, borg always read all archives entries, modified the
list in memory, wrote back to the repository (similar as borg 1.x
did).

now borg works directly with archives/* in the borgstore.
Thomas Waldmann 9 月之前
父节点
当前提交
ef7dd76da1
共有 3 个文件被更改,包括 98 次插入65 次删除
  1. 94 61
      src/borg/manifest.py
  2. 1 1
      src/borg/testsuite/archiver/create_cmd.py
  3. 3 3
      src/borg/testsuite/archiver/rename_cmd.py

+ 94 - 61
src/borg/manifest.py

@@ -77,7 +77,7 @@ class Archives:
     borg2 has separate items archives/* in the borgstore.
     """
 
-    def __init__(self, repository):
+    def __init__(self, repository, manifest):
         from .repository import Repository
         from .remote import RemoteRepository
 
@@ -85,87 +85,98 @@ class Archives:
         self.legacy = not isinstance(repository, (Repository, RemoteRepository))
         # key: str archive name, value: dict('id': bytes_id, 'time': str_iso_ts)
         self._archives = {}
-        self.manifest = None
+        self.manifest = manifest
 
     def prepare(self, manifest, m):
-        self.manifest = manifest
         if not self.legacy:
-            self._load()
+            pass
         else:
             self._set_raw_dict(m.archives)
 
     def finish(self, manifest):
-        self.manifest = manifest  # note: .prepare is not always called
         if not self.legacy:
-            self._save()
             manifest_archives = {}
         else:
             manifest_archives = StableDict(self._get_raw_dict())
         return manifest_archives
 
-    def _load(self):
-        # load archives list from store
-        from .helpers import msgpack
-
-        archives = {}
-        try:
-            infos = list(self.repository.store_list("archives"))
-        except ObjectNotFound:
-            infos = []
-        for info in infos:
-            info = ItemInfo(*info)  # RPC does not give us a NamedTuple
-            value = self.repository.store_load(f"archives/{info.name}")
-            _, value = self.manifest.repo_objs.parse(hex_to_bin(info.name), value, ro_type=ROBJ_MANIFEST)
-            archive = msgpack.unpackb(value)
-            archives[archive["name"]] = dict(id=archive["id"], time=archive["time"])
-        self._set_raw_dict(archives)
-
-    def _save(self):
-        # save archives list to store
-        valid_keys = set()
-        for name, info in self._get_raw_dict().items():
-            archive = dict(name=name, id=info["id"], time=info["time"])
-            value = self.manifest.key.pack_metadata(archive)  #
-            id = self.manifest.repo_objs.id_hash(value)
-            key = bin_to_hex(id)
-            value = self.manifest.repo_objs.format(id, {}, value, ro_type=ROBJ_MANIFEST)
-            self.repository.store_store(f"archives/{key}", value)
-            valid_keys.add(key)
-        # now, delete all other keys in archives/ which are not in valid keys / in the manifest anymore.
-        # TODO: this is a dirty hack to simulate the old manifest behaviour closely, but also means
-        #       keeping its problems, like read-modify-write behaviour requiring an exclusive lock.
-        try:
-            infos = list(self.repository.store_list("archives"))
-        except ObjectNotFound:
-            infos = []
-        for info in infos:
-            info = ItemInfo(*info)  # RPC does not give us a NamedTuple
-            if info.name not in valid_keys:
-                self.repository.store_delete(f"archives/{info.name}")
-
     def count(self):
         # return the count of archives in the repo
-        return len(self._archives)
+        if not self.legacy:
+            try:
+                infos = list(self.repository.store_list("archives"))
+            except ObjectNotFound:
+                infos = []
+            return len(infos)  # we do not check here if entries are valid
+        else:
+            return len(self._archives)
 
     def exists(self, name):
         # check if an archive with this name exists
         assert isinstance(name, str)
-        return name in self._archives
+        if not self.legacy:
+            return name in self.names()
+        else:
+            return name in self._archives
+
+    def _infos(self):
+        # yield the infos of all archives: (store_key, archive_info)
+        from .helpers import msgpack
+
+        if not self.legacy:
+            try:
+                infos = list(self.repository.store_list("archives"))
+            except ObjectNotFound:
+                infos = []
+            for info in infos:
+                info = ItemInfo(*info)  # RPC does not give us a NamedTuple
+                value = self.repository.store_load(f"archives/{info.name}")
+                _, value = self.manifest.repo_objs.parse(hex_to_bin(info.name), value, ro_type=ROBJ_MANIFEST)
+                archive_info = msgpack.unpackb(value)
+                yield info.name, archive_info
+        else:
+            for name in self._archives:
+                archive_info = dict(name=name, id=self._archives[name]["id"], time=self._archives[name]["time"])
+                yield None, archive_info
+
+    def _lookup_name(self, name, raw=False):
+        assert isinstance(name, str)
+        assert not self.legacy
+        for store_key, archive_info in self._infos():
+            if archive_info["name"] == name:
+                if not raw:
+                    ts = parse_timestamp(archive_info["time"])
+                    return store_key, ArchiveInfo(name=name, id=archive_info["id"], ts=ts)
+                else:
+                    return store_key, archive_info
+        else:
+            raise KeyError(name)
 
     def names(self):
         # yield the names of all archives
-        yield from self._archives
+        if not self.legacy:
+            for _, archive_info in self._infos():
+                yield archive_info["name"]
+        else:
+            yield from self._archives
 
     def get(self, name, raw=False):
         assert isinstance(name, str)
-        values = self._archives.get(name)
-        if values is None:
-            raise KeyError
-        if not raw:
-            ts = parse_timestamp(values["time"])
-            return ArchiveInfo(name=name, id=values["id"], ts=ts)
+        if not self.legacy:
+            try:
+                store_key, archive_info = self._lookup_name(name, raw=raw)
+                return archive_info
+            except KeyError:
+                return None
         else:
-            return dict(name=name, id=values["id"], time=values["time"])
+            values = self._archives.get(name)
+            if values is None:
+                return None
+            if not raw:
+                ts = parse_timestamp(values["time"])
+                return ArchiveInfo(name=name, id=values["id"], ts=ts)
+            else:
+                return dict(name=name, id=values["id"], time=values["time"])
 
     def create(self, name, id, ts, *, overwrite=False):
         assert isinstance(name, str)
@@ -173,14 +184,36 @@ class Archives:
         if isinstance(ts, datetime):
             ts = ts.isoformat(timespec="microseconds")
         assert isinstance(ts, str)
-        if name in self._archives and not overwrite:
-            raise KeyError("archive already exists")
-        self._archives[name] = {"id": id, "time": ts}
+        if not self.legacy:
+            try:
+                store_key, _ = self._lookup_name(name)
+            except KeyError:
+                pass
+            else:
+                # looks like we already have an archive list entry with that name
+                if not overwrite:
+                    raise KeyError("archive already exists")
+                else:
+                    self.repository.store_delete(f"archives/{store_key}")
+            archive = dict(name=name, id=id, time=ts)
+            value = self.manifest.key.pack_metadata(archive)
+            id = self.manifest.repo_objs.id_hash(value)
+            key = bin_to_hex(id)
+            value = self.manifest.repo_objs.format(id, {}, value, ro_type=ROBJ_MANIFEST)
+            self.repository.store_store(f"archives/{key}", value)
+        else:
+            if self.exists(name) and not overwrite:
+                raise KeyError("archive already exists")
+            self._archives[name] = {"id": id, "time": ts}
 
     def delete(self, name):
         # delete an archive
         assert isinstance(name, str)
-        self._archives.pop(name)
+        if not self.legacy:
+            store_key, archive_info = self._lookup_name(name)
+            self.repository.store_delete(f"archives/{store_key}")
+        else:
+            self._archives.pop(name)
 
     def list(
         self,
@@ -297,7 +330,7 @@ class Manifest:
     MANIFEST_ID = b"\0" * 32
 
     def __init__(self, key, repository, item_keys=None, ro_cls=RepoObj):
-        self.archives = Archives(repository)
+        self.archives = Archives(repository, self)
         self.config = {}
         self.key = key
         self.repo_objs = ro_cls(key)

+ 1 - 1
src/borg/testsuite/archiver/create_cmd.py

@@ -646,7 +646,7 @@ def test_create_dry_run(archivers, request):
     # Make sure no archive has been created
     with Repository(archiver.repository_path) as repository:
         manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
-    assert manifest.archives.count() == 0
+        assert manifest.archives.count() == 0
 
 
 def test_progress_on(archivers, request):

+ 3 - 3
src/borg/testsuite/archiver/rename_cmd.py

@@ -23,6 +23,6 @@ def test_rename(archivers, request):
     # Make sure both archives have been renamed
     with Repository(archiver.repository_path) as repository:
         manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
-    assert manifest.archives.count() == 2
-    assert manifest.archives.exists("test.3")
-    assert manifest.archives.exists("test.4")
+        assert manifest.archives.count() == 2
+        assert manifest.archives.exists("test.3")
+        assert manifest.archives.exists("test.4")