浏览代码

remove part files from final archive

checkpoint archives might have a single, incomplete part file as last item.
part files are always a prefix of the full file, growing in size from
checkpoint to checkpoint.

we now manage the archive items metadata stream in a special way:
- checkpoint archive A(n) might end with a partial item PI(n)
- checkpoint archive A(n+1) does not contain PI(n)
- checkpoint archive A(n+1) contains a new partial item PI(n+1)
- the final archive does not contain any partial items
Thomas Waldmann 2 年之前
父节点
当前提交
0fed44110a
共有 4 个文件被更改,包括 58 次插入50 次删除
  1. 53 29
      src/borg/archive.py
  2. 1 0
      src/borg/archiver/create_cmd.py
  3. 1 0
      src/borg/archiver/tar_cmds.py
  4. 3 21
      src/borg/testsuite/archiver/create_cmd.py

+ 53 - 29
src/borg/archive.py

@@ -353,6 +353,7 @@ class ChunkBuffer:
         self.chunks = []
         self.key = key
         self.chunker = get_chunker(*chunker_params, seed=self.key.chunk_seed, sparse=False)
+        self.saved_chunks_len = None
 
     def add(self, item):
         self.buffer.write(self.packer.pack(item.as_dict()))
@@ -392,6 +393,18 @@ class ChunkBuffer:
     def is_full(self):
         return self.buffer.tell() > self.BUFFER_SIZE
 
+    def save_chunks_state(self):
+        # as we only append to self.chunks, remembering the current length is good enough
+        self.saved_chunks_len = len(self.chunks)
+
+    def restore_chunks_state(self):
+        scl = self.saved_chunks_len
+        assert scl is not None, "forgot to call save_chunks_state?"
+        tail_chunks = self.chunks[scl:]
+        del self.chunks[scl:]
+        self.saved_chunks_len = None
+        return tail_chunks
+
 
 class CacheChunkBuffer(ChunkBuffer):
     def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
@@ -649,6 +662,15 @@ Duration: {0.duration}
             stats.show_progress(item=item, dt=0.2)
         self.items_buffer.add(item)
 
+    def prepare_checkpoint(self):
+        # we need to flush the archive metadata stream to repo chunks, so that
+        # we have the metadata stream chunks WITHOUT the part file item we add later.
+        # The part file item will then get into its own metadata stream chunk, which we
+        # can easily NOT include into the next checkpoint or the final archive.
+        self.items_buffer.flush(flush=True)
+        # remember the current state of self.chunks, which corresponds to the flushed chunks
+        self.items_buffer.save_chunks_state()
+
     def write_checkpoint(self):
         metadata = self.save(self.checkpoint_name)
         # that .save() has committed the repo.
@@ -660,6 +682,11 @@ Duration: {0.duration}
         self.cache.chunk_decref(self.id, self.stats)
         for id in metadata.item_ptrs:
             self.cache.chunk_decref(id, self.stats)
+        # also get rid of that part item, we do not want to have it in next checkpoint or final archive
+        tail_chunks = self.items_buffer.restore_chunks_state()
+        # tail_chunks contain the tail of the archive items metadata stream, not needed for next commit.
+        for id in tail_chunks:
+            self.cache.chunk_decref(id, self.stats)
 
     def save(self, name=None, comment=None, timestamp=None, stats=None, additional_metadata=None):
         name = name or self.name
@@ -1234,10 +1261,22 @@ def cached_hash(chunk, id_hash):
 class ChunksProcessor:
     # Processes an iterator of chunks for an Item
 
-    def __init__(self, *, key, cache, add_item, write_checkpoint, checkpoint_interval, checkpoint_volume, rechunkify):
+    def __init__(
+        self,
+        *,
+        key,
+        cache,
+        add_item,
+        prepare_checkpoint,
+        write_checkpoint,
+        checkpoint_interval,
+        checkpoint_volume,
+        rechunkify,
+    ):
         self.key = key
         self.cache = cache
         self.add_item = add_item
+        self.prepare_checkpoint = prepare_checkpoint
         self.write_checkpoint = write_checkpoint
         self.rechunkify = rechunkify
         # time interval based checkpointing
@@ -1248,38 +1287,35 @@ class ChunksProcessor:
         self.current_volume = 0
         self.last_volume_checkpoint = 0
 
-    def write_part_file(self, item, from_chunk, number):
+    def write_part_file(self, item):
+        self.prepare_checkpoint()
         item = Item(internal_dict=item.as_dict())
-        length = len(item.chunks)
-        # the item should only have the *additional* chunks we processed after the last partial item:
-        item.chunks = item.chunks[from_chunk:]
         # for borg recreate, we already have a size member in the source item (giving the total file size),
         # but we consider only a part of the file here, thus we must recompute the size from the chunks:
         item.get_size(memorize=True, from_chunks=True)
-        item.path += ".borg_part_%d" % number
-        item.part = number
-        number += 1
+        item.path += ".borg_part"
+        item.part = 1  # used to be an increasing number, but now just always 1 IF this is a partial file
         self.add_item(item, show_progress=False)
         self.write_checkpoint()
-        return length, number
 
-    def maybe_checkpoint(self, item, from_chunk, part_number, forced=False):
+    def maybe_checkpoint(self, item):
+        checkpoint_done = False
         sig_int_triggered = sig_int and sig_int.action_triggered()
         if (
-            forced
-            or sig_int_triggered
+            sig_int_triggered
             or (self.checkpoint_interval and time.monotonic() - self.last_checkpoint > self.checkpoint_interval)
             or (self.checkpoint_volume and self.current_volume - self.last_volume_checkpoint >= self.checkpoint_volume)
         ):
             if sig_int_triggered:
                 logger.info("checkpoint requested: starting checkpoint creation...")
-            from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
+            self.write_part_file(item)
+            checkpoint_done = True
             self.last_checkpoint = time.monotonic()
             self.last_volume_checkpoint = self.current_volume
             if sig_int_triggered:
                 sig_int.action_completed()
                 logger.info("checkpoint requested: finished checkpoint creation!")
-        return from_chunk, part_number
+        return checkpoint_done  # whether a checkpoint archive was created
 
     def process_file_chunks(self, item, cache, stats, show_progress, chunk_iter, chunk_processor=None):
         if not chunk_processor:
@@ -1297,28 +1333,15 @@ class ChunksProcessor:
         # to get rid of .chunks_healthy, as it might not correspond to .chunks any more.
         if self.rechunkify and "chunks_healthy" in item:
             del item.chunks_healthy
-        from_chunk = 0
-        part_number = 1
         for chunk in chunk_iter:
             cle = chunk_processor(chunk)
             item.chunks.append(cle)
             self.current_volume += cle[1]
             if show_progress:
                 stats.show_progress(item=item, dt=0.2)
-            from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=False)
+            self.maybe_checkpoint(item)
         else:
-            if part_number > 1:
-                if item.chunks[from_chunk:]:
-                    # if we already have created a part item inside this file, we want to put the final
-                    # chunks (if any) into a part item also (so all parts can be concatenated to get
-                    # the complete file):
-                    from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=True)
-
-                # if we created part files, we have referenced all chunks from the part files,
-                # but we also will reference the same chunks also from the final, complete file:
-                for chunk in item.chunks:
-                    cache.chunk_incref(chunk.id, stats, size=chunk.size, part=True)
-                stats.nfiles_parts += part_number - 1
+            stats.nfiles_parts += 0  # TODO: remove tracking of this
 
 
 class FilesystemObjectProcessors:
@@ -2474,6 +2497,7 @@ class ArchiveRecreater:
             cache=self.cache,
             key=self.key,
             add_item=target.add_item,
+            prepare_checkpoint=target.prepare_checkpoint,
             write_checkpoint=target.write_checkpoint,
             checkpoint_interval=self.checkpoint_interval,
             checkpoint_volume=self.checkpoint_volume,

+ 1 - 0
src/borg/archiver/create_cmd.py

@@ -255,6 +255,7 @@ class CreateMixIn:
                     cache=cache,
                     key=key,
                     add_item=archive.add_item,
+                    prepare_checkpoint=archive.prepare_checkpoint,
                     write_checkpoint=archive.write_checkpoint,
                     checkpoint_interval=args.checkpoint_interval,
                     checkpoint_volume=args.checkpoint_volume,

+ 1 - 0
src/borg/archiver/tar_cmds.py

@@ -271,6 +271,7 @@ class TarMixIn:
             cache=cache,
             key=key,
             add_item=archive.add_item,
+            prepare_checkpoint=archive.prepare_checkpoint,
             write_checkpoint=archive.write_checkpoint,
             checkpoint_interval=args.checkpoint_interval,
             checkpoint_volume=args.checkpoint_volume,

+ 3 - 21
src/borg/testsuite/archiver/create_cmd.py

@@ -182,27 +182,9 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         )
         # repo looking good overall? checks for rc == 0.
         self.cmd(f"--repo={self.repository_location}", "check", "--debug")
-        # verify part files
-        out = self.cmd(
-            f"--repo={self.repository_location}",
-            "extract",
-            "test",
-            "stdin.borg_part_1",
-            "--consider-part-files",
-            "--stdout",
-            binary_output=True,
-        )
-        assert out == input_data[:chunk_size]
-        out = self.cmd(
-            f"--repo={self.repository_location}",
-            "extract",
-            "test",
-            "stdin.borg_part_2",
-            "--consider-part-files",
-            "--stdout",
-            binary_output=True,
-        )
-        assert out == input_data[: chunk_size - 1]
+        # verify that there are no part files in final archive
+        out = self.cmd(f"--repo={self.repository_location}", "list", "test", "--consider-part-files")
+        assert "stdin.borg_part" not in out
         # verify full file
         out = self.cmd(f"--repo={self.repository_location}", "extract", "test", "stdin", "--stdout", binary_output=True)
         assert out == input_data