
fix upgrader

Thomas Waldmann, 2 years ago
commit 1e156ca02b

+ 7 - 8
src/borg/archive.py

@@ -370,7 +370,7 @@ class CacheChunkBuffer(ChunkBuffer):
         self.stats = stats
 
     def write_chunk(self, chunk):
-        id_, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats, wait=False)
+        id_, _ = self.cache.add_chunk(self.key.id_hash(chunk), {}, chunk, stats=self.stats, wait=False)
         self.cache.repository.async_response(wait=False)
         return id_
 
@@ -415,7 +415,7 @@ def archive_put_items(chunk_ids, *, repo_objs, cache=None, stats=None, add_refer
         data = msgpack.packb(chunk_ids[i : i + IDS_PER_CHUNK])
         id = repo_objs.id_hash(data)
         if cache is not None and stats is not None:
-            cache.add_chunk(id, data, stats)
+            cache.add_chunk(id, {}, data, stats=stats)
         elif add_reference is not None:
             cdata = repo_objs.format(id, {}, data)
             add_reference(id, len(data), cdata)
@@ -667,7 +667,7 @@ Duration: {0.duration}
         data = self.key.pack_and_authenticate_metadata(metadata.as_dict(), context=b"archive")
         self.id = self.repo_objs.id_hash(data)
         try:
-            self.cache.add_chunk(self.id, data, self.stats)
+            self.cache.add_chunk(self.id, {}, data, stats=self.stats)
         except IntegrityError as err:
             err_msg = str(err)
             # hack to avoid changing the RPC protocol by introducing new (more specific) exception class
@@ -967,7 +967,7 @@ Duration: {0.duration}
             del metadata.items
         data = msgpack.packb(metadata.as_dict())
         new_id = self.key.id_hash(data)
-        self.cache.add_chunk(new_id, data, self.stats)
+        self.cache.add_chunk(new_id, {}, data, stats=self.stats)
         self.manifest.archives[self.name] = (new_id, metadata.time)
         self.cache.chunk_decref(self.id, self.stats)
         self.id = new_id
@@ -1233,7 +1233,7 @@ class ChunksProcessor:
 
             def chunk_processor(chunk):
                 chunk_id, data = cached_hash(chunk, self.key.id_hash)
-                chunk_entry = cache.add_chunk(chunk_id, data, stats, wait=False)
+                chunk_entry = cache.add_chunk(chunk_id, {}, data, stats=stats, wait=False)
                 self.cache.repository.async_response(wait=False)
                 return chunk_entry
 
@@ -2269,8 +2269,7 @@ class ArchiveRecreater:
         overwrite = self.recompress
         if self.recompress and not self.always_recompress and chunk_id in self.cache.chunks:
             # Check if this chunk is already compressed the way we want it
-            old_meta, old_data = self.repo_objs.parse(chunk_id, self.repository.get(chunk_id), decompress=False)
-            # TODO simplify code below
+            old_meta = self.repo_objs.parse_meta(chunk_id, self.repository.get(chunk_id))
             compr_hdr = bytes((old_meta["ctype"], old_meta["clevel"]))
             compressor_cls, level = Compressor.detect(compr_hdr)
             if (
@@ -2279,7 +2278,7 @@ class ArchiveRecreater:
             ):
                 # Stored chunk has the same compression method and level as we wanted
                 overwrite = False
-        chunk_entry = self.cache.add_chunk(chunk_id, data, target.stats, overwrite=overwrite, wait=False)
+        chunk_entry = self.cache.add_chunk(chunk_id, {}, data, stats=target.stats, overwrite=overwrite, wait=False)
         self.cache.repository.async_response(wait=False)
         self.seen_chunks.add(chunk_entry.id)
         return chunk_entry
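
All archive.py call sites now pass a metadata dict positionally and `stats` as a keyword; the recreate path additionally switches to `parse_meta()`, which reads `ctype`/`clevel` without decompressing the payload. A minimal sketch of the new calling convention (a hedged fragment, not a full program; `cache`, `chunk_id`, `data` and `stats` as in the hunks above):

    # the empty dict asks repo_objs.format() to compute the object metadata
    # itself (assumed behavior); stats is keyword-only now
    entry = cache.add_chunk(chunk_id, {}, data, stats=stats, wait=False)
    cache.repository.async_response(wait=False)  # collect results of queued repo writes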

+ 3 - 2
src/borg/archiver/transfer_cmd.py

@@ -70,9 +70,10 @@ class TransferMixIn:
                                     cdata = other_repository.get(chunk_id)
                                     # keep compressed payload same, avoid decompression / recompression
                                     meta, data = other_manifest.repo_objs.parse(chunk_id, cdata, decompress=False)
-                                    data = upgrader.upgrade_compressed_chunk(chunk=data)
+                                    meta, data = upgrader.upgrade_compressed_chunk(meta=meta, data=data)
+
                                     chunk_entry = cache.add_chunk(
-                                        chunk_id, data, archive.stats, wait=False, compress=False, size=size
+                                        chunk_id, meta, data, stats=archive.stats, wait=False, compress=False, size=size
                                     )
                                     cache.repository.async_response(wait=False)
                                     chunks.append(chunk_entry)
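
With the new API the upgrader returns both halves: `data` stays compressed, while `meta` gains the entries that `repo_objs.format()` needs for the borg2 object header. A hedged fragment continuing the hunk above:

    meta, data = upgrader.upgrade_compressed_chunk(meta=meta, data=data)
    # after the call, the upgrader has filled in the compression metadata
    # (see the upgrade.py hunk below, which sets exactly these keys)
    assert {"ctype", "clevel", "csize"} <= meta.keys()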

+ 8 - 8
src/borg/cache.py

@@ -943,18 +943,18 @@ class LocalCache(CacheStatsMixin):
         self.cache_config.ignored_features.update(repo_features - my_features)
         self.cache_config.mandatory_features.update(repo_features & my_features)
 
-    def add_chunk(self, id, chunk, stats, *, overwrite=False, wait=True, compress=True, size=None):
+    def add_chunk(self, id, meta, data, *, stats, overwrite=False, wait=True, compress=True, size=None):
         if not self.txn_active:
             self.begin_txn()
         if size is None and compress:
-            size = len(chunk)  # chunk is still uncompressed
+            size = len(data)  # data is still uncompressed
         refcount = self.seen_chunk(id, size)
         if refcount and not overwrite:
             return self.chunk_incref(id, stats)
         if size is None:
             raise ValueError("when giving compressed data for a new chunk, the uncompressed size must be given also")
-        data = self.repo_objs.format(id, {}, chunk, compress=compress, size=size)
-        self.repository.put(id, data, wait=wait)
+        cdata = self.repo_objs.format(id, meta, data, compress=compress, size=size)
+        self.repository.put(id, cdata, wait=wait)
         self.chunks.add(id, 1, size)
         stats.update(size, not refcount)
         return ChunkListEntry(id, size)
@@ -1115,19 +1115,19 @@ Chunk index:    {0.total_unique_chunks:20d}             unknown"""
     def memorize_file(self, hashed_path, path_hash, st, ids):
         pass
 
-    def add_chunk(self, id, chunk, stats, *, overwrite=False, wait=True, compress=True, size=None):
+    def add_chunk(self, id, meta, data, *, stats, overwrite=False, wait=True, compress=True, size=None):
         assert not overwrite, "AdHocCache does not permit overwrites — trying to use it for recreate?"
         if not self._txn_active:
             self.begin_txn()
         if size is None and compress:
-            size = len(chunk)  # chunk is still uncompressed
+            size = len(data)  # data is still uncompressed
         if size is None:
             raise ValueError("when giving compressed data for a chunk, the uncompressed size must be given also")
         refcount = self.seen_chunk(id, size)
         if refcount:
             return self.chunk_incref(id, stats, size=size)
-        data = self.repo_objs.format(id, {}, chunk, compress=compress)
-        self.repository.put(id, data, wait=wait)
+        cdata = self.repo_objs.format(id, meta, data, compress=compress)
+        self.repository.put(id, cdata, wait=wait)
         self.chunks.add(id, 1, size)
         stats.update(size, not refcount)
         return ChunkListEntry(id, size)
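
Both cache implementations now share one contract: `meta` is passed through to `repo_objs.format()`, and the uncompressed `size` is mandatory whenever the data arrives pre-compressed, since `len(data)` is then the compressed length. A hedged usage sketch (all names illustrative):

    # fresh, uncompressed data: size is derived from the payload itself
    cache.add_chunk(id_a, {}, plain_data, stats=stats)

    # pre-compressed data, e.g. during borg transfer: uncompressed size required
    cache.add_chunk(id_b, meta, compressed_data, stats=stats, compress=False, size=uncompressed_size)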

+ 4 - 1
src/borg/repoobj.py

@@ -92,10 +92,13 @@ class RepoObj:
         if decompress:
             ctype = meta["ctype"]
             clevel = meta["clevel"]
+            csize = meta["csize"]  # for obfuscation purposes, data_compressed may be longer than csize
             compr_hdr = bytes((ctype, clevel))
             compressor_cls, compression_level = Compressor.detect(compr_hdr)
             compressor = compressor_cls(level=compression_level)
-            data = compressor.decompress(compr_hdr + data_compressed)  # TODO: decompressor still needs type/level bytes
+            data = compressor.decompress(
+                compr_hdr + data_compressed[:csize]
+            )  # TODO: decompressor still needs type/level bytes
             self.key.assert_id(id, data)
         else:
             data = data_compressed  # does not include the type/level bytes
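
Why the `[:csize]` slice matters, as a self-contained sketch; zlib stands in for whatever compressor wrote the object, and the zero padding mimics what ObfuscateSize appends past `csize`:

    import zlib

    plain = b"some file data " * 4
    comp = zlib.compress(plain)
    csize = len(comp)
    stored = comp + bytes(16)  # obfuscated objects carry padding beyond csize
    assert zlib.decompress(stored[:csize]) == plain  # slice before decompressing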

+ 3 - 3
src/borg/testsuite/archive.py

@@ -101,9 +101,9 @@ class MockCache:
         self.objects = {}
         self.repository = self.MockRepo()
 
-    def add_chunk(self, id, chunk, stats=None, wait=True):
-        self.objects[id] = chunk
-        return id, len(chunk)
+    def add_chunk(self, id, meta, data, stats=None, wait=True):
+        self.objects[id] = data
+        return id, len(data)
 
 
 class ArchiveTimestampTestCase(BaseTestCase):

+ 2 - 1
src/borg/testsuite/archiver.py

@@ -4014,7 +4014,8 @@ class ArchiverCheckTestCase(ArchiverTestCaseBase):
             for item in archive.iter_items():
                 if item.path.endswith("testsuite/archiver.py"):
                     chunk = item.chunks[-1]
-                    data = repository.get(chunk.id) + b"1234"
+                    data = repository.get(chunk.id)
+                    data = data[0:100] + b"x" + data[101:]
                     repository.put(chunk.id, data)
                     break
             repository.commit(compact=False)
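
Presumably the appended bytes would now land past `csize` and be sliced away during parsing, so they no longer produce a detectable corruption; the test has to flip a byte inside the object instead. The same effect, sketched with zlib as a stand-in:

    import zlib

    plain = b"testsuite/archiver.py " * 4
    comp = zlib.compress(plain)
    csize = len(comp)
    appended = comp + b"1234"  # old-style corruption: beyond csize, sliced off, unnoticed
    assert zlib.decompress(appended[:csize]) == plain
    flipped = comp[:10] + bytes([comp[10] ^ 0xFF]) + comp[11:]  # new-style: inside the payload
    # decompressing `flipped` now raises zlib.error or yields different data,
    # which is what integrity checking relies on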

+ 5 - 5
src/borg/testsuite/cache.py

@@ -187,14 +187,14 @@ class TestAdHocCache:
 
     def test_does_not_overwrite(self, cache):
         with pytest.raises(AssertionError):
-            cache.add_chunk(H(1), b"5678", Statistics(), overwrite=True)
+            cache.add_chunk(H(1), {}, b"5678", stats=Statistics(), overwrite=True)
 
     def test_seen_chunk_add_chunk_size(self, cache):
-        assert cache.add_chunk(H(1), b"5678", Statistics()) == (H(1), 4)
+        assert cache.add_chunk(H(1), {}, b"5678", stats=Statistics()) == (H(1), 4)
 
     def test_deletes_chunks_during_lifetime(self, cache, repository):
         """E.g. checkpoint archives"""
-        cache.add_chunk(H(5), b"1010", Statistics())
+        cache.add_chunk(H(5), {}, b"1010", stats=Statistics())
         assert cache.seen_chunk(H(5)) == 1
         cache.chunk_decref(H(5), Statistics())
         assert not cache.seen_chunk(H(5))
@@ -216,10 +216,10 @@ class TestAdHocCache:
         assert not hasattr(cache, "chunks")
 
     def test_incref_after_add_chunk(self, cache):
-        assert cache.add_chunk(H(3), b"5678", Statistics()) == (H(3), 4)
+        assert cache.add_chunk(H(3), {}, b"5678", stats=Statistics()) == (H(3), 4)
         assert cache.chunk_incref(H(3), Statistics()) == (H(3), 4)
 
     def test_existing_incref_after_add_chunk(self, cache):
         """This case occurs with part files, see Archive.chunk_file."""
-        assert cache.add_chunk(H(1), b"5678", Statistics()) == (H(1), 4)
+        assert cache.add_chunk(H(1), {}, b"5678", stats=Statistics()) == (H(1), 4)
         assert cache.chunk_incref(H(1), Statistics()) == (H(1), 4)

+ 26 - 23
src/borg/upgrade.py

@@ -19,8 +19,8 @@ class UpgraderNoOp:
     def upgrade_item(self, *, item):
         return item
 
-    def upgrade_compressed_chunk(self, *, chunk):
-        return chunk
+    def upgrade_compressed_chunk(self, *, meta, data):
+        return meta, data
 
     def upgrade_archive_metadata(self, *, metadata):
         new_metadata = {}
@@ -98,33 +98,36 @@ class UpgraderFrom12To20:
         assert all(key in new_item for key in REQUIRED_ITEM_KEYS)
         return new_item
 
-    def upgrade_compressed_chunk(self, *, chunk):
-        def upgrade_zlib_and_level(chunk):
-            if ZLIB_legacy.detect(chunk):
-                ctype = ZLIB.ID
-                chunk = ctype + level + bytes(chunk)  # get rid of the legacy: prepend separate type/level bytes
+    def upgrade_compressed_chunk(self, *, meta, data):
+        # meta/data was parsed via RepoObj1.parse, which returns data **including** the ctype/clevel bytes prefixed
+        def upgrade_zlib_and_level(meta, data):
+            if ZLIB_legacy.detect(data):
+                ctype = ZLIB.ID[0]
+                data = bytes(data)  # ZLIB_legacy has no ctype/clevel prefix
             else:
-                ctype = bytes(chunk[0:1])
-                chunk = ctype + level + bytes(chunk[2:])  # keep type same, but set level
-            return chunk
+                ctype = data[0]
+                data = bytes(data[2:])  # strip ctype/clevel bytes
+            meta["ctype"] = ctype
+            meta["clevel"] = level
+            meta["csize"] = len(data)  # we may have stripped some prefixed ctype/clevel bytes
+            return meta, data
 
-        ctype = chunk[0:1]
-        level = b"\xFF"  # FF means unknown compression level
+        ctype = data[0]
+        level = 0xFF  # means unknown compression level
 
         if ctype == ObfuscateSize.ID[0]:
             # in older borg, we used unusual byte order
-            old_header_fmt = Struct(">I")
-            new_header_fmt = ObfuscateSize.header_fmt
-            length = ObfuscateSize.header_len
-            size_bytes = chunk[2 : 2 + length]
-            size = old_header_fmt.unpack(size_bytes)
-            size_bytes = new_header_fmt.pack(size)
-            compressed = chunk[2 + length :]
-            compressed = upgrade_zlib_and_level(compressed)
-            chunk = ctype + level + size_bytes + compressed
+            borg1_header_fmt = Struct(">I")
+            hlen = borg1_header_fmt.size
+            csize_bytes = data[2 : 2 + hlen]
+            (csize,) = borg1_header_fmt.unpack(csize_bytes)
+            compressed = data[2 + hlen : 2 + hlen + csize]
+            meta, compressed = upgrade_zlib_and_level(meta, compressed)
+            osize = len(data) - 2 - hlen - csize  # amount of 0x00 bytes appended for obfuscation
+            data = compressed + bytes(osize)
         else:
-            chunk = upgrade_zlib_and_level(chunk)
-        return chunk
+            meta, data = upgrade_zlib_and_level(meta, data)
+        return meta, data
 
     def upgrade_archive_metadata(self, *, metadata):
         new_metadata = {}
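
A hedged sketch of the borg1 object layout that the ObfuscateSize branch above unpacks. The layout is inferred from the parsing code; all byte values are made up:

    from struct import Struct

    borg1_header_fmt = Struct(">I")  # 4-byte big-endian csize, as above
    payload = b"\x05\x01" + b"fake-compressed-bytes"  # hypothetical inner ctype/clevel + body
    obfuscated = b"\x04\xff" + borg1_header_fmt.pack(len(payload)) + payload + bytes(7)

    hlen = borg1_header_fmt.size
    (csize,) = borg1_header_fmt.unpack(obfuscated[2 : 2 + hlen])
    compressed = obfuscated[2 + hlen : 2 + hlen + csize]
    osize = len(obfuscated) - 2 - hlen - csize  # the appended 0x00 padding
    assert compressed == payload and osize == 7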