@@ -1,13 +1,15 @@
 import argparse
 
 from ._common import with_repository, with_other_repository, Highlander
-from ..archive import Archive
+from ..archive import Archive, cached_hash, DownloadPipeline
+from ..chunker import get_chunker
 from ..compress import CompressionSpec
 from ..constants import *  # NOQA
 from ..crypto.key import uses_same_id_hash, uses_same_chunker_secret
 from ..helpers import Error
 from ..helpers import location_validator, Location, archivename_validator, comment_validator
 from ..helpers import format_file_size, bin_to_hex
+from ..helpers import ChunkerParams, ChunkIteratorFileWrapper
 from ..manifest import Manifest
 from ..legacyrepository import LegacyRepository
 from ..repository import Repository
@@ -17,57 +19,100 @@ from ..logger import create_logger
 logger = create_logger()
 
 
-def transfer_chunks(upgrader, other_repository, other_manifest, other_chunks, archive, cache, recompress, dry_run):
+def transfer_chunks(
+    upgrader, other_repository, other_manifest, other_chunks, archive, cache, recompress, dry_run, chunker_params=None
+):
+    """
+    Transfer chunks from another repository to the current repository.
+
+    If chunker_params is provided, the chunks will be re-chunked using the specified parameters.
+    """
     transfer = 0
     present = 0
     chunks = []
-    for chunk_id, size in other_chunks:
-        chunk_present = cache.seen_chunk(chunk_id, size)
-        if not chunk_present:  # target repo does not yet have this chunk
+
+    # Determine if re-chunking is needed
+    rechunkify = chunker_params is not None
+
+    if rechunkify:
+        # Similar to ArchiveRecreater.iter_chunks
+        pipeline = DownloadPipeline(other_manifest.repository, other_manifest.repo_objs)
+        chunk_iterator = pipeline.fetch_many(other_chunks, ro_type=ROBJ_FILE_STREAM)
+        file = ChunkIteratorFileWrapper(chunk_iterator)
+
+        # Create a chunker with the specified parameters
+        chunker = get_chunker(*chunker_params, seed=archive.key.chunk_seed, sparse=False)
+        for chunk in chunker.chunkify(file):
             if not dry_run:
-                try:
-                    cdata = other_repository.get(chunk_id)
-                except (Repository.ObjectNotFound, LegacyRepository.ObjectNotFound):
-                    # missing correct chunk in other_repository (source) will result in
-                    # a missing chunk in repository (destination).
-                    # we do NOT want to transfer all-zero replacement chunks from borg1 repos.
-                    pass
+                chunk_id, data = cached_hash(chunk, archive.key.id_hash)
+                size = len(data)
+                # Check if the chunk is already in the repository
+                chunk_present = cache.seen_chunk(chunk_id, size)
+                if chunk_present:
+                    chunk_entry = cache.reuse_chunk(chunk_id, size, archive.stats)
+                    present += size
                 else:
-                    if recompress == "never":
-                        # keep compressed payload same, verify via assert_id (that will
-                        # decompress, but avoid needing to compress it again):
-                        meta, data = other_manifest.repo_objs.parse(
-                            chunk_id, cdata, decompress=True, want_compressed=True, ro_type=ROBJ_FILE_STREAM
-                        )
-                        meta, data = upgrader.upgrade_compressed_chunk(meta, data)
-                        chunk_entry = cache.add_chunk(
-                            chunk_id,
-                            meta,
-                            data,
-                            stats=archive.stats,
-                            wait=False,
-                            compress=False,
-                            size=size,
-                            ctype=meta["ctype"],
-                            clevel=meta["clevel"],
-                            ro_type=ROBJ_FILE_STREAM,
-                        )
-                    elif recompress == "always":
-                        # always decompress and re-compress file data chunks
-                        meta, data = other_manifest.repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_FILE_STREAM)
-                        chunk_entry = cache.add_chunk(
-                            chunk_id, meta, data, stats=archive.stats, wait=False, ro_type=ROBJ_FILE_STREAM
-                        )
-                    else:
-                        raise ValueError(f"unsupported recompress mode: {recompress}")
-                cache.repository.async_response(wait=False)
-                chunks.append(chunk_entry)
-            transfer += size
-        else:
-            if not dry_run:
-                chunk_entry = cache.reuse_chunk(chunk_id, size, archive.stats)
+                    # Add the new chunk to the repository
+                    chunk_entry = cache.add_chunk(
+                        chunk_id, {}, data, stats=archive.stats, wait=False, ro_type=ROBJ_FILE_STREAM
+                    )
+                    cache.repository.async_response(wait=False)
+                    transfer += size
                 chunks.append(chunk_entry)
-            present += size
+            else:
+                # In dry-run mode, just estimate the size
+                size = len(chunk.data) if chunk.data is not None else chunk.size
+                transfer += size
+    else:
+        # Original implementation without re-chunking
+        for chunk_id, size in other_chunks:
+            chunk_present = cache.seen_chunk(chunk_id, size)
+            if not chunk_present:  # target repo does not yet have this chunk
+                if not dry_run:
+                    try:
+                        cdata = other_repository.get(chunk_id)
+                    except (Repository.ObjectNotFound, LegacyRepository.ObjectNotFound):
+                        # missing correct chunk in other_repository (source) will result in
+                        # a missing chunk in repository (destination).
+                        # we do NOT want to transfer all-zero replacement chunks from borg1 repos.
+                        pass
+                    else:
+                        if recompress == "never":
+                            # keep compressed payload same, verify via assert_id (that will
+                            # decompress, but avoid needing to compress it again):
+                            meta, data = other_manifest.repo_objs.parse(
+                                chunk_id, cdata, decompress=True, want_compressed=True, ro_type=ROBJ_FILE_STREAM
+                            )
+                            meta, data = upgrader.upgrade_compressed_chunk(meta, data)
+                            chunk_entry = cache.add_chunk(
+                                chunk_id,
+                                meta,
+                                data,
+                                stats=archive.stats,
+                                wait=False,
+                                compress=False,
+                                size=size,
+                                ctype=meta["ctype"],
+                                clevel=meta["clevel"],
+                                ro_type=ROBJ_FILE_STREAM,
+                            )
+                        elif recompress == "always":
+                            # always decompress and re-compress file data chunks
+                            meta, data = other_manifest.repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_FILE_STREAM)
+                            chunk_entry = cache.add_chunk(
+                                chunk_id, meta, data, stats=archive.stats, wait=False, ro_type=ROBJ_FILE_STREAM
+                            )
+                        else:
+                            raise ValueError(f"unsupported recompress mode: {recompress}")
+                    cache.repository.async_response(wait=False)
+                    chunks.append(chunk_entry)
+                transfer += size
+            else:
+                if not dry_run:
+                    chunk_entry = cache.reuse_chunk(chunk_id, size, archive.stats)
+                    chunks.append(chunk_entry)
+                present += size
+
     return chunks, transfer, present
 
 
@@ -130,7 +175,7 @@ class TransferMixIn:
         if UpgraderCls is not upgrade_mod.UpgraderFrom12To20 and other_manifest.repository.version == 1:
             raise Error("To transfer from a borg 1.x repo, you need to use: --upgrader=From12To20")
 
-        upgrader = UpgraderCls(cache=cache)
+        upgrader = UpgraderCls(cache=cache, args=args)
 
         for archive_info in archive_infos:
             name, id, ts = archive_info.name, archive_info.id, archive_info.ts
@@ -183,6 +228,7 @@ class TransferMixIn:
                             cache,
                             args.recompress,
                             dry_run,
+                            args.chunker_params,
                         )
                         if not dry_run:
                             item.chunks = chunks
@@ -220,6 +266,7 @@ class TransferMixIn:
         This command transfers archives from one repository to another repository.
         Optionally, it can also upgrade the transferred data.
         Optionally, it can also recompress the transferred data.
+        Optionally, it can also re-chunk the transferred data using different chunker parameters.
 
         It is easiest (and fastest) to give ``--compression=COMPRESSION --recompress=never`` using
         the same COMPRESSION mode as in the SRC_REPO - borg will use that COMPRESSION for metadata (in
@@ -265,6 +312,10 @@ class TransferMixIn:
             borg --repo=DST_REPO transfer --other-repo=SRC_REPO --from-borg1 \\
                  --compress=zstd,3 --recompress=always
 
+            # to re-chunk using different chunker parameters:
+            borg --repo=DST_REPO transfer --other-repo=SRC_REPO \\
+                 --chunker-params=buzhash,19,23,21,4095
+
 
         """
         )
@@ -328,5 +379,16 @@ class TransferMixIn:
             "If no MODE is given, `always` will be used. "
             'Not passing --recompress is equivalent to "--recompress never".',
         )
+        subparser.add_argument(
+            "--chunker-params",
+            metavar="PARAMS",
+            dest="chunker_params",
+            type=ChunkerParams,
+            default=None,
+            action=Highlander,
+            help="rechunk using given chunker parameters (ALGO, CHUNK_MIN_EXP, CHUNK_MAX_EXP, "
+            "HASH_MASK_BITS, HASH_WINDOW_SIZE) or `default` to use the chunker defaults. "
+            "default: do not rechunk",
+        )
 
         define_archive_filters_group(subparser)