move transfer command to archiver.transfer

Thomas Waldmann, 2 years ago
parent commit 8d9e47e374
3 changed files with 220 additions and 195 deletions:

  1. src/borg/archiver/__init__.py (+8, -194)
  2. src/borg/archiver/common.py (+53, -1)
  3. src/borg/archiver/transfer.py (+159, -0)

src/borg/archiver/__init__.py (+8, -194)

@@ -40,15 +40,15 @@ try:
    from ..crypto.key import key_creator, key_argument_names, tam_required_file
    from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR, EXIT_SIGNAL_BASE
    from ..helpers import Error, NoManifestError, set_ec
-    from ..helpers import positive_int_validator, location_validator, archivename_validator, ChunkerParams, Location
-    from ..helpers import GlobSpec, NameSpec, CommentSpec, SortBySpec, FilesCacheMode
+    from ..helpers import location_validator, archivename_validator, ChunkerParams, Location
+    from ..helpers import NameSpec, CommentSpec, FilesCacheMode
    from ..helpers import BaseFormatter, ItemFormatter, ArchiveFormatter
    from ..helpers import format_timedelta, format_file_size, parse_file_size, format_archive
    from ..helpers import remove_surrogates, bin_to_hex, eval_escapes
    from ..helpers import interval, prune_within, prune_split, PRUNING_PATTERNS
    from ..helpers import timestamp
    from ..helpers import get_cache_dir, os_stat
-    from ..helpers import Manifest, AI_HUMAN_SORT_KEYS
+    from ..helpers import Manifest
    from ..helpers import HardLinkManager
    from ..helpers import check_python, check_extension_modules
    from ..helpers import dir_is_tagged, is_slow_msgpack, is_supported_msgpack, yes, sysinfo
@@ -107,9 +107,10 @@ from .help import HelpMixIn
from .keys import KeysMixIn
from .locks import LocksMixIn
from .tar import TarMixIn
+from .transfer import TransferMixIn
-class Archiver(DebugMixIn, TarMixIn, BenchmarkMixIn, KeysMixIn, LocksMixIn, HelpMixIn):
+class Archiver(DebugMixIn, TarMixIn, BenchmarkMixIn, KeysMixIn, LocksMixIn, HelpMixIn, TransferMixIn):
    def __init__(self, lock_wait=None, prog=None):
        self.exit_code = EXIT_SUCCESS
        self.lock_wait = lock_wait
@@ -153,84 +154,6 @@ class Archiver(DebugMixIn, TarMixIn, BenchmarkMixIn, KeysMixIn, LocksMixIn, Help
        ).serve()
        return EXIT_SUCCESS

-    @with_other_repository(manifest=True, key=True, compatibility=(Manifest.Operation.READ,))
-    @with_repository(exclusive=True, manifest=True, cache=True, compatibility=(Manifest.Operation.WRITE,))
-    def do_transfer(
-        self, args, *, repository, manifest, key, cache, other_repository=None, other_manifest=None, other_key=None
-    ):
-        """archives transfer from other repository, optionally upgrade data format"""
-        dry_run = args.dry_run
-        args.consider_checkpoints = True
-        archive_names = tuple(x.name for x in other_manifest.archives.list_considering(args))
-        if not archive_names:
-            return EXIT_SUCCESS
-
-        from . import upgrade as upgrade_mod
-
-        try:
-            UpgraderCls = getattr(upgrade_mod, f"Upgrader{args.upgrader}")
-        except AttributeError:
-            self.print_error(f"No such upgrader: {args.upgrader}")
-            return EXIT_ERROR
-
-        upgrader = UpgraderCls(cache=cache)
-
-        for name in archive_names:
-            transfer_size = 0
-            present_size = 0
-            if name in manifest.archives and not dry_run:
-                print(f"{name}: archive is already present in destination repo, skipping.")
-            else:
-                if not dry_run:
-                    print(f"{name}: copying archive to destination repo...")
-                other_archive = Archive(other_repository, other_key, other_manifest, name)
-                archive = Archive(repository, key, manifest, name, cache=cache, create=True) if not dry_run else None
-                upgrader.new_archive(archive=archive)
-                for item in other_archive.iter_items():
-                    if "chunks" in item:
-                        chunks = []
-                        for chunk_id, size in item.chunks:
-                            refcount = cache.seen_chunk(chunk_id, size)
-                            if refcount == 0:  # target repo does not yet have this chunk
-                                if not dry_run:
-                                    cdata = other_repository.get(chunk_id)
-                                    # keep compressed payload same, avoid decompression / recompression
-                                    data = other_key.decrypt(chunk_id, cdata, decompress=False)
-                                    data = upgrader.upgrade_compressed_chunk(chunk=data)
-                                    chunk_entry = cache.add_chunk(
-                                        chunk_id, data, archive.stats, wait=False, compress=False, size=size
-                                    )
-                                    cache.repository.async_response(wait=False)
-                                    chunks.append(chunk_entry)
-                                transfer_size += size
-                            else:
-                                if not dry_run:
-                                    chunk_entry = cache.chunk_incref(chunk_id, archive.stats)
-                                    chunks.append(chunk_entry)
-                                present_size += size
-                        if not dry_run:
-                            item.chunks = chunks  # TODO: overwrite? IDs and sizes are same.
-                            archive.stats.nfiles += 1
-                    if not dry_run:
-                        archive.add_item(upgrader.upgrade_item(item=item))
-                if not dry_run:
-                    additional_metadata = upgrader.upgrade_archive_metadata(metadata=other_archive.metadata)
-                    archive.save(stats=archive.stats, additional_metadata=additional_metadata)
-                    print(
-                        f"{name}: finished. "
-                        f"transfer_size: {format_file_size(transfer_size)} "
-                        f"present_size: {format_file_size(present_size)}"
-                    )
-                else:
-                    print(
-                        f"{name}: completed"
-                        if transfer_size == 0
-                        else f"{name}: incomplete, "
-                        f"transfer_size: {format_file_size(transfer_size)} "
-                        f"present_size: {format_file_size(present_size)}"
-                    )
-        return EXIT_SUCCESS
-
    @with_repository(create=True, exclusive=True, manifest=False)
    @with_other_repository(key=True, compatibility=(Manifest.Operation.READ,))
    def do_rcreate(self, args, repository, *, other_repository=None, other_key=None):
@@ -1659,7 +1582,7 @@ class Archiver(DebugMixIn, TarMixIn, BenchmarkMixIn, KeysMixIn, LocksMixIn, Help
    def build_parser(self):

        from .common import process_epilog
-        from .common import define_exclusion_group
+        from .common import define_exclusion_group, define_archive_filters_group

        def define_common_options(add_common_option):
            add_common_option("-h", "--help", action="help", help="show this help message and exit")
@@ -1801,55 +1724,6 @@ class Archiver(DebugMixIn, TarMixIn, BenchmarkMixIn, KeysMixIn, LocksMixIn, Help
                help="repository to use",
            )

-        def define_archive_filters_group(subparser, *, sort_by=True, first_last=True):
-            filters_group = subparser.add_argument_group(
-                "Archive filters", "Archive filters can be applied to repository targets."
-            )
-            group = filters_group.add_mutually_exclusive_group()
-            group.add_argument(
-                "-a",
-                "--glob-archives",
-                metavar="GLOB",
-                dest="glob_archives",
-                type=GlobSpec,
-                action=Highlander,
-                help="only consider archive names matching the glob. " 'sh: rules apply, see "borg help patterns".',
-            )
-
-            if sort_by:
-                sort_by_default = "timestamp"
-                filters_group.add_argument(
-                    "--sort-by",
-                    metavar="KEYS",
-                    dest="sort_by",
-                    type=SortBySpec,
-                    default=sort_by_default,
-                    help="Comma-separated list of sorting keys; valid keys are: {}; default is: {}".format(
-                        ", ".join(AI_HUMAN_SORT_KEYS), sort_by_default
-                    ),
-                )
-
-            if first_last:
-                group = filters_group.add_mutually_exclusive_group()
-                group.add_argument(
-                    "--first",
-                    metavar="N",
-                    dest="first",
-                    default=0,
-                    type=positive_int_validator,
-                    help="consider first N archives after other filters were applied",
-                )
-                group.add_argument(
-                    "--last",
-                    metavar="N",
-                    dest="last",
-                    default=0,
-                    type=positive_int_validator,
-                    help="consider last N archives after other filters were applied",
-                )
-
-            return filters_group
-
        def define_borg_mount(parser):
            parser.set_defaults(func=self.do_mount)
            parser.add_argument(
@@ -2670,68 +2544,6 @@ class Archiver(DebugMixIn, TarMixIn, BenchmarkMixIn, KeysMixIn, LocksMixIn, Help
        )
        define_archive_filters_group(subparser)

-        # borg transfer
-        transfer_epilog = process_epilog(
-            """
-        This command transfers archives from one repository to another repository.
-        Optionally, it can also upgrade the transferred data.
-
-        Suggested use for general purpose archive transfer (not repo upgrades)::
-
-            # initialize DST_REPO reusing key material from SRC_REPO, so that
-            # chunking and chunk id generation will work in the same way as before.
-            borg --repo=DST_REPO init --other-repo=SRC_REPO --encryption=DST_ENC
-
-            # transfer archives from SRC_REPO to DST_REPO
-            borg --repo=DST_REPO transfer --other-repo=SRC_REPO --dry-run  # check what it would do
-            borg --repo=DST_REPO transfer --other-repo=SRC_REPO            # do it!
-            borg --repo=DST_REPO transfer --other-repo=SRC_REPO --dry-run  # check! anything left?
-
-        The default is to transfer all archives, including checkpoint archives.
-
-        You could use the misc. archive filter options to limit which archives it will
-        transfer, e.g. using the -a option. This is recommended for big
-        repositories with multiple data sets to keep the runtime per invocation lower.
-
-        For repository upgrades (e.g. from a borg 1.2 repo to a related borg 2.0 repo), usage is
-        quite similar to the above::
-
-            borg --repo=DST_REPO transfer --other-repo=SRC_REPO --upgrader=From12To20
-
-
-        """
-        )
-        subparser = subparsers.add_parser(
-            "transfer",
-            parents=[common_parser],
-            add_help=False,
-            description=self.do_transfer.__doc__,
-            epilog=transfer_epilog,
-            formatter_class=argparse.RawDescriptionHelpFormatter,
-            help="transfer of archives from another repository",
-        )
-        subparser.set_defaults(func=self.do_transfer)
-        subparser.add_argument(
-            "-n", "--dry-run", dest="dry_run", action="store_true", help="do not change repository, just check"
-        )
-        subparser.add_argument(
-            "--other-repo",
-            metavar="SRC_REPOSITORY",
-            dest="other_location",
-            type=location_validator(other=True),
-            default=Location(other=True),
-            help="transfer archives from the other repository",
-        )
-        subparser.add_argument(
-            "--upgrader",
-            metavar="UPGRADER",
-            dest="upgrader",
-            type=str,
-            default="NoOp",
-            help="use the upgrader to convert transferred data (default: no conversion)",
-        )
-        define_archive_filters_group(subparser)
-
        # borg diff
        diff_epilog = process_epilog(
            """
@@ -3632,6 +3444,8 @@ class Archiver(DebugMixIn, TarMixIn, BenchmarkMixIn, KeysMixIn, LocksMixIn, Help

        self.build_parser_tar(subparsers, common_parser, mid_common_parser)

+        self.build_parser_transfer(subparsers, common_parser, mid_common_parser)
+
        return parser

    def get_args(self, argv, cmd):

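Note: the net effect of the `__init__.py` changes above is that `Archiver` now gains the transfer command through mixin inheritance instead of defining it inline, with `build_parser()` delegating subcommand registration to the mixin. A minimal, self-contained sketch of that wiring pattern (simplified signatures and hypothetical option names for illustration, not borg's actual API)::

    import argparse

    class TransferMixIn:
        def do_transfer(self, args):
            # the real method talks to two repositories; this stub just echoes
            print(f"would transfer from {args.other_repo}")
            return 0

        def build_parser_transfer(self, subparsers):
            # each mixin registers its own subcommand and binds it to its method
            parser = subparsers.add_parser("transfer")
            parser.add_argument("--other-repo")
            parser.set_defaults(func=self.do_transfer)

    class Archiver(TransferMixIn):
        def build_parser(self):
            parser = argparse.ArgumentParser(prog="borg")
            subparsers = parser.add_subparsers()
            self.build_parser_transfer(subparsers)
            return parser

    archiver = Archiver()
    args = archiver.build_parser().parse_args(["transfer", "--other-repo", "SRC_REPO"])
    args.func(args)  # dispatches to TransferMixIn.do_transfer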
src/borg/archiver/common.py (+53, -1)

@@ -7,7 +7,8 @@ from ..archive import Archive
from ..constants import *  # NOQA
from ..cache import Cache, assert_secure
from ..helpers import Error
-from ..helpers import Manifest
+from ..helpers import Manifest, AI_HUMAN_SORT_KEYS
+from ..helpers import GlobSpec, SortBySpec, positive_int_validator
from ..remote import RemoteRepository
from ..repository import Repository
from ..nanorst import rst_to_terminal
@@ -18,6 +19,7 @@ from ..patterns import (
    parse_exclude_pattern,
)

+
from ..logger import create_logger

logger = create_logger(__name__)
@@ -368,3 +370,53 @@ def define_exclusion_group(subparser, **kwargs):
    exclude_group = subparser.add_argument_group("Exclusion options")
    define_exclude_and_patterns(exclude_group.add_argument, **kwargs)
    return exclude_group
+
+
+def define_archive_filters_group(subparser, *, sort_by=True, first_last=True):
+    filters_group = subparser.add_argument_group(
+        "Archive filters", "Archive filters can be applied to repository targets."
+    )
+    group = filters_group.add_mutually_exclusive_group()
+    group.add_argument(
+        "-a",
+        "--glob-archives",
+        metavar="GLOB",
+        dest="glob_archives",
+        type=GlobSpec,
+        action=Highlander,
+        help="only consider archive names matching the glob. " 'sh: rules apply, see "borg help patterns".',
+    )
+
+    if sort_by:
+        sort_by_default = "timestamp"
+        filters_group.add_argument(
+            "--sort-by",
+            metavar="KEYS",
+            dest="sort_by",
+            type=SortBySpec,
+            default=sort_by_default,
+            help="Comma-separated list of sorting keys; valid keys are: {}; default is: {}".format(
+                ", ".join(AI_HUMAN_SORT_KEYS), sort_by_default
+            ),
+        )
+
+    if first_last:
+        group = filters_group.add_mutually_exclusive_group()
+        group.add_argument(
+            "--first",
+            metavar="N",
+            dest="first",
+            default=0,
+            type=positive_int_validator,
+            help="consider first N archives after other filters were applied",
+        )
+        group.add_argument(
+            "--last",
+            metavar="N",
+            dest="last",
+            default=0,
+            type=positive_int_validator,
+            help="consider last N archives after other filters were applied",
+        )
+
+    return filters_group

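With the helper now living in `common.py`, any subcommand parser can attach the shared archive-filter options with one call, exactly as `transfer` does in the new file below. A hedged usage sketch (standalone `argparse` parser for illustration; in borg the subparser comes from `subparsers.add_parser(...)`)::

    import argparse
    from borg.archiver.common import define_archive_filters_group

    parser = argparse.ArgumentParser()
    define_archive_filters_group(parser, sort_by=True, first_last=True)
    args = parser.parse_args(["-a", "home-*", "--last", "3"])
    # args.glob_archives filters archive names, args.sort_by defaults to
    # "timestamp", and --first/--last are mutually exclusive integer filters
    print(args.glob_archives, args.sort_by, args.first, args.last)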
src/borg/archiver/transfer.py (+159, -0)

@@ -0,0 +1,159 @@
+import argparse
+
+from .common import with_repository, with_other_repository
+from ..archive import Archive
+from ..constants import *  # NOQA
+from ..helpers import EXIT_SUCCESS, EXIT_ERROR
+from ..helpers import location_validator, Location
+from ..helpers import format_file_size
+from ..helpers import Manifest
+
+from ..logger import create_logger
+
+logger = create_logger()
+
+
+class TransferMixIn:
+    @with_other_repository(manifest=True, key=True, compatibility=(Manifest.Operation.READ,))
+    @with_repository(exclusive=True, manifest=True, cache=True, compatibility=(Manifest.Operation.WRITE,))
+    def do_transfer(
+        self, args, *, repository, manifest, key, cache, other_repository=None, other_manifest=None, other_key=None
+    ):
+        """archives transfer from other repository, optionally upgrade data format"""
+        dry_run = args.dry_run
+        args.consider_checkpoints = True
+        archive_names = tuple(x.name for x in other_manifest.archives.list_considering(args))
+        if not archive_names:
+            return EXIT_SUCCESS
+
+        from .. import upgrade as upgrade_mod
+
+        try:
+            UpgraderCls = getattr(upgrade_mod, f"Upgrader{args.upgrader}")
+        except AttributeError:
+            self.print_error(f"No such upgrader: {args.upgrader}")
+            return EXIT_ERROR
+
+        upgrader = UpgraderCls(cache=cache)
+
+        for name in archive_names:
+            transfer_size = 0
+            present_size = 0
+            if name in manifest.archives and not dry_run:
+                print(f"{name}: archive is already present in destination repo, skipping.")
+            else:
+                if not dry_run:
+                    print(f"{name}: copying archive to destination repo...")
+                other_archive = Archive(other_repository, other_key, other_manifest, name)
+                archive = Archive(repository, key, manifest, name, cache=cache, create=True) if not dry_run else None
+                upgrader.new_archive(archive=archive)
+                for item in other_archive.iter_items():
+                    if "chunks" in item:
+                        chunks = []
+                        for chunk_id, size in item.chunks:
+                            refcount = cache.seen_chunk(chunk_id, size)
+                            if refcount == 0:  # target repo does not yet have this chunk
+                                if not dry_run:
+                                    cdata = other_repository.get(chunk_id)
+                                    # keep compressed payload same, avoid decompression / recompression
+                                    data = other_key.decrypt(chunk_id, cdata, decompress=False)
+                                    data = upgrader.upgrade_compressed_chunk(chunk=data)
+                                    chunk_entry = cache.add_chunk(
+                                        chunk_id, data, archive.stats, wait=False, compress=False, size=size
+                                    )
+                                    cache.repository.async_response(wait=False)
+                                    chunks.append(chunk_entry)
+                                transfer_size += size
+                            else:
+                                if not dry_run:
+                                    chunk_entry = cache.chunk_incref(chunk_id, archive.stats)
+                                    chunks.append(chunk_entry)
+                                present_size += size
+                        if not dry_run:
+                            item.chunks = chunks  # TODO: overwrite? IDs and sizes are same.
+                            archive.stats.nfiles += 1
+                    if not dry_run:
+                        archive.add_item(upgrader.upgrade_item(item=item))
+                if not dry_run:
+                    additional_metadata = upgrader.upgrade_archive_metadata(metadata=other_archive.metadata)
+                    archive.save(stats=archive.stats, additional_metadata=additional_metadata)
+                    print(
+                        f"{name}: finished. "
+                        f"transfer_size: {format_file_size(transfer_size)} "
+                        f"present_size: {format_file_size(present_size)}"
+                    )
+                else:
+                    print(
+                        f"{name}: completed"
+                        if transfer_size == 0
+                        else f"{name}: incomplete, "
+                        f"transfer_size: {format_file_size(transfer_size)} "
+                        f"present_size: {format_file_size(present_size)}"
+                    )
+        return EXIT_SUCCESS
+
+    def build_parser_transfer(self, subparsers, common_parser, mid_common_parser):
+
+        from .common import process_epilog
+        from .common import define_archive_filters_group
+
+        transfer_epilog = process_epilog(
+            """
+        This command transfers archives from one repository to another repository.
+        Optionally, it can also upgrade the transferred data.
+
+        Suggested use for general purpose archive transfer (not repo upgrades)::
+
+            # initialize DST_REPO reusing key material from SRC_REPO, so that
+            # chunking and chunk id generation will work in the same way as before.
+            borg --repo=DST_REPO init --other-repo=SRC_REPO --encryption=DST_ENC
+
+            # transfer archives from SRC_REPO to DST_REPO
+            borg --repo=DST_REPO transfer --other-repo=SRC_REPO --dry-run  # check what it would do
+            borg --repo=DST_REPO transfer --other-repo=SRC_REPO            # do it!
+            borg --repo=DST_REPO transfer --other-repo=SRC_REPO --dry-run  # check! anything left?
+
+        The default is to transfer all archives, including checkpoint archives.
+
+        You could use the misc. archive filter options to limit which archives it will
+        transfer, e.g. using the -a option. This is recommended for big
+        repositories with multiple data sets to keep the runtime per invocation lower.
+
+        For repository upgrades (e.g. from a borg 1.2 repo to a related borg 2.0 repo), usage is
+        quite similar to the above::
+
+            borg --repo=DST_REPO transfer --other-repo=SRC_REPO --upgrader=From12To20
+
+
+        """
+        )
+        subparser = subparsers.add_parser(
+            "transfer",
+            parents=[common_parser],
+            add_help=False,
+            description=self.do_transfer.__doc__,
+            epilog=transfer_epilog,
+            formatter_class=argparse.RawDescriptionHelpFormatter,
+            help="transfer of archives from another repository",
+        )
+        subparser.set_defaults(func=self.do_transfer)
+        subparser.add_argument(
+            "-n", "--dry-run", dest="dry_run", action="store_true", help="do not change repository, just check"
+        )
+        subparser.add_argument(
+            "--other-repo",
+            metavar="SRC_REPOSITORY",
+            dest="other_location",
+            type=location_validator(other=True),
+            default=Location(other=True),
+            help="transfer archives from the other repository",
+        )
+        subparser.add_argument(
+            "--upgrader",
+            metavar="UPGRADER",
+            dest="upgrader",
+            type=str,
+            default="NoOp",
+            help="use the upgrader to convert transferred data (default: no conversion)",
+        )
+        define_archive_filters_group(subparser)
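
`do_transfer` resolves `--upgrader NAME` to a class `UpgraderNAME` in `borg.upgrade` and drives it through four hooks per archive. The interface implied by the calls above, written out as a pass-through sketch (the shape a custom upgrader would need to provide; not necessarily the exact base class borg ships)::

    class UpgraderNoOp:
        # looked up via getattr(upgrade_mod, f"Upgrader{args.upgrader}")
        def __init__(self, *, cache):
            self.cache = cache

        def new_archive(self, *, archive):
            # called once per target archive; archive is None on --dry-run
            self.archive = archive

        def upgrade_compressed_chunk(self, *, chunk):
            # may rewrite the still-compressed payload; a no-op returns it unchanged
            return chunk

        def upgrade_item(self, *, item):
            # may rewrite per-item metadata before archive.add_item()
            return item

        def upgrade_archive_metadata(self, *, metadata):
            # returns additional metadata merged in by archive.save()
            return {}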