
Merge pull request #8803 from ThomasWaldmann/transfer-rechunk

transfer with re-chunking
TW, 2 weeks ago
commit a764abd7b0
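The gist of the change: ``borg transfer`` gains a ``--chunker-params`` option; when given, file
content is re-chunked with the new parameters while it is copied, instead of transferring the
source chunks as-is. A minimal usage sketch (the REPO paths are placeholders; the fixed,4096
spec is the one exercised by the new test below):

    # copy archives from SRC_REPO into DST_REPO, re-chunking to fixed 4 KiB blocks
    borg --repo=DST_REPO transfer --other-repo=SRC_REPO --chunker-params=fixed,4096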

src/borg/archiver/transfer_cmd.py  (+130 -61)

@@ -1,13 +1,15 @@
 import argparse
 
 from ._common import with_repository, with_other_repository, Highlander
-from ..archive import Archive
+from ..archive import Archive, cached_hash, DownloadPipeline
+from ..chunker import get_chunker
 from ..compress import CompressionSpec
 from ..constants import *  # NOQA
 from ..crypto.key import uses_same_id_hash, uses_same_chunker_secret
 from ..helpers import Error
 from ..helpers import location_validator, Location, archivename_validator, comment_validator
 from ..helpers import format_file_size, bin_to_hex
+from ..helpers import ChunkerParams, ChunkIteratorFileWrapper
 from ..manifest import Manifest
 from ..legacyrepository import LegacyRepository
 from ..repository import Repository
@@ -17,6 +19,103 @@ from ..logger import create_logger
 logger = create_logger()
 
 
+def transfer_chunks(
+    upgrader, other_repository, other_manifest, other_chunks, archive, cache, recompress, dry_run, chunker_params=None
+):
+    """
+    Transfer chunks from another repository to the current repository.
+
+    If chunker_params is provided, the chunks will be re-chunked using the specified parameters.
+    """
+    transfer = 0
+    present = 0
+    chunks = []
+
+    # Determine if re-chunking is needed
+    rechunkify = chunker_params is not None
+
+    if rechunkify:
+        # Similar to ArchiveRecreater.iter_chunks
+        pipeline = DownloadPipeline(other_manifest.repository, other_manifest.repo_objs)
+        chunk_iterator = pipeline.fetch_many(other_chunks, ro_type=ROBJ_FILE_STREAM)
+        file = ChunkIteratorFileWrapper(chunk_iterator)
+
+        # Create a chunker with the specified parameters
+        chunker = get_chunker(*chunker_params, seed=archive.key.chunk_seed, sparse=False)
+        for chunk in chunker.chunkify(file):
+            if not dry_run:
+                chunk_id, data = cached_hash(chunk, archive.key.id_hash)
+                size = len(data)
+                # Check if the chunk is already in the repository
+                chunk_present = cache.seen_chunk(chunk_id, size)
+                if chunk_present:
+                    chunk_entry = cache.reuse_chunk(chunk_id, size, archive.stats)
+                    present += size
+                else:
+                    # Add the new chunk to the repository
+                    chunk_entry = cache.add_chunk(
+                        chunk_id, {}, data, stats=archive.stats, wait=False, ro_type=ROBJ_FILE_STREAM
+                    )
+                    cache.repository.async_response(wait=False)
+                    transfer += size
+                chunks.append(chunk_entry)
+            else:
+                # In dry-run mode, just estimate the size
+                size = len(chunk.data) if chunk.data is not None else chunk.size
+                transfer += size
+    else:
+        # Original implementation without re-chunking
+        for chunk_id, size in other_chunks:
+            chunk_present = cache.seen_chunk(chunk_id, size)
+            if not chunk_present:  # target repo does not yet have this chunk
+                if not dry_run:
+                    try:
+                        cdata = other_repository.get(chunk_id)
+                    except (Repository.ObjectNotFound, LegacyRepository.ObjectNotFound):
+                        # missing correct chunk in other_repository (source) will result in
+                        # a missing chunk in repository (destination).
+                        # we do NOT want to transfer all-zero replacement chunks from borg1 repos.
+                        pass
+                    else:
+                        if recompress == "never":
+                            # keep compressed payload same, verify via assert_id (that will
+                            # decompress, but avoid needing to compress it again):
+                            meta, data = other_manifest.repo_objs.parse(
+                                chunk_id, cdata, decompress=True, want_compressed=True, ro_type=ROBJ_FILE_STREAM
+                            )
+                            meta, data = upgrader.upgrade_compressed_chunk(meta, data)
+                            chunk_entry = cache.add_chunk(
+                                chunk_id,
+                                meta,
+                                data,
+                                stats=archive.stats,
+                                wait=False,
+                                compress=False,
+                                size=size,
+                                ctype=meta["ctype"],
+                                clevel=meta["clevel"],
+                                ro_type=ROBJ_FILE_STREAM,
+                            )
+                        elif recompress == "always":
+                            # always decompress and re-compress file data chunks
+                            meta, data = other_manifest.repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_FILE_STREAM)
+                            chunk_entry = cache.add_chunk(
+                                chunk_id, meta, data, stats=archive.stats, wait=False, ro_type=ROBJ_FILE_STREAM
+                            )
+                        else:
+                            raise ValueError(f"unsupported recompress mode: {recompress}")
+                    cache.repository.async_response(wait=False)
+                    chunks.append(chunk_entry)
+                transfer += size
+            else:
+                if not dry_run:
+                    chunk_entry = cache.reuse_chunk(chunk_id, size, archive.stats)
+                    chunks.append(chunk_entry)
+                present += size
+
+    return chunks, transfer, present
+
+
 class TransferMixIn:
     @with_other_repository(manifest=True, compatibility=(Manifest.Operation.READ,))
     @with_repository(manifest=True, cache=True, compatibility=(Manifest.Operation.WRITE,))
@@ -76,7 +175,7 @@ class TransferMixIn:
         if UpgraderCls is not upgrade_mod.UpgraderFrom12To20 and other_manifest.repository.version == 1:
             raise Error("To transfer from a borg 1.x repo, you need to use: --upgrader=From12To20")
 
-        upgrader = UpgraderCls(cache=cache)
+        upgrader = UpgraderCls(cache=cache, args=args)
 
         for archive_info in archive_infos:
             name, id, ts = archive_info.name, archive_info.id, archive_info.ts
@@ -120,68 +219,22 @@ class TransferMixIn:
                     else:
                         other_chunks = None
                     if other_chunks is not None:
-                        chunks = []
-                        for chunk_id, size in other_chunks:
-                            chunk_present = cache.seen_chunk(chunk_id, size)
-                            if not chunk_present:  # target repo does not yet have this chunk
-                                if not dry_run:
-                                    try:
-                                        cdata = other_repository.get(chunk_id)
-                                    except (Repository.ObjectNotFound, LegacyRepository.ObjectNotFound):
-                                        # missing correct chunk in other_repository (source) will result in
-                                        # a missing chunk in repository (destination).
-                                        # we do NOT want to transfer all-zero replacement chunks from borg1 repos.
-                                        pass
-                                    else:
-                                        if args.recompress == "never":
-                                            # keep compressed payload same, verify via assert_id (that will
-                                            # decompress, but avoid needing to compress it again):
-                                            meta, data = other_manifest.repo_objs.parse(
-                                                chunk_id,
-                                                cdata,
-                                                decompress=True,
-                                                want_compressed=True,
-                                                ro_type=ROBJ_FILE_STREAM,
-                                            )
-                                            meta, data = upgrader.upgrade_compressed_chunk(meta, data)
-                                            chunk_entry = cache.add_chunk(
-                                                chunk_id,
-                                                meta,
-                                                data,
-                                                stats=archive.stats,
-                                                wait=False,
-                                                compress=False,
-                                                size=size,
-                                                ctype=meta["ctype"],
-                                                clevel=meta["clevel"],
-                                                ro_type=ROBJ_FILE_STREAM,
-                                            )
-                                        elif args.recompress == "always":
-                                            # always decompress and re-compress file data chunks
-                                            meta, data = other_manifest.repo_objs.parse(
-                                                chunk_id, cdata, ro_type=ROBJ_FILE_STREAM
-                                            )
-                                            chunk_entry = cache.add_chunk(
-                                                chunk_id,
-                                                meta,
-                                                data,
-                                                stats=archive.stats,
-                                                wait=False,
-                                                ro_type=ROBJ_FILE_STREAM,
-                                            )
-                                        else:
-                                            raise ValueError(f"unsupported recompress mode: {args.recompress}")
-                                    cache.repository.async_response(wait=False)
-                                    chunks.append(chunk_entry)
-                                transfer_size += size
-                            else:
-                                if not dry_run:
-                                    chunk_entry = cache.reuse_chunk(chunk_id, size, archive.stats)
-                                    chunks.append(chunk_entry)
-                                present_size += size
+                        chunks, transfer, present = transfer_chunks(
+                            upgrader,
+                            other_repository,
+                            other_manifest,
+                            other_chunks,
+                            archive,
+                            cache,
+                            args.recompress,
+                            dry_run,
+                            args.chunker_params,
+                        )
                         if not dry_run:
                             item.chunks = chunks
                             archive.stats.nfiles += 1
+                        transfer_size += transfer
+                        present_size += present
                     if not dry_run:
                         item = upgrader.upgrade_item(item=item)
                         archive.add_item(item, show_progress=args.progress)
@@ -213,6 +266,7 @@ class TransferMixIn:
         This command transfers archives from one repository to another repository.
         Optionally, it can also upgrade the transferred data.
         Optionally, it can also recompress the transferred data.
+        Optionally, it can also re-chunk the transferred data using different chunker parameters.
 
         It is easiest (and fastest) to give ``--compression=COMPRESSION --recompress=never`` using
         the same COMPRESSION mode as in the SRC_REPO - borg will use that COMPRESSION for metadata (in
@@ -258,6 +312,10 @@ class TransferMixIn:
             borg --repo=DST_REPO transfer --other-repo=SRC_REPO --from-borg1 \\
                  --compress=zstd,3 --recompress=always
 
+            # to re-chunk using different chunker parameters:
+            borg --repo=DST_REPO transfer --other-repo=SRC_REPO \\
+                 --chunker-params=buzhash,19,23,21,4095
+
 
         """
         )
@@ -321,5 +379,16 @@ class TransferMixIn:
             "If no MODE is given, `always` will be used. "
             "If no MODE is given, `always` will be used. "
             'Not passing --recompress is equivalent to "--recompress never".',
             'Not passing --recompress is equivalent to "--recompress never".',
         )
         )
+        subparser.add_argument(
+            "--chunker-params",
+            metavar="PARAMS",
+            dest="chunker_params",
+            type=ChunkerParams,
+            default=None,
+            action=Highlander,
+            help="rechunk using given chunker parameters (ALGO, CHUNK_MIN_EXP, CHUNK_MAX_EXP, "
+            "HASH_MASK_BITS, HASH_WINDOW_SIZE) or `default` to use the chunker defaults. "
+            "default: do not rechunk",
+        )
 
         define_archive_filters_group(subparser)
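For readers skimming the hunks above: the re-chunking path concatenates the source file's chunks
into one logical stream and runs a new chunker over it. A condensed sketch of that data flow
(not part of the commit; it reuses the same borg internals the diff imports and assumes the
manifest/key objects are already set up):

    from borg.archive import DownloadPipeline, cached_hash
    from borg.chunker import get_chunker
    from borg.constants import ROBJ_FILE_STREAM
    from borg.helpers import ChunkIteratorFileWrapper

    def rechunked_ids(other_manifest, other_chunks, key, chunker_params):
        """Yield (chunk_id, size) for file content re-chunked with chunker_params."""
        pipeline = DownloadPipeline(other_manifest.repository, other_manifest.repo_objs)
        # present the old chunk sequence as one contiguous file-like object
        file = ChunkIteratorFileWrapper(pipeline.fetch_many(other_chunks, ro_type=ROBJ_FILE_STREAM))
        chunker = get_chunker(*chunker_params, seed=key.chunk_seed, sparse=False)
        for chunk in chunker.chunkify(file):
            chunk_id, data = cached_hash(chunk, key.id_hash)  # hash once, reuse cached result
            yield chunk_id, len(data)

Because the chunk boundaries move, deduplication against the destination only applies where the
newly cut chunks happen to match existing ones; that is why transfer_chunks() checks
cache.seen_chunk() and falls back to cache.add_chunk() per new chunk.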

src/borg/testsuite/archiver/transfer_cmd_test.py  (+72 -3)

@@ -1,5 +1,7 @@
+import hashlib
 import json
 import os
+import random
 import re
 import stat
 import tarfile
@@ -8,12 +10,13 @@ from contextlib import contextmanager
 import pytest
 
 from ...constants import *  # NOQA
+from ...helpers import open_item
 from ...helpers.time import parse_timestamp
-from ...helpers.parseformat import parse_file_size
+from ...helpers.parseformat import parse_file_size, ChunkerParams
 from ..platform_test import is_win32
-from . import cmd, create_test_files, RK_ENCRYPTION, open_archive, generate_archiver_tests
+from . import cmd, create_regular_file, create_test_files, RK_ENCRYPTION, open_archive, generate_archiver_tests
 
-pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary")  # NOQA
+pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote")  # NOQA
 
 
 def test_transfer_upgrade(archivers, request, monkeypatch):
@@ -285,9 +288,11 @@ def setup_repos(archiver, mp):
     when the context manager is exited, archiver will work with REPO2 (so the transfer can be run).
     """
     original_location = archiver.repository_location
+    original_path = archiver.repository_path
 
     mp.setenv("BORG_PASSPHRASE", "pw1")
     archiver.repository_location = original_location + "1"
+    archiver.repository_path = original_path + "1"
     cmd(archiver, "repo-create", RK_ENCRYPTION)
 
     other_repo1 = f"--other-repo={original_location}1"
@@ -296,6 +301,7 @@ def setup_repos(archiver, mp):
     mp.setenv("BORG_PASSPHRASE", "pw2")
     mp.setenv("BORG_PASSPHRASE", "pw2")
     mp.setenv("BORG_OTHER_PASSPHRASE", "pw1")
     mp.setenv("BORG_OTHER_PASSPHRASE", "pw1")
     archiver.repository_location = original_location + "2"
     archiver.repository_location = original_location + "2"
+    archiver.repository_path = original_path + "2"
     cmd(archiver, "repo-create", RK_ENCRYPTION, other_repo1)
     cmd(archiver, "repo-create", RK_ENCRYPTION, other_repo1)
 
 
 
 
@@ -400,3 +406,66 @@ def test_transfer_recompress(archivers, request, monkeypatch, recompress_mode):
         # We allow a small percentage difference to account for metadata changes.
         size_diff_percent = abs(source_size - dest_size) / source_size * 100
         assert size_diff_percent < 5, f"dest_size ({dest_size}) should be similar as source_size ({source_size})."
+
+
+def test_transfer_rechunk(archivers, request, monkeypatch):
+    """Test transfer with re-chunking"""
+    archiver = request.getfixturevalue(archivers)
+
+    BLKSIZE = 4096
+    source_chunker_params = "buzhash,19,23,21,4095"  # default buzhash chunks
+    dest_chunker_params = f"fixed,{BLKSIZE}"  # fixed chunk size
+
+    with setup_repos(archiver, monkeypatch) as other_repo1:
+        contents_1 = random.randbytes(1 * BLKSIZE)
+        contents_255 = random.randbytes(255 * BLKSIZE)
+        contents_1024 = random.randbytes(1024 * BLKSIZE)
+        create_regular_file(archiver.input_path, "file_1", contents=contents_1)
+        create_regular_file(archiver.input_path, "file_256", contents=contents_255 + contents_1)
+        create_regular_file(archiver.input_path, "file_1280", contents=contents_1024 + contents_255 + contents_1)
+
+        cmd(archiver, "create", f"--chunker-params={source_chunker_params}", "archive", "input")
+
+        # Get metadata from source archive
+        source_info_json = cmd(archiver, "info", "--json", "archive")
+        source_info = json.loads(source_info_json)
+        source_archive = source_info["archives"][0]
+        source_chunker_params_info = source_archive["chunker_params"]
+
+        # Calculate SHA256 hashes of file contents from source archive
+        source_archive_obj, source_repo = open_archive(archiver.repository_path, "archive")
+        with source_repo:
+            source_file_hashes = {}
+            for item in source_archive_obj.iter_items():
+                if hasattr(item, "chunks"):  # Only process regular files with chunks
+                    f = open_item(source_archive_obj, item)
+                    content = f.read(10 * 1024 * 1024)  # Read up to 10 MB
+                    source_file_hashes[item.path] = hashlib.sha256(content).hexdigest()
+
+    # Transfer with rechunking
+    cmd(archiver, "transfer", other_repo1, f"--chunker-params={dest_chunker_params}")
+
+    # Get metadata from destination archive
+    dest_info_json = cmd(archiver, "info", "--json", "archive")
+    dest_info = json.loads(dest_info_json)
+    dest_archive = dest_info["archives"][0]
+    dest_chunker_params_info = dest_archive["chunker_params"]
+
+    # chunker params in metadata must reflect the chunker params given on the CLI
+    assert tuple(source_chunker_params_info) == ChunkerParams(source_chunker_params)
+    assert tuple(dest_chunker_params_info) == ChunkerParams(dest_chunker_params)
+
+    # Compare file hashes between source and destination archives, also check expected chunk counts.
+    dest_archive_obj, dest_repo = open_archive(archiver.repository_path, "archive")
+    with dest_repo:
+        for item in dest_archive_obj.iter_items():
+            if hasattr(item, "chunks"):  # Only process regular files with chunks
+                # Verify expected chunk count for each file
+                expected_chunk_count = {"input/file_1": 1, "input/file_256": 256, "input/file_1280": 1280}[item.path]
+                assert len(item.chunks) == expected_chunk_count
+                f = open_item(dest_archive_obj, item)
+                content = f.read(10 * 1024 * 1024)  # Read up to 10 MB
+                dest_hash = hashlib.sha256(content).hexdigest()
+                # Verify that the file hash is identical to the source
+                assert item.path in source_file_hashes, f"File {item.path} not found in source archive"
+                assert dest_hash == source_file_hashes[item.path], f"Content hash mismatch for {item.path}"
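The test's metadata assertions hinge on ChunkerParams() turning a CLI spec string into the tuple
stored in archive metadata. For reference, a small sketch of the expected equivalences (my
reading of the parser; the exact tuple shapes are an assumption, not asserted by this commit):

    from borg.helpers.parseformat import ChunkerParams

    # buzhash spec: ALGO, CHUNK_MIN_EXP, CHUNK_MAX_EXP, HASH_MASK_BITS, HASH_WINDOW_SIZE
    assert ChunkerParams("buzhash,19,23,21,4095") == ("buzhash", 19, 23, 21, 4095)
    # fixed spec: ALGO, BLOCK_SIZE (an optional header size may follow)
    assert ChunkerParams("fixed,4096") == ("fixed", 4096)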

src/borg/upgrade.py  (+17 -7)

@@ -10,8 +10,8 @@ logger = create_logger(__name__)
 
 
 class UpgraderNoOp:
-    def __init__(self, *, cache):
-        pass
+    def __init__(self, *, cache, args):
+        self.args = args
 
     def new_archive(self, *, archive):
         pass
@@ -37,14 +37,19 @@ class UpgraderNoOp:
         ):
             if hasattr(metadata, attr):
                 new_metadata[attr] = getattr(metadata, attr)
+        rechunking = self.args.chunker_params is not None
+        if rechunking:
+            # if we are rechunking while transferring, we take the new chunker_params.
+            new_metadata["chunker_params"] = self.args.chunker_params
         return new_metadata
 
 
 class UpgraderFrom12To20:
     borg1_header_fmt = Struct(">I")
 
-    def __init__(self, *, cache):
+    def __init__(self, *, cache, args):
         self.cache = cache
+        self.args = args
 
     def new_archive(self, *, archive):
         self.archive = archive
@@ -144,10 +149,15 @@ class UpgraderFrom12To20:
         for attr in ("hostname", "username", "comment", "chunker_params"):
             if hasattr(metadata, attr):
                 new_metadata[attr] = getattr(metadata, attr)
-        if chunker_params := new_metadata.get("chunker_params"):
-            if len(chunker_params) == 4 and isinstance(chunker_params[0], int):
-                # this is a borg < 1.2 chunker_params tuple, no chunker algo specified, but we only had buzhash:
-                new_metadata["chunker_params"] = (CH_BUZHASH,) + chunker_params
+        rechunking = self.args.chunker_params is not None
+        if rechunking:
+            # if we are rechunking while transferring, we take the new chunker_params.
+            new_metadata["chunker_params"] = self.args.chunker_params
+        else:
+            if chunker_params := new_metadata.get("chunker_params"):
+                if len(chunker_params) == 4 and isinstance(chunker_params[0], int):
+                    # this is a borg < 1.2 chunker_params tuple, no chunker algo specified, but we only had buzhash:
+                    new_metadata["chunker_params"] = (CH_BUZHASH,) + chunker_params
         # old borg used UTC timestamps, but did not have the explicit tz offset in them.
         for attr in ("time", "time_end"):
             if hasattr(metadata, attr):
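The --chunker-params override is applied identically in both upgraders; UpgraderFrom12To20
additionally normalizes legacy borg < 1.2 tuples when not re-chunking. A condensed sketch of
the From12To20 rule (not part of the commit; CH_BUZHASH is borg's "buzhash" constant):

    def effective_chunker_params(metadata_params, cli_params):
        """cli_params comes from --chunker-params; it wins when re-chunking."""
        if cli_params is not None:
            return cli_params  # re-chunking: record the parameters actually used
        if metadata_params and len(metadata_params) == 4 and isinstance(metadata_params[0], int):
            # borg < 1.2 stored a 4-tuple without the algorithm name; only buzhash existed then
            return ("buzhash",) + tuple(metadata_params)
        return metadata_params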