Parcourir la source

integrate Chunk type, avoid hashing holes

Thomas Waldmann il y a 4 ans
Parent
commit
52bd55b29a
1 fichiers modifiés avec 30 ajouts et 8 suppressions
  1. 30 8
      src/borg/archive.py

+ 30 - 8
src/borg/archive.py

@@ -19,7 +19,7 @@ from .logger import create_logger
 logger = create_logger()
 
 from . import xattr
-from .chunker import get_chunker, max_chunk_size
+from .chunker import get_chunker, max_chunk_size, Chunk
 from .cache import ChunkListEntry
 from .crypto.key import key_factory
 from .compress import Compressor, CompressionSpec
@@ -43,6 +43,7 @@ from .helpers import msgpack
 from .helpers import sig_int
 from .patterns import PathPrefixPattern, FnmatchPattern, IECommand
 from .item import Item, ArchiveItem, ItemDiff
+from .lrucache import LRUCache
 from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname
 from .remote import cache_if_remote
 from .repository import Repository, LIST_SCAN_LIMIT
@@ -336,7 +337,9 @@ class ChunkBuffer:
         self.buffer.seek(0)
         # The chunker returns a memoryview to its internal buffer,
         # thus a copy is needed before resuming the chunker iterator.
-        chunks = list(bytes(s) for s in self.chunker.chunkify(self.buffer))
+        # note: this is the items metadata stream chunker, we only will get CH_DATA allocation here,
+        #       thus chunk.data will always be data bytes.
+        chunks = list(bytes(chunk.data) for chunk in self.chunker.chunkify(self.buffer))
         self.buffer.seek(0)
         self.buffer.truncate(0)
         # Leave the last partial chunk in the buffer unless flush is True
@@ -1102,6 +1105,8 @@ class ChunksProcessor:
         self.checkpoint_interval = checkpoint_interval
         self.last_checkpoint = time.monotonic()
         self.rechunkify = rechunkify
+        self.zero_chunk_ids = LRUCache(10, dispose=lambda _: None)  # length of all-zero chunk -> chunk_id
+        self.zeros = memoryview(bytes(MAX_DATA_SIZE))
 
     def write_part_file(self, item, from_chunk, number):
         item = Item(internal_dict=item.as_dict())
@@ -1133,8 +1138,22 @@ class ChunksProcessor:
 
     def process_file_chunks(self, item, cache, stats, show_progress, chunk_iter, chunk_processor=None):
         if not chunk_processor:
-            def chunk_processor(data):
-                chunk_entry = cache.add_chunk(self.key.id_hash(data), data, stats, wait=False)
+            def chunk_processor(chunk):
+                allocation = chunk.meta['allocation']
+                if allocation == CH_DATA:
+                    data = chunk.data
+                    chunk_id = self.key.id_hash(data)
+                elif allocation == CH_HOLE:
+                    size = chunk.meta['size']
+                    data = self.zeros[:size]
+                    try:
+                        chunk_id = self.zero_chunk_ids[size]
+                    except KeyError:
+                        chunk_id = self.key.id_hash(data)
+                        self.zero_chunk_ids[size] = chunk_id
+                else:
+                    raise ValueError('unexpected allocation type')
+                chunk_entry = cache.add_chunk(chunk_id, data, stats, wait=False)
                 self.cache.repository.async_response(wait=False)
                 return chunk_entry
 
@@ -1145,8 +1164,8 @@ class ChunksProcessor:
             del item.chunks_healthy
         from_chunk = 0
         part_number = 1
-        for data in chunk_iter:
-            item.chunks.append(chunk_processor(data))
+        for chunk in chunk_iter:
+            item.chunks.append(chunk_processor(chunk))
             if show_progress:
                 stats.show_progress(item=item, dt=0.2)
             from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=False)
@@ -1982,7 +2001,10 @@ class ArchiveRecreater:
         chunk_processor = partial(self.chunk_processor, target)
         target.process_file_chunks(item, self.cache, target.stats, self.progress, chunk_iterator, chunk_processor)
 
-    def chunk_processor(self, target, data):
+    def chunk_processor(self, target, chunk):
+        # as this is recreate (we do not read from the fs), we never have holes here
+        assert chunk.meta['allocation'] == CH_DATA
+        data = chunk.data
         chunk_id = self.key.id_hash(data)
         if chunk_id in self.seen_chunks:
             return self.cache.chunk_incref(chunk_id, target.stats)
@@ -2007,7 +2029,7 @@ class ArchiveRecreater:
             yield from target.chunker.chunkify(file)
         else:
             for chunk in chunk_iterator:
-                yield chunk
+                yield Chunk(chunk, size=len(chunk), allocation=CH_DATA)
 
     def save(self, archive, target, comment=None, replace_original=True):
         if self.dry_run: