refactor: create transfer_chunks function

Thomas Waldmann 1 month ago
commit fee5600481
1 changed file with 66 additions and 59 deletions

+ 66 - 59
src/borg/archiver/transfer_cmd.py
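
In short (a condensed sketch, not part of the commit): the per-file chunk loop moves out of TransferMixIn into a module-level transfer_chunks() function; args.recompress and dry_run become plain parameters, and the rebuilt chunks list plus the transferred/present byte tallies become return values that the caller folds into its running totals:

def transfer_chunks(upgrader, other_repository, other_manifest, other_chunks,
                    archive, cache, recompress, dry_run):
    chunks, transfer, present = [], 0, 0
    # ... per-chunk transfer/reuse logic, unchanged from the old inline loop ...
    return chunks, transfer, present

# caller's accounting, as in the second hunk (placeholder arguments, illustrative only):
transfer_size = present_size = 0
chunks, transfer, present = transfer_chunks(
    None, None, None, [], None, None, "never", dry_run=True
)
transfer_size += transfer
present_size += present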

@@ -17,6 +17,60 @@ from ..logger import create_logger
 logger = create_logger()
 
 
+def transfer_chunks(upgrader, other_repository, other_manifest, other_chunks, archive, cache, recompress, dry_run):
+    transfer = 0
+    present = 0
+    chunks = []
+    for chunk_id, size in other_chunks:
+        chunk_present = cache.seen_chunk(chunk_id, size)
+        if not chunk_present:  # target repo does not yet have this chunk
+            if not dry_run:
+                try:
+                    cdata = other_repository.get(chunk_id)
+                except (Repository.ObjectNotFound, LegacyRepository.ObjectNotFound):
+                    # missing correct chunk in other_repository (source) will result in
+                    # a missing chunk in repository (destination).
+                    # we do NOT want to transfer all-zero replacement chunks from borg1 repos.
+                    pass
+                else:
+                    if recompress == "never":
+                        # keep compressed payload same, verify via assert_id (that will
+                        # decompress, but avoid needing to compress it again):
+                        meta, data = other_manifest.repo_objs.parse(
+                            chunk_id, cdata, decompress=True, want_compressed=True, ro_type=ROBJ_FILE_STREAM
+                        )
+                        meta, data = upgrader.upgrade_compressed_chunk(meta, data)
+                        chunk_entry = cache.add_chunk(
+                            chunk_id,
+                            meta,
+                            data,
+                            stats=archive.stats,
+                            wait=False,
+                            compress=False,
+                            size=size,
+                            ctype=meta["ctype"],
+                            clevel=meta["clevel"],
+                            ro_type=ROBJ_FILE_STREAM,
+                        )
+                    elif recompress == "always":
+                        # always decompress and re-compress file data chunks
+                        meta, data = other_manifest.repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_FILE_STREAM)
+                        chunk_entry = cache.add_chunk(
+                            chunk_id, meta, data, stats=archive.stats, wait=False, ro_type=ROBJ_FILE_STREAM
+                        )
+                    else:
+                        raise ValueError(f"unsupported recompress mode: {recompress}")
+                    cache.repository.async_response(wait=False)
+                    chunks.append(chunk_entry)
+            transfer += size
+        else:
+            if not dry_run:
+                chunk_entry = cache.reuse_chunk(chunk_id, size, archive.stats)
+                chunks.append(chunk_entry)
+            present += size
+    return chunks, transfer, present
+
+
 class TransferMixIn:
     @with_other_repository(manifest=True, compatibility=(Manifest.Operation.READ,))
     @with_repository(manifest=True, cache=True, compatibility=(Manifest.Operation.WRITE,))
@@ -120,68 +174,21 @@ class TransferMixIn:
                     else:
                         other_chunks = None
                     if other_chunks is not None:
-                        chunks = []
-                        for chunk_id, size in other_chunks:
-                            chunk_present = cache.seen_chunk(chunk_id, size)
-                            if not chunk_present:  # target repo does not yet have this chunk
-                                if not dry_run:
-                                    try:
-                                        cdata = other_repository.get(chunk_id)
-                                    except (Repository.ObjectNotFound, LegacyRepository.ObjectNotFound):
-                                        # missing correct chunk in other_repository (source) will result in
-                                        # a missing chunk in repository (destination).
-                                        # we do NOT want to transfer all-zero replacement chunks from borg1 repos.
-                                        pass
-                                    else:
-                                        if args.recompress == "never":
-                                            # keep compressed payload same, verify via assert_id (that will
-                                            # decompress, but avoid needing to compress it again):
-                                            meta, data = other_manifest.repo_objs.parse(
-                                                chunk_id,
-                                                cdata,
-                                                decompress=True,
-                                                want_compressed=True,
-                                                ro_type=ROBJ_FILE_STREAM,
-                                            )
-                                            meta, data = upgrader.upgrade_compressed_chunk(meta, data)
-                                            chunk_entry = cache.add_chunk(
-                                                chunk_id,
-                                                meta,
-                                                data,
-                                                stats=archive.stats,
-                                                wait=False,
-                                                compress=False,
-                                                size=size,
-                                                ctype=meta["ctype"],
-                                                clevel=meta["clevel"],
-                                                ro_type=ROBJ_FILE_STREAM,
-                                            )
-                                        elif args.recompress == "always":
-                                            # always decompress and re-compress file data chunks
-                                            meta, data = other_manifest.repo_objs.parse(
-                                                chunk_id, cdata, ro_type=ROBJ_FILE_STREAM
-                                            )
-                                            chunk_entry = cache.add_chunk(
-                                                chunk_id,
-                                                meta,
-                                                data,
-                                                stats=archive.stats,
-                                                wait=False,
-                                                ro_type=ROBJ_FILE_STREAM,
-                                            )
-                                        else:
-                                            raise ValueError(f"unsupported recompress mode: {args.recompress}")
-                                    cache.repository.async_response(wait=False)
-                                    chunks.append(chunk_entry)
-                                transfer_size += size
-                            else:
-                                if not dry_run:
-                                    chunk_entry = cache.reuse_chunk(chunk_id, size, archive.stats)
-                                    chunks.append(chunk_entry)
-                                present_size += size
+                        chunks, transfer, present = transfer_chunks(
+                            upgrader,
+                            other_repository,
+                            other_manifest,
+                            other_chunks,
+                            archive,
+                            cache,
+                            args.recompress,
+                            dry_run,
+                        )
                         if not dry_run:
                             item.chunks = chunks
                             archive.stats.nfiles += 1
+                        transfer_size += transfer
+                        present_size += present
                     if not dry_run:
                         item = upgrader.upgrade_item(item=item)
                         archive.add_item(item, show_progress=args.progress)
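
A side benefit of the extraction is testability: when dry_run is true, transfer_chunks() touches nothing but cache.seen_chunk() and the size tallies, so the accounting can be exercised without a repository. A minimal sketch, assuming borg is importable and the function stays module-level; StubCache is hypothetical test scaffolding, not borg API:

from borg.archiver.transfer_cmd import transfer_chunks

class StubCache:
    # hypothetical stand-in: only seen_chunk() is consulted when dry_run=True
    def __init__(self, present_ids):
        self.present_ids = present_ids

    def seen_chunk(self, chunk_id, size):
        # truthy iff the destination repo already has this chunk
        return chunk_id in self.present_ids

other_chunks = [(b"c1", 100), (b"c2", 200), (b"c3", 300)]
cache = StubCache(present_ids={b"c2"})

# upgrader/other_repository/other_manifest/archive are never touched in a dry run
chunks, transfer, present = transfer_chunks(
    None, None, None, other_chunks, None, cache, "never", dry_run=True
)
assert chunks == []      # nothing is rebuilt in a dry run
assert transfer == 400   # c1 + c3 would need transferring
assert present == 200    # c2 is already present in the destination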