
Repository3 / RemoteRepository3: implement a borgstore-based repository

Simplify the repository a lot:

No repository transactions, no log-like appending, no append-only mode, no segments:
just a key/value store for the individual chunks.

No locking yet.
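In other words, the new repository maps each chunk ID to one object in a borgstore key/value store. A rough conceptual sketch of that design (the class and the store method names are illustrative assumptions, not the exact Repository3 / borgstore API):

    # Conceptual sketch only: a repository reduced to a key/value store.
    # One object per chunk; no transactions, segments, or commit log.
    class KVRepository:
        def __init__(self, store):
            self.store = store  # borgstore-like backend (assumed interface)

        def put(self, id: bytes, data: bytes) -> None:
            self.store.store(f"data/{id.hex()}", data)  # store one object

        def get(self, id: bytes) -> bytes:
            return self.store.load(f"data/{id.hex()}")  # raises if missing

        def delete(self, id: bytes) -> None:
            self.store.delete(f"data/{id.hex()}")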

Also:

mypy: ignore missing imports
There are no library stubs for borgstore yet, so mypy errors out without that option.

pyproject.toml: install borgstore directly from GitHub
There is no PyPI release yet.

use pip install -e . rather than python setup.py develop
The latter is deprecated and had issues installing the "borgstore from GitHub" dependency.
Thomas Waldmann committed 10 months ago (commit d30d5f4aec)
37 changed files with 1967 additions and 601 deletions
  1. .github/workflows/ci.yml (+1 -2)
  2. pyproject.toml (+2 -0)
  3. src/borg/archive.py (+5 -5)
  4. src/borg/archiver/__init__.py (+3 -22)
  5. src/borg/archiver/_common.py (+14 -15)
  6. src/borg/archiver/config_cmd.py (+0 -177)
  7. src/borg/archiver/debug_cmd.py (+3 -36)
  8. src/borg/archiver/rcompress_cmd.py (+1 -10)
  9. src/borg/archiver/serve_cmd.py (+1 -1)
  10. src/borg/archiver/version_cmd.py (+2 -2)
  11. src/borg/cache.py (+4 -12)
  12. src/borg/crypto/keymanager.py (+2 -2)
  13. src/borg/fuse.py (+1 -1)
  14. src/borg/helpers/misc.py (+1 -1)
  15. src/borg/helpers/parseformat.py (+3 -1)
  16. src/borg/manifest.py (+2 -2)
  17. src/borg/remote.py (+3 -1)
  18. src/borg/remote3.py (+1269 -0)
  19. src/borg/repository3.py (+314 -0)
  20. src/borg/testsuite/archiver/__init__.py (+8 -12)
  21. src/borg/testsuite/archiver/bypass_lock_option.py (+0 -130)
  22. src/borg/testsuite/archiver/check_cmd.py (+3 -5)
  23. src/borg/testsuite/archiver/checks.py (+12 -12)
  24. src/borg/testsuite/archiver/config_cmd.py (+0 -64)
  25. src/borg/testsuite/archiver/corruption.py (+0 -18)
  26. src/borg/testsuite/archiver/create_cmd.py (+2 -2)
  27. src/borg/testsuite/archiver/delete_cmd.py (+3 -3)
  28. src/borg/testsuite/archiver/key_cmds.py (+7 -7)
  29. src/borg/testsuite/archiver/rcompress_cmd.py (+2 -2)
  30. src/borg/testsuite/archiver/rcreate_cmd.py (+0 -29)
  31. src/borg/testsuite/archiver/rename_cmd.py (+2 -2)
  32. src/borg/testsuite/archiver/return_codes.py (+1 -1)
  33. src/borg/testsuite/archiver/rinfo_cmd.py (+0 -18)
  34. src/borg/testsuite/cache.py (+3 -3)
  35. src/borg/testsuite/repoobj.py (+2 -2)
  36. src/borg/testsuite/repository3.py (+290 -0)
  37. tox.ini (+1 -1)

+ 1 - 2
.github/workflows/ci.yml

@@ -104,8 +104,7 @@ jobs:
         pip install -r requirements.d/development.txt
     - name: Install borgbackup
       run: |
-        # pip install -e .
-        python setup.py -v develop
+        pip install -e .
     - name: run tox env
       env:
         XDISTN: "4"

+ 2 - 0
pyproject.toml

@@ -34,6 +34,8 @@ dependencies = [
   "platformdirs >=3.0.0, <5.0.0; sys_platform == 'darwin'",  # for macOS: breaking changes in 3.0.0,
   "platformdirs >=2.6.0, <5.0.0; sys_platform != 'darwin'",  # for others: 2.6+ works consistently.
   "argon2-cffi",
+  "borgstore",
+
 ]
 
 [project.optional-dependencies]

+ 5 - 5
src/borg/archive.py

@@ -51,7 +51,7 @@ from .patterns import PathPrefixPattern, FnmatchPattern, IECommand
 from .item import Item, ArchiveItem, ItemDiff
 from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname
 from .remote import cache_if_remote
-from .repository import Repository, LIST_SCAN_LIMIT
+from .repository3 import Repository3, LIST_SCAN_LIMIT
 from .repoobj import RepoObj
 
 has_link = hasattr(os, "link")
@@ -1046,7 +1046,7 @@ Duration: {0.duration}
         def fetch_async_response(wait=True):
             try:
                 return self.repository.async_response(wait=wait)
-            except Repository.ObjectNotFound:
+            except Repository3.ObjectNotFound:
                 nonlocal error
                 # object not in repo - strange, but we wanted to delete it anyway.
                 if forced == 0:
@@ -1093,7 +1093,7 @@ Duration: {0.duration}
                     error = True
             if progress:
                 pi.finish()
-        except (msgpack.UnpackException, Repository.ObjectNotFound):
+        except (msgpack.UnpackException, Repository3.ObjectNotFound):
             # items metadata corrupted
             if forced == 0:
                 raise
@@ -1887,7 +1887,7 @@ class ArchiveChecker:
         # Explicitly set the initial usable hash table capacity to avoid performance issues
         # due to hash table "resonance".
         # Since reconstruction of archive items can add some new chunks, add 10 % headroom.
-        self.chunks = ChunkIndex(usable=len(self.repository) * 1.1)
+        self.chunks = ChunkIndex()
         marker = None
         while True:
             result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
@@ -1939,7 +1939,7 @@ class ArchiveChecker:
                 chunk_id = chunk_ids_revd.pop(-1)  # better efficiency
                 try:
                     encrypted_data = next(chunk_data_iter)
-                except (Repository.ObjectNotFound, IntegrityErrorBase) as err:
+                except (Repository3.ObjectNotFound, IntegrityErrorBase) as err:
                     self.error_found = True
                     errors += 1
                     logger.error("chunk %s: %s", bin_to_hex(chunk_id), err)

+ 3 - 22
src/borg/archiver/__init__.py

@@ -36,7 +36,7 @@ try:
     from ..helpers import ErrorIgnoringTextIOWrapper
     from ..helpers import msgpack
     from ..helpers import sig_int
-    from ..remote import RemoteRepository
+    from ..remote3 import RemoteRepository3
     from ..selftest import selftest
 except BaseException:
     # an unhandled exception in the try-block would cause the borg cli command to exit with rc 1 due to python's
@@ -68,7 +68,6 @@ def get_func(args):
 from .benchmark_cmd import BenchmarkMixIn
 from .check_cmd import CheckMixIn
 from .compact_cmd import CompactMixIn
-from .config_cmd import ConfigMixIn
 from .create_cmd import CreateMixIn
 from .debug_cmd import DebugMixIn
 from .delete_cmd import DeleteMixIn
@@ -98,7 +97,6 @@ class Archiver(
     BenchmarkMixIn,
     CheckMixIn,
     CompactMixIn,
-    ConfigMixIn,
     CreateMixIn,
     DebugMixIn,
     DeleteMixIn,
@@ -336,7 +334,6 @@ class Archiver(
         self.build_parser_benchmarks(subparsers, common_parser, mid_common_parser)
         self.build_parser_check(subparsers, common_parser, mid_common_parser)
         self.build_parser_compact(subparsers, common_parser, mid_common_parser)
-        self.build_parser_config(subparsers, common_parser, mid_common_parser)
         self.build_parser_create(subparsers, common_parser, mid_common_parser)
         self.build_parser_debug(subparsers, common_parser, mid_common_parser)
         self.build_parser_delete(subparsers, common_parser, mid_common_parser)
@@ -412,22 +409,6 @@ class Archiver(
             elif not args.paths_from_stdin:
                 # need at least 1 path but args.paths may also be populated from patterns
                 parser.error("Need at least one PATH argument.")
-        if not getattr(args, "lock", True):  # Option --bypass-lock sets args.lock = False
-            bypass_allowed = {
-                self.do_check,
-                self.do_config,
-                self.do_diff,
-                self.do_export_tar,
-                self.do_extract,
-                self.do_info,
-                self.do_rinfo,
-                self.do_list,
-                self.do_rlist,
-                self.do_mount,
-                self.do_umount,
-            }
-            if func not in bypass_allowed:
-                raise Error("Not allowed to bypass locking mechanism for chosen command")
         # we can only have a complete knowledge of placeholder replacements we should do **after** arg parsing,
         # e.g. due to options like --timestamp that override the current time.
         # thus we have to initialize replace_placeholders here and process all args that need placeholder replacement.
@@ -581,7 +562,7 @@ def sig_trace_handler(sig_no, stack):  # pragma: no cover
 
 def format_tb(exc):
     qualname = type(exc).__qualname__
-    remote = isinstance(exc, RemoteRepository.RPCError)
+    remote = isinstance(exc, RemoteRepository3.RPCError)
     if remote:
         prefix = "Borg server: "
         trace_back = "\n".join(prefix + line for line in exc.exception_full.splitlines())
@@ -659,7 +640,7 @@ def main():  # pragma: no cover
             tb_log_level = logging.ERROR if e.traceback else logging.DEBUG
             tb = format_tb(e)
             exit_code = e.exit_code
-        except RemoteRepository.RPCError as e:
+        except RemoteRepository3.RPCError as e:
             important = e.traceback
             msg = e.exception_full if important else e.get_message()
             msgid = e.exception_class

+ 14 - 15
src/borg/archiver/_common.py

@@ -14,7 +14,9 @@ from ..helpers.nanorst import rst_to_terminal
 from ..manifest import Manifest, AI_HUMAN_SORT_KEYS
 from ..patterns import PatternMatcher
 from ..remote import RemoteRepository
+from ..remote3 import RemoteRepository3
 from ..repository import Repository
+from ..repository3 import Repository3
 from ..repoobj import RepoObj, RepoObj1
 from ..patterns import (
     ArgparsePatternAction,
@@ -29,9 +31,10 @@ from ..logger import create_logger
 logger = create_logger(__name__)
 
 
-def get_repository(location, *, create, exclusive, lock_wait, lock, append_only, make_parent_dirs, storage_quota, args):
+def get_repository(location, *, create, exclusive, lock_wait, lock, append_only, make_parent_dirs, storage_quota, args, v1_or_v2):
     if location.proto in ("ssh", "socket"):
-        repository = RemoteRepository(
+        RemoteRepoCls = RemoteRepository if v1_or_v2 else RemoteRepository3
+        repository = RemoteRepoCls(
             location,
             create=create,
             exclusive=exclusive,
@@ -43,7 +46,8 @@ def get_repository(location, *, create, exclusive, lock_wait, lock, append_only,
         )
 
     else:
-        repository = Repository(
+        RepoCls = Repository if v1_or_v2 else Repository3
+        repository = RepoCls(
             location.path,
             create=create,
             exclusive=exclusive,
@@ -98,8 +102,7 @@ def with_repository(
         decorator_name="with_repository",
     )
 
-    # To process the `--bypass-lock` option if specified, we need to
-    # modify `lock` inside `wrapper`. Therefore we cannot use the
+    # We may need to modify `lock` inside `wrapper`. Therefore we cannot use the
     # `nonlocal` statement to access `lock` as modifications would also
     # affect the scope outside of `wrapper`. Subsequent calls would
     # only see the overwritten value of `lock`, not the original one.
@@ -129,13 +132,15 @@ def with_repository(
                 make_parent_dirs=make_parent_dirs,
                 storage_quota=storage_quota,
                 args=args,
+                v1_or_v2=False,
             )
 
             with repository:
-                if repository.version not in (2,):
+                if repository.version not in (3,):
                     raise Error(
-                        "This borg version only accepts version 2 repos for -r/--repo. "
-                        "You can use 'borg transfer' to copy archives from old to new repos."
+                        f"This borg version only accepts version 3 repos for -r/--repo, "
+                        f"but not version {repository.version}. "
+                        f"You can use 'borg transfer' to copy archives from old to new repos."
                     )
                 if manifest or cache:
                     manifest_ = Manifest.load(repository, compatibility)
@@ -195,6 +200,7 @@ def with_other_repository(manifest=False, cache=False, compatibility=None):
                 make_parent_dirs=False,
                 storage_quota=None,
                 args=args,
+                v1_or_v2=True
             )
 
             with repository:
@@ -504,13 +510,6 @@ def define_common_options(add_common_option):
         action=Highlander,
         help="wait at most SECONDS for acquiring a repository/cache lock (default: %(default)d).",
     )
-    add_common_option(
-        "--bypass-lock",
-        dest="lock",
-        action="store_false",
-        default=argparse.SUPPRESS,  # only create args attribute if option is specified
-        help="Bypass locking mechanism",
-    )
     add_common_option("--show-version", dest="show_version", action="store_true", help="show/log the borg version")
     add_common_option("--show-rc", dest="show_rc", action="store_true", help="show/log the return code (rc)")
     add_common_option(

+ 0 - 177
src/borg/archiver/config_cmd.py

@@ -1,177 +0,0 @@
-import argparse
-import configparser
-
-from ._common import with_repository
-from ..cache import Cache, assert_secure
-from ..constants import *  # NOQA
-from ..helpers import Error, CommandError
-from ..helpers import parse_file_size, hex_to_bin
-from ..manifest import Manifest
-
-from ..logger import create_logger
-
-logger = create_logger()
-
-
-class ConfigMixIn:
-    @with_repository(exclusive=True, manifest=False)
-    def do_config(self, args, repository):
-        """get, set, and delete values in a repository or cache config file"""
-
-        def repo_validate(section, name, value=None, check_value=True):
-            if section not in ["repository"]:
-                raise ValueError("Invalid section")
-            if name in ["segments_per_dir", "last_segment_checked"]:
-                if check_value:
-                    try:
-                        int(value)
-                    except ValueError:
-                        raise ValueError("Invalid value") from None
-            elif name in ["max_segment_size", "additional_free_space", "storage_quota"]:
-                if check_value:
-                    try:
-                        parse_file_size(value)
-                    except ValueError:
-                        raise ValueError("Invalid value") from None
-                    if name == "storage_quota":
-                        if parse_file_size(value) < parse_file_size("10M"):
-                            raise ValueError("Invalid value: storage_quota < 10M")
-                    elif name == "max_segment_size":
-                        if parse_file_size(value) >= MAX_SEGMENT_SIZE_LIMIT:
-                            raise ValueError("Invalid value: max_segment_size >= %d" % MAX_SEGMENT_SIZE_LIMIT)
-            elif name in ["append_only"]:
-                if check_value and value not in ["0", "1"]:
-                    raise ValueError("Invalid value")
-            elif name in ["id"]:
-                if check_value:
-                    hex_to_bin(value, length=32)
-            else:
-                raise ValueError("Invalid name")
-
-        def cache_validate(section, name, value=None, check_value=True):
-            if section not in ["cache"]:
-                raise ValueError("Invalid section")
-            # currently, we do not support setting anything in the cache via borg config.
-            raise ValueError("Invalid name")
-
-        def list_config(config):
-            default_values = {
-                "version": "1",
-                "segments_per_dir": str(DEFAULT_SEGMENTS_PER_DIR),
-                "max_segment_size": str(MAX_SEGMENT_SIZE_LIMIT),
-                "additional_free_space": "0",
-                "storage_quota": repository.storage_quota,
-                "append_only": repository.append_only,
-            }
-            print("[repository]")
-            for key in [
-                "version",
-                "segments_per_dir",
-                "max_segment_size",
-                "storage_quota",
-                "additional_free_space",
-                "append_only",
-                "id",
-            ]:
-                value = config.get("repository", key, fallback=False)
-                if value is None:
-                    value = default_values.get(key)
-                    if value is None:
-                        raise Error("The repository config is missing the %s key which has no default value" % key)
-                print(f"{key} = {value}")
-            for key in ["last_segment_checked"]:
-                value = config.get("repository", key, fallback=None)
-                if value is None:
-                    continue
-                print(f"{key} = {value}")
-
-        if not args.list:
-            if args.name is None:
-                raise CommandError("No config key name was provided.")
-            try:
-                section, name = args.name.split(".")
-            except ValueError:
-                section = args.cache and "cache" or "repository"
-                name = args.name
-
-        if args.cache:
-            manifest = Manifest.load(repository, (Manifest.Operation.WRITE,))
-            assert_secure(repository, manifest, self.lock_wait)
-            cache = Cache(repository, manifest, lock_wait=self.lock_wait)
-
-        try:
-            if args.cache:
-                cache.cache_config.load()
-                config = cache.cache_config._config
-                save = cache.cache_config.save
-                validate = cache_validate
-            else:
-                config = repository.config
-                save = lambda: repository.save_config(repository.path, repository.config)  # noqa
-                validate = repo_validate
-
-            if args.delete:
-                validate(section, name, check_value=False)
-                config.remove_option(section, name)
-                if len(config.options(section)) == 0:
-                    config.remove_section(section)
-                save()
-            elif args.list:
-                list_config(config)
-            elif args.value:
-                validate(section, name, args.value)
-                if section not in config.sections():
-                    config.add_section(section)
-                config.set(section, name, args.value)
-                save()
-            else:
-                try:
-                    print(config.get(section, name))
-                except (configparser.NoOptionError, configparser.NoSectionError) as e:
-                    raise Error(e)
-        finally:
-            if args.cache:
-                cache.close()
-
-    def build_parser_config(self, subparsers, common_parser, mid_common_parser):
-        from ._common import process_epilog
-
-        config_epilog = process_epilog(
-            """
-        This command gets and sets options in a local repository or cache config file.
-        For security reasons, this command only works on local repositories.
-
-        To delete a config value entirely, use ``--delete``. To list the values
-        of the configuration file or the default values, use ``--list``.  To get an existing
-        key, pass only the key name. To set a key, pass both the key name and
-        the new value. Keys can be specified in the format "section.name" or
-        simply "name"; the section will default to "repository" and "cache" for
-        the repo and cache configs, respectively.
-
-
-        By default, borg config manipulates the repository config file. Using ``--cache``
-        edits the repository cache's config file instead.
-        """
-        )
-        subparser = subparsers.add_parser(
-            "config",
-            parents=[common_parser],
-            add_help=False,
-            description=self.do_config.__doc__,
-            epilog=config_epilog,
-            formatter_class=argparse.RawDescriptionHelpFormatter,
-            help="get and set configuration values",
-        )
-        subparser.set_defaults(func=self.do_config)
-        subparser.add_argument(
-            "-c", "--cache", dest="cache", action="store_true", help="get and set values from the repo cache"
-        )
-
-        group = subparser.add_mutually_exclusive_group()
-        group.add_argument(
-            "-d", "--delete", dest="delete", action="store_true", help="delete the key from the config file"
-        )
-        group.add_argument("-l", "--list", dest="list", action="store_true", help="list the configuration of the repo")
-
-        subparser.add_argument("name", metavar="NAME", nargs="?", help="name of config key")
-        subparser.add_argument("value", metavar="VALUE", nargs="?", help="new value for key")

+ 3 - 36
src/borg/archiver/debug_cmd.py

@@ -15,7 +15,8 @@ from ..helpers import positive_int_validator, archivename_validator
 from ..helpers import CommandError, RTError
 from ..manifest import Manifest
 from ..platform import get_process_id
-from ..repository import Repository, LIST_SCAN_LIMIT, TAG_PUT, TAG_DELETE, TAG_COMMIT
+from ..repository import Repository, TAG_PUT, TAG_DELETE, TAG_COMMIT
+from ..repository3 import Repository3, LIST_SCAN_LIMIT
 from ..repoobj import RepoObj
 
 from ._common import with_repository, Highlander
@@ -330,7 +331,7 @@ class DebugMixIn:
                     repository.delete(id)
                     modified = True
                     print("object %s deleted." % hex_id)
-                except Repository.ObjectNotFound:
+                except Repository3.ObjectNotFound:
                     print("object %s not found." % hex_id)
         if modified:
             repository.commit(compact=False)
@@ -351,23 +352,6 @@ class DebugMixIn:
                 except KeyError:
                     print("object %s not found [info from chunks cache]." % hex_id)
 
-    @with_repository(manifest=False, exclusive=True)
-    def do_debug_dump_hints(self, args, repository):
-        """dump repository hints"""
-        if not repository._active_txn:
-            repository.prepare_txn(repository.get_transaction_id())
-        try:
-            hints = dict(
-                segments=repository.segments,
-                compact=repository.compact,
-                storage_quota_use=repository.storage_quota_use,
-                shadow_index={bin_to_hex(k): v for k, v in repository.shadow_index.items()},
-            )
-            with dash_open(args.path, "w") as fd:
-                json.dump(hints, fd, indent=4)
-        finally:
-            repository.rollback()
-
     def do_debug_convert_profile(self, args):
         """convert Borg profile to Python profile"""
         import marshal
@@ -689,23 +673,6 @@ class DebugMixIn:
         subparser.set_defaults(func=self.do_debug_refcount_obj)
         subparser.add_argument("ids", metavar="IDs", nargs="+", type=str, help="hex object ID(s) to show refcounts for")
 
-        debug_dump_hints_epilog = process_epilog(
-            """
-        This command dumps the repository hints data.
-        """
-        )
-        subparser = debug_parsers.add_parser(
-            "dump-hints",
-            parents=[common_parser],
-            add_help=False,
-            description=self.do_debug_dump_hints.__doc__,
-            epilog=debug_dump_hints_epilog,
-            formatter_class=argparse.RawDescriptionHelpFormatter,
-            help="dump repo hints (debug)",
-        )
-        subparser.set_defaults(func=self.do_debug_dump_hints)
-        subparser.add_argument("path", metavar="PATH", type=str, help="file to dump data into")
-
         debug_convert_profile_epilog = process_epilog(
             """
         Convert a Borg profile to a Python cProfile compatible profile.

+ 1 - 10
src/borg/archiver/rcompress_cmd.py

@@ -24,14 +24,7 @@ def find_chunks(repository, repo_objs, stats, ctype, clevel, olevel):
     compr_keys = stats["compr_keys"] = set()
     compr_wanted = ctype, clevel, olevel
     state = None
-    chunks_count = len(repository)
-    chunks_limit = min(1000, max(100, chunks_count // 1000))
-    pi = ProgressIndicatorPercent(
-        total=chunks_count,
-        msg="Searching for recompression candidates %3.1f%%",
-        step=0.1,
-        msgid="rcompress.find_chunks",
-    )
+    chunks_limit = 1000
     while True:
         chunk_ids, state = repository.scan(limit=chunks_limit, state=state)
         if not chunk_ids:
@@ -44,8 +37,6 @@ def find_chunks(repository, repo_objs, stats, ctype, clevel, olevel):
             compr_keys.add(compr_found)
             stats[compr_found] += 1
             stats["checked_count"] += 1
-            pi.show(increase=1)
-    pi.finish()
     return recompress_ids
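The percent-based progress indicator is dropped here because the total chunk count is no longer known up front; scanning proceeds with an opaque state token until exhaustion. A condensed sketch of the consumption pattern used above (process() is a hypothetical placeholder for the per-chunk work):

    # scan() pagination: `state` is an opaque resume token; an empty
    # ID list means there is nothing left to scan.
    state = None
    while True:
        chunk_ids, state = repository.scan(limit=1000, state=state)
        if not chunk_ids:
            break
        for chunk_id in chunk_ids:
            process(chunk_id)  # hypothetical per-chunk work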
 
 

+ 1 - 1
src/borg/archiver/serve_cmd.py

@@ -3,7 +3,7 @@ import argparse
 from ._common import Highlander
 from ..constants import *  # NOQA
 from ..helpers import parse_storage_quota
-from ..remote import RepositoryServer
+from ..remote3 import RepositoryServer
 
 from ..logger import create_logger
 

+ 2 - 2
src/borg/archiver/version_cmd.py

@@ -2,7 +2,7 @@ import argparse
 
 from .. import __version__
 from ..constants import *  # NOQA
-from ..remote import RemoteRepository
+from ..remote3 import RemoteRepository3
 
 from ..logger import create_logger
 
@@ -16,7 +16,7 @@ class VersionMixIn:
 
         client_version = parse_version(__version__)
         if args.location.proto in ("ssh", "socket"):
-            with RemoteRepository(args.location, lock=False, args=args) as repository:
+            with RemoteRepository3(args.location, lock=False, args=args) as repository:
                 server_version = repository.server_version
         else:
             server_version = client_version

+ 4 - 12
src/borg/cache.py

@@ -32,7 +32,7 @@ from .locking import Lock
 from .manifest import Manifest
 from .platform import SaveFile
 from .remote import cache_if_remote
-from .repository import LIST_SCAN_LIMIT
+from .repository3 import LIST_SCAN_LIMIT
 
 # note: cmtime might be either a ctime or a mtime timestamp, chunks is a list of ChunkListEntry
 FileCacheEntry = namedtuple("FileCacheEntry", "age inode size cmtime chunks")
@@ -718,35 +718,27 @@ class ChunksMixin:
         return ChunkListEntry(id, size)
 
     def _load_chunks_from_repo(self):
-        # Explicitly set the initial usable hash table capacity to avoid performance issues
-        # due to hash table "resonance".
-        # Since we're creating an archive, add 10 % from the start.
-        num_chunks = len(self.repository)
-        chunks = ChunkIndex(usable=num_chunks * 1.1)
-        pi = ProgressIndicatorPercent(
-            total=num_chunks, msg="Downloading chunk list... %3.0f%%", msgid="cache.download_chunks"
-        )
+        chunks = ChunkIndex()
         t0 = perf_counter()
         num_requests = 0
+        num_chunks = 0
         marker = None
         while True:
             result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
             num_requests += 1
             if not result:
                 break
-            pi.show(increase=len(result))
             marker = result[-1]
             # All chunks from the repository have a refcount of MAX_VALUE, which is sticky,
             # therefore we can't/won't delete them. Chunks we added ourselves in this transaction
             # (e.g. checkpoint archives) are tracked correctly.
             init_entry = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
             for id_ in result:
+                num_chunks += 1
                 chunks[id_] = init_entry
-        assert len(chunks) == num_chunks
         # LocalCache does not contain the manifest, either.
         del chunks[self.manifest.MANIFEST_ID]
         duration = perf_counter() - t0 or 0.01
-        pi.finish()
         logger.debug(
             "Cache: downloaded %d chunk IDs in %.2f s (%d requests), ~%s/s",
             num_chunks,

+ 2 - 2
src/borg/crypto/keymanager.py

@@ -5,7 +5,7 @@ from hashlib import sha256
 
 from ..helpers import Error, yes, bin_to_hex, hex_to_bin, dash_open
 from ..manifest import Manifest, NoManifestError
-from ..repository import Repository
+from ..repository3 import Repository3
 from ..repoobj import RepoObj
 
 
@@ -50,7 +50,7 @@ class KeyManager:
 
         try:
             manifest_chunk = self.repository.get(Manifest.MANIFEST_ID)
-        except Repository.ObjectNotFound:
+        except Repository3.ObjectNotFound:
             raise NoManifestError
 
         manifest_data = RepoObj.extract_crypted_data(manifest_chunk)

+ 1 - 1
src/borg/fuse.py

@@ -46,7 +46,7 @@ from .helpers.lrucache import LRUCache
 from .item import Item
 from .platform import uid2user, gid2group
 from .platformflags import is_darwin
-from .remote import RemoteRepository
+from .remote import RemoteRepository  # TODO 3
 
 
 def fuse_main():

+ 1 - 1
src/borg/helpers/misc.py

@@ -2,7 +2,7 @@ import logging
 import io
 import os
 import os.path
-import platform
+import platform  # python stdlib import - if this fails, check that cwd != src/borg/
 import sys
 from collections import deque
 from itertools import islice

+ 3 - 1
src/borg/helpers/parseformat.py

@@ -1182,11 +1182,13 @@ def ellipsis_truncate(msg, space):
 class BorgJsonEncoder(json.JSONEncoder):
     def default(self, o):
         from ..repository import Repository
+        from ..repository3 import Repository3
         from ..remote import RemoteRepository
+        from ..remote3 import RemoteRepository3
         from ..archive import Archive
         from ..cache import LocalCache, AdHocCache, AdHocWithFilesCache
 
-        if isinstance(o, Repository) or isinstance(o, RemoteRepository):
+        if isinstance(o, (Repository, Repository3)) or isinstance(o, (RemoteRepository, RemoteRepository3)):
             return {"id": bin_to_hex(o.id), "location": o._location.canonical_path()}
         if isinstance(o, Archive):
             return o.info()

+ 2 - 2
src/borg/manifest.py

@@ -246,11 +246,11 @@ class Manifest:
     def load(cls, repository, operations, key=None, *, ro_cls=RepoObj):
         from .item import ManifestItem
         from .crypto.key import key_factory
-        from .repository import Repository
+        from .repository3 import Repository3
 
         try:
             cdata = repository.get(cls.MANIFEST_ID)
-        except Repository.ObjectNotFound:
+        except Repository3.ObjectNotFound:
             raise NoManifestError
         if not key:
             key = key_factory(repository, cdata, ro_cls=ro_cls)

+ 3 - 1
src/borg/remote.py

@@ -640,6 +640,7 @@ class RemoteRepository:
                 exclusive=exclusive,
                 append_only=append_only,
                 make_parent_dirs=make_parent_dirs,
+                v1_or_v2=True,  # make remote use Repository, not Repository3
             )
             info = self.info()
             self.version = info["version"]
@@ -939,9 +940,10 @@ class RemoteRepository:
         since=parse_version("1.0.0"),
         append_only={"since": parse_version("1.0.7"), "previously": False},
         make_parent_dirs={"since": parse_version("1.1.9"), "previously": False},
+        v1_or_v2={"since": parse_version("2.0.0b8"), "previously": True},  # TODO fix version
     )
     def open(
-        self, path, create=False, lock_wait=None, lock=True, exclusive=False, append_only=False, make_parent_dirs=False
+        self, path, create=False, lock_wait=None, lock=True, exclusive=False, append_only=False, make_parent_dirs=False, v1_or_v2=False
     ):
         """actual remoting is done via self.call in the @api decorator"""
 

+ 1269 - 0
src/borg/remote3.py

@@ -0,0 +1,1269 @@
+import atexit
+import errno
+import functools
+import inspect
+import logging
+import os
+import queue
+import select
+import shlex
+import shutil
+import socket
+import struct
+import sys
+import tempfile
+import textwrap
+import time
+import traceback
+from subprocess import Popen, PIPE
+
+import borg.logger
+from . import __version__
+from .compress import Compressor
+from .constants import *  # NOQA
+from .helpers import Error, ErrorWithTraceback, IntegrityError
+from .helpers import bin_to_hex
+from .helpers import get_limited_unpacker
+from .helpers import replace_placeholders
+from .helpers import sysinfo
+from .helpers import format_file_size
+from .helpers import safe_unlink
+from .helpers import prepare_subprocess_env, ignore_sigint
+from .helpers import get_socket_filename
+from .locking import LockTimeout, NotLocked, NotMyLock, LockFailed
+from .logger import create_logger, borg_serve_log_queue
+from .helpers import msgpack
+from .repository import Repository
+from .repository3 import Repository3
+from .version import parse_version, format_version
+from .checksums import xxh64
+from .helpers.datastruct import EfficientCollectionQueue
+
+logger = create_logger(__name__)
+
+BORG_VERSION = parse_version(__version__)
+MSGID, MSG, ARGS, RESULT, LOG = "i", "m", "a", "r", "l"
+
+MAX_INFLIGHT = 100
+
+RATELIMIT_PERIOD = 0.1
+
+
+def os_write(fd, data):
+    """os.write wrapper so we do not lose data for partial writes."""
+    # TODO: this issue is fixed in cygwin since at least 2.8.0, remove this
+    #       wrapper / workaround when this version is considered ancient.
+    # This is happening frequently on cygwin due to its small pipe buffer size of only 64kiB
+    # and also due to its different blocking pipe behaviour compared to Linux/*BSD.
+    # Neither Linux nor *BSD ever do partial writes on blocking pipes, unless interrupted by a
+    # signal, in which case serve() would terminate.
+    amount = remaining = len(data)
+    while remaining:
+        count = os.write(fd, data)
+        remaining -= count
+        if not remaining:
+            break
+        data = data[count:]
+        time.sleep(count * 1e-09)
+    return amount
+
+
+class ConnectionClosed(Error):
+    """Connection closed by remote host"""
+
+    exit_mcode = 80
+
+
+class ConnectionClosedWithHint(ConnectionClosed):
+    """Connection closed by remote host. {}"""
+
+    exit_mcode = 81
+
+
+class PathNotAllowed(Error):
+    """Repository path not allowed: {}"""
+
+    exit_mcode = 83
+
+
+class InvalidRPCMethod(Error):
+    """RPC method {} is not valid"""
+
+    exit_mcode = 82
+
+
+class UnexpectedRPCDataFormatFromClient(Error):
+    """Borg {}: Got unexpected RPC data format from client."""
+
+    exit_mcode = 85
+
+
+class UnexpectedRPCDataFormatFromServer(Error):
+    """Got unexpected RPC data format from server:\n{}"""
+
+    exit_mcode = 86
+
+    def __init__(self, data):
+        try:
+            data = data.decode()[:128]
+        except UnicodeDecodeError:
+            data = data[:128]
+            data = ["%02X" % byte for byte in data]
+            data = textwrap.fill(" ".join(data), 16 * 3)
+        super().__init__(data)
+
+
+class ConnectionBrokenWithHint(Error):
+    """Connection to remote host is broken. {}"""
+
+    exit_mcode = 87
+
+
+# Protocol compatibility:
+# In general the server is responsible for rejecting too old clients and the client is responsible for rejecting
+# too old servers. This ensures that the knowledge what is compatible is always held by the newer component.
+#
+# For the client the return of the negotiate method is a dict which includes the server version.
+#
+# All method calls on the remote repository object must be allowlisted in RepositoryServer.rpc_methods and have api
+# stubs in RemoteRepository*. The @api decorator on these stubs is used to set server version requirements.
+#
+# Method parameters are identified only by name and never by position. Unknown parameters are ignored by the server.
+# If a new parameter is important and may not be ignored, on the client a parameter specific version requirement needs
+# to be added.
+# When parameters are removed, they need to be preserved as defaulted parameters on the client stubs so that older
+# servers still get compatible input.
+
+
+class RepositoryServer:  # pragma: no cover
+    _rpc_methods = (
+        "__len__",
+        "check",
+        "commit",
+        "delete",
+        "destroy",
+        "flags",
+        "flags_many",
+        "get",
+        "list",
+        "scan",
+        "negotiate",
+        "open",
+        "close",
+        "info",
+        "put",
+        "rollback",
+        "save_key",
+        "load_key",
+        "break_lock",
+        "inject_exception",
+    )
+
+    _rpc_methods3 = (
+        "__len__",
+        "check",
+        "commit",
+        "delete",
+        "destroy",
+        "get",
+        "list",
+        "scan",
+        "negotiate",
+        "open",
+        "close",
+        "info",
+        "put",
+        "save_key",
+        "load_key",
+        "break_lock",
+        "inject_exception",
+    )
+
+    def __init__(self, restrict_to_paths, restrict_to_repositories, append_only, storage_quota, use_socket):
+        self.repository = None
+        self.RepoCls = None
+        self.rpc_methods = ("open", "close", "negotiate")
+        self.restrict_to_paths = restrict_to_paths
+        self.restrict_to_repositories = restrict_to_repositories
+        # This flag is parsed from the serve command line via Archiver.do_serve,
+        # i.e. it reflects local system policy and generally ranks higher than
+        # whatever the client wants, except when initializing a new repository
+        # (see RepositoryServer.open below).
+        self.append_only = append_only
+        self.storage_quota = storage_quota
+        self.client_version = None  # we update this after client sends version information
+        if use_socket is False:
+            self.socket_path = None
+        elif use_socket is True:  # --socket
+            self.socket_path = get_socket_filename()
+        else:  # --socket=/some/path
+            self.socket_path = use_socket
+
+    def filter_args(self, f, kwargs):
+        """Remove unknown named parameters from call, because client did (implicitly) say it's ok."""
+        known = set(inspect.signature(f).parameters)
+        return {name: kwargs[name] for name in kwargs if name in known}
+
+    def send_queued_log(self):
+        while True:
+            try:
+                # lr_dict contents see BorgQueueHandler
+                lr_dict = borg_serve_log_queue.get_nowait()
+            except queue.Empty:
+                break
+            else:
+                msg = msgpack.packb({LOG: lr_dict})
+                os_write(self.stdout_fd, msg)
+
+    def serve(self):
+        def inner_serve():
+            os.set_blocking(self.stdin_fd, False)
+            assert not os.get_blocking(self.stdin_fd)
+            os.set_blocking(self.stdout_fd, True)
+            assert os.get_blocking(self.stdout_fd)
+
+            unpacker = get_limited_unpacker("server")
+            shutdown_serve = False
+            while True:
+                # before processing any new RPCs, send out all pending log output
+                self.send_queued_log()
+
+                if shutdown_serve:
+                    # shutdown wanted! get out of here after sending all log output.
+                    assert self.repository is None
+                    return
+
+                # process new RPCs
+                r, w, es = select.select([self.stdin_fd], [], [], 10)
+                if r:
+                    data = os.read(self.stdin_fd, BUFSIZE)
+                    if not data:
+                        shutdown_serve = True
+                        continue
+                    unpacker.feed(data)
+                    for unpacked in unpacker:
+                        if isinstance(unpacked, dict):
+                            msgid = unpacked[MSGID]
+                            method = unpacked[MSG]
+                            args = unpacked[ARGS]
+                        else:
+                            if self.repository is not None:
+                                self.repository.close()
+                            raise UnexpectedRPCDataFormatFromClient(__version__)
+                        try:
+                            # logger.debug(f"{type(self)} method: {type(self.repository)}.{method}")
+                            if method not in self.rpc_methods:
+                                raise InvalidRPCMethod(method)
+                            try:
+                                f = getattr(self, method)
+                            except AttributeError:
+                                f = getattr(self.repository, method)
+                            args = self.filter_args(f, args)
+                            res = f(**args)
+                        except BaseException as e:
+                            # logger.exception(e)
+                            ex_short = traceback.format_exception_only(e.__class__, e)
+                            ex_full = traceback.format_exception(*sys.exc_info())
+                            ex_trace = True
+                            if isinstance(e, Error):
+                                ex_short = [e.get_message()]
+                                ex_trace = e.traceback
+                            if isinstance(e, (self.RepoCls.DoesNotExist, self.RepoCls.AlreadyExists, PathNotAllowed)):
+                                # These exceptions are reconstructed on the client end in RemoteRepository*.call_many(),
+                                # and will be handled just like locally raised exceptions. Suppress the remote traceback
+                                # for these, except ErrorWithTraceback, which should always display a traceback.
+                                pass
+                            else:
+                                logging.debug("\n".join(ex_full))
+
+                            sys_info = sysinfo()
+                            try:
+                                msg = msgpack.packb(
+                                    {
+                                        MSGID: msgid,
+                                        "exception_class": e.__class__.__name__,
+                                        "exception_args": e.args,
+                                        "exception_full": ex_full,
+                                        "exception_short": ex_short,
+                                        "exception_trace": ex_trace,
+                                        "sysinfo": sys_info,
+                                    }
+                                )
+                            except TypeError:
+                                msg = msgpack.packb(
+                                    {
+                                        MSGID: msgid,
+                                        "exception_class": e.__class__.__name__,
+                                        "exception_args": [
+                                            x if isinstance(x, (str, bytes, int)) else None for x in e.args
+                                        ],
+                                        "exception_full": ex_full,
+                                        "exception_short": ex_short,
+                                        "exception_trace": ex_trace,
+                                        "sysinfo": sys_info,
+                                    }
+                                )
+                            os_write(self.stdout_fd, msg)
+                        else:
+                            os_write(self.stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res}))
+                if es:
+                    shutdown_serve = True
+                    continue
+
+        if self.socket_path:  # server for socket:// connections
+            try:
+                # remove any left-over socket file
+                os.unlink(self.socket_path)
+            except OSError:
+                if os.path.exists(self.socket_path):
+                    raise
+            sock_dir = os.path.dirname(self.socket_path)
+            os.makedirs(sock_dir, exist_ok=True)
+            if self.socket_path.endswith(".sock"):
+                pid_file = self.socket_path.replace(".sock", ".pid")
+            else:
+                pid_file = self.socket_path + ".pid"
+            pid = os.getpid()
+            with open(pid_file, "w") as f:
+                f.write(str(pid))
+            atexit.register(functools.partial(os.remove, pid_file))
+            atexit.register(functools.partial(os.remove, self.socket_path))
+            sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
+            sock.bind(self.socket_path)  # this creates the socket file in the fs
+            sock.listen(0)  # no backlog
+            os.chmod(self.socket_path, mode=0o0770)  # group members may use the socket, too.
+            print(f"borg serve: PID {pid}, listening on socket {self.socket_path} ...", file=sys.stderr)
+
+            while True:
+                connection, client_address = sock.accept()
+                print(f"Accepted a connection on socket {self.socket_path} ...", file=sys.stderr)
+                self.stdin_fd = connection.makefile("rb").fileno()
+                self.stdout_fd = connection.makefile("wb").fileno()
+                inner_serve()
+                print(f"Finished with connection on socket {self.socket_path} .", file=sys.stderr)
+        else:  # server for one ssh:// connection
+            self.stdin_fd = sys.stdin.fileno()
+            self.stdout_fd = sys.stdout.fileno()
+            inner_serve()
+
+    def negotiate(self, client_data):
+        if isinstance(client_data, dict):
+            self.client_version = client_data["client_version"]
+        else:
+            self.client_version = BORG_VERSION  # seems to be newer than current version (no known old format)
+
+        # not a known old format, send newest negotiate this version knows
+        return {"server_version": BORG_VERSION}
+
+    def _resolve_path(self, path):
+        if isinstance(path, bytes):
+            path = os.fsdecode(path)
+        if path.startswith("/~/"):  # /~/x = path x relative to own home dir
+            home_dir = os.environ.get("HOME") or os.path.expanduser("~%s" % os.environ.get("USER", ""))
+            path = os.path.join(home_dir, path[3:])
+        elif path.startswith("/./"):  # /./x = path x relative to cwd
+            path = path[3:]
+        return os.path.realpath(path)
+
+    def open(
+        self, path, create=False, lock_wait=None, lock=True, exclusive=None, append_only=False, make_parent_dirs=False, v1_or_v2=False
+    ):
+        self.RepoCls = Repository if v1_or_v2 else Repository3
+        self.rpc_methods = self._rpc_methods if v1_or_v2 else self._rpc_methods3
+        logging.debug("Resolving repository path %r", path)
+        path = self._resolve_path(path)
+        logging.debug("Resolved repository path to %r", path)
+        path_with_sep = os.path.join(path, "")  # make sure there is a trailing slash (os.sep)
+        if self.restrict_to_paths:
+            # if --restrict-to-path P is given, we make sure that we only operate in/below path P.
+            # for the prefix check, it is important that the compared paths both have trailing slashes,
+            # so that a path /foobar will NOT be accepted with --restrict-to-path /foo option.
+            for restrict_to_path in self.restrict_to_paths:
+                restrict_to_path_with_sep = os.path.join(os.path.realpath(restrict_to_path), "")  # trailing slash
+                if path_with_sep.startswith(restrict_to_path_with_sep):
+                    break
+            else:
+                raise PathNotAllowed(path)
+        if self.restrict_to_repositories:
+            for restrict_to_repository in self.restrict_to_repositories:
+                restrict_to_repository_with_sep = os.path.join(os.path.realpath(restrict_to_repository), "")
+                if restrict_to_repository_with_sep == path_with_sep:
+                    break
+            else:
+                raise PathNotAllowed(path)
+        # "borg init" on "borg serve --append-only" (=self.append_only) does not create an append only repo,
+        # while "borg init --append-only" (=append_only) does, regardless of the --append-only (self.append_only)
+        # flag for serve.
+        append_only = (not create and self.append_only) or append_only
+        self.repository = self.RepoCls(
+            path,
+            create,
+            lock_wait=lock_wait,
+            lock=lock,
+            append_only=append_only,
+            storage_quota=self.storage_quota,
+            exclusive=exclusive,
+            make_parent_dirs=make_parent_dirs,
+            send_log_cb=self.send_queued_log,
+        )
+        self.repository.__enter__()  # clean exit handled by serve() method
+        return self.repository.id
+
+    def close(self):
+        if self.repository is not None:
+            self.repository.__exit__(None, None, None)
+            self.repository = None
+        borg.logger.flush_logging()
+        self.send_queued_log()
+
+    def inject_exception(self, kind):
+        s1 = "test string"
+        s2 = "test string2"
+        if kind == "DoesNotExist":
+            raise self.RepoCls.DoesNotExist(s1)
+        elif kind == "AlreadyExists":
+            raise self.RepoCls.AlreadyExists(s1)
+        elif kind == "CheckNeeded":
+            raise self.RepoCls.CheckNeeded(s1)
+        elif kind == "IntegrityError":
+            raise IntegrityError(s1)
+        elif kind == "PathNotAllowed":
+            raise PathNotAllowed("foo")
+        elif kind == "ObjectNotFound":
+            raise self.RepoCls.ObjectNotFound(s1, s2)
+        elif kind == "InvalidRPCMethod":
+            raise InvalidRPCMethod(s1)
+        elif kind == "divide":
+            0 // 0
+
+
+class SleepingBandwidthLimiter:
+    def __init__(self, limit):
+        if limit:
+            self.ratelimit = int(limit * RATELIMIT_PERIOD)
+            self.ratelimit_last = time.monotonic()
+            self.ratelimit_quota = self.ratelimit
+        else:
+            self.ratelimit = None
+
+    def write(self, fd, to_send):
+        if self.ratelimit:
+            now = time.monotonic()
+            if self.ratelimit_last + RATELIMIT_PERIOD <= now:
+                self.ratelimit_quota += self.ratelimit
+                if self.ratelimit_quota > 2 * self.ratelimit:
+                    self.ratelimit_quota = 2 * self.ratelimit
+                self.ratelimit_last = now
+            if self.ratelimit_quota == 0:
+                tosleep = self.ratelimit_last + RATELIMIT_PERIOD - now
+                time.sleep(tosleep)
+                self.ratelimit_quota += self.ratelimit
+                self.ratelimit_last = time.monotonic()
+            if len(to_send) > self.ratelimit_quota:
+                to_send = to_send[: self.ratelimit_quota]
+        try:
+            written = os.write(fd, to_send)
+        except BrokenPipeError:
+            raise ConnectionBrokenWithHint("Broken Pipe") from None
+        if self.ratelimit:
+            self.ratelimit_quota -= written
+        return written
+
+
+def api(*, since, **kwargs_decorator):
+    """Check version requirements and use self.call to do the remote method call.
+
+    <since> specifies the version in which borg introduced this method.
+    Calling this method when connected to an older version will fail without transmitting anything to the server.
+
+    Further kwargs can be used to encode version specific restrictions:
+
+    <previously> is the value resulting in the behaviour before introducing the new parameter.
+    If a previous hardcoded behaviour is parameterized in a version, this allows calls that use the previously
+    hardcoded behaviour to pass through and generates an error if another behaviour is requested by the client.
+    E.g. when 'append_only' was introduced in 1.0.7 the previous behaviour was what now is append_only=False.
+    Thus @api(..., append_only={'since': parse_version('1.0.7'), 'previously': False}) allows calls
+    with append_only=False for all versions but rejects calls using append_only=True on versions older than 1.0.7.
+
+    <dontcare> is a flag to set the behaviour if an old version is called the new way.
+    If set to True, the method is called without the (not yet supported) parameter (this should be done if that is the
+    more desirable behaviour). If False, an exception is generated.
+    E.g. before 'threshold' was introduced in 1.2.0a8, a hardcoded threshold of 0.1 was used in commit().
+    """
+
+    def decorator(f):
+        @functools.wraps(f)
+        def do_rpc(self, *args, **kwargs):
+            sig = inspect.signature(f)
+            bound_args = sig.bind(self, *args, **kwargs)
+            named = {}  # Arguments for the remote process
+            extra = {}  # Arguments for the local process
+            for name, param in sig.parameters.items():
+                if name == "self":
+                    continue
+                if name in bound_args.arguments:
+                    if name == "wait":
+                        extra[name] = bound_args.arguments[name]
+                    else:
+                        named[name] = bound_args.arguments[name]
+                else:
+                    if param.default is not param.empty:
+                        named[name] = param.default
+
+            if self.server_version < since:
+                raise self.RPCServerOutdated(f.__name__, format_version(since))
+
+            for name, restriction in kwargs_decorator.items():
+                if restriction["since"] <= self.server_version:
+                    continue
+                if "previously" in restriction and named[name] == restriction["previously"]:
+                    continue
+                if restriction.get("dontcare", False):
+                    continue
+
+                raise self.RPCServerOutdated(
+                    f"{f.__name__} {name}={named[name]!s}", format_version(restriction["since"])
+                )
+
+            return self.call(f.__name__, named, **extra)
+
+        return do_rpc
+
+    return decorator
+
+
+class RemoteRepository3:
+    extra_test_args = []  # type: ignore
+
+    class RPCError(Exception):
+        def __init__(self, unpacked):
+            # unpacked has keys: 'exception_args', 'exception_full', 'exception_short', 'sysinfo'
+            self.unpacked = unpacked
+
+        def get_message(self):
+            return "\n".join(self.unpacked["exception_short"])
+
+        @property
+        def traceback(self):
+            return self.unpacked.get("exception_trace", True)
+
+        @property
+        def exception_class(self):
+            return self.unpacked["exception_class"]
+
+        @property
+        def exception_full(self):
+            return "\n".join(self.unpacked["exception_full"])
+
+        @property
+        def sysinfo(self):
+            return self.unpacked["sysinfo"]
+
+    class RPCServerOutdated(Error):
+        """Borg server is too old for {}. Required version {}"""
+
+        exit_mcode = 84
+
+        @property
+        def method(self):
+            return self.args[0]
+
+        @property
+        def required_version(self):
+            return self.args[1]
+
+    def __init__(
+        self,
+        location,
+        create=False,
+        exclusive=False,
+        lock_wait=None,
+        lock=True,
+        append_only=False,
+        make_parent_dirs=False,
+        args=None,
+    ):
+        self.location = self._location = location
+        self.preload_ids = []
+        self.msgid = 0
+        self.rx_bytes = 0
+        self.tx_bytes = 0
+        self.to_send = EfficientCollectionQueue(1024 * 1024, bytes)
+        self.stdin_fd = self.stdout_fd = self.stderr_fd = None
+        self.stderr_received = b""  # incomplete stderr line bytes received (no \n yet)
+        self.chunkid_to_msgids = {}
+        self.ignore_responses = set()
+        self.responses = {}
+        self.async_responses = {}
+        self.shutdown_time = None
+        self.ratelimit = SleepingBandwidthLimiter(args.upload_ratelimit * 1024 if args and args.upload_ratelimit else 0)
+        self.upload_buffer_size_limit = args.upload_buffer * 1024 * 1024 if args and args.upload_buffer else 0
+        self.unpacker = get_limited_unpacker("client")
+        self.server_version = None  # we update this after server sends its version
+        self.p = self.sock = None
+        self._args = args
+        if self.location.proto == "ssh":
+            testing = location.host == "__testsuite__"
+            # when testing, we invoke and talk to a borg process directly (no ssh).
+            # when not testing, we invoke the system-installed ssh binary to talk to a remote borg.
+            env = prepare_subprocess_env(system=not testing)
+            borg_cmd = self.borg_cmd(args, testing)
+            if not testing:
+                borg_cmd = self.ssh_cmd(location) + borg_cmd
+            logger.debug("SSH command line: %s", borg_cmd)
+            # we do not want the ssh getting killed by Ctrl-C/SIGINT because it is needed for clean shutdown of borg.
+            # borg's SIGINT handler tries to write a checkpoint and requires the remote repo connection.
+            self.p = Popen(borg_cmd, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env, preexec_fn=ignore_sigint)
+            self.stdin_fd = self.p.stdin.fileno()
+            self.stdout_fd = self.p.stdout.fileno()
+            self.stderr_fd = self.p.stderr.fileno()
+            self.r_fds = [self.stdout_fd, self.stderr_fd]
+            self.x_fds = [self.stdin_fd, self.stdout_fd, self.stderr_fd]
+        elif self.location.proto == "socket":
+            if args.use_socket is False or args.use_socket is True:  # nothing or --socket
+                socket_path = get_socket_filename()
+            else:  # --socket=/some/path
+                socket_path = args.use_socket
+            self.sock = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM)
+            try:
+                self.sock.connect(socket_path)  # note: socket_path length is rather limited.
+            except FileNotFoundError:
+                self.sock = None
+                raise Error(f"The socket file {socket_path} does not exist.")
+            except ConnectionRefusedError:
+                self.sock = None
+                raise Error(f"There is no borg serve running for the socket file {socket_path}.")
+            self.stdin_fd = self.sock.makefile("wb").fileno()
+            self.stdout_fd = self.sock.makefile("rb").fileno()
+            self.stderr_fd = None
+            self.r_fds = [self.stdout_fd]
+            self.x_fds = [self.stdin_fd, self.stdout_fd]
+        else:
+            raise Error(f"Unsupported protocol {location.proto}")
+
+        os.set_blocking(self.stdin_fd, False)
+        assert not os.get_blocking(self.stdin_fd)
+        os.set_blocking(self.stdout_fd, False)
+        assert not os.get_blocking(self.stdout_fd)
+        if self.stderr_fd is not None:
+            os.set_blocking(self.stderr_fd, False)
+            assert not os.get_blocking(self.stderr_fd)
+
+        try:
+            try:
+                version = self.call("negotiate", {"client_data": {"client_version": BORG_VERSION}})
+            except ConnectionClosed:
+                raise ConnectionClosedWithHint("Is borg working on the server?") from None
+            if isinstance(version, dict):
+                self.server_version = version["server_version"]
+            else:
+                raise Exception("Server insisted on using unsupported protocol version %s" % version)
+
+            self.id = self.open(
+                path=self.location.path,
+                create=create,
+                lock_wait=lock_wait,
+                lock=lock,
+                exclusive=exclusive,
+                append_only=append_only,
+                make_parent_dirs=make_parent_dirs,
+            )
+            info = self.info()
+            self.version = info["version"]
+            self.append_only = info["append_only"]
+
+        except Exception:
+            self.close()
+            raise
+
+    def __del__(self):
+        if len(self.responses):
+            logging.debug("still %d cached responses left in RemoteRepository3" % (len(self.responses),))
+        if self.p or self.sock:
+            self.close()
+            assert False, "cleanup happened in RemoteRepository3.__del__"
+
+    def __repr__(self):
+        return f"<{self.__class__.__name__} {self.location.canonical_path()}>"
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        try:
+            if exc_type is not None:
+                self.shutdown_time = time.monotonic() + 30
+        finally:
+            # in any case, we want to close the repo cleanly.
+            logger.debug(
+                "RemoteRepository3: %s bytes sent, %s bytes received, %d messages sent",
+                format_file_size(self.tx_bytes),
+                format_file_size(self.rx_bytes),
+                self.msgid,
+            )
+            self.close()
+
+    @property
+    def id_str(self):
+        return bin_to_hex(self.id)
+
+    def borg_cmd(self, args, testing):
+        """return a borg serve command line"""
+        # give some args/options to 'borg serve' process as they were given to us
+        opts = []
+        if args is not None:
+            root_logger = logging.getLogger()
+            if root_logger.isEnabledFor(logging.DEBUG):
+                opts.append("--debug")
+            elif root_logger.isEnabledFor(logging.INFO):
+                opts.append("--info")
+            elif root_logger.isEnabledFor(logging.WARNING):
+                pass  # warning is default
+            elif root_logger.isEnabledFor(logging.ERROR):
+                opts.append("--error")
+            elif root_logger.isEnabledFor(logging.CRITICAL):
+                opts.append("--critical")
+            else:
+                raise ValueError("log level missing, fix this code")
+
+            # Tell the remote server about debug topics it may need to consider.
+            # Note that debug topics are usable for "spew" or "trace" logs which would
+            # be too plentiful to transfer for normal use, so the server doesn't send
+            # them unless explicitly enabled.
+            #
+            # Needless to say, if you do --debug-topic=repository.compaction, for example,
+            # with a 1.0.x server it won't work, because the server does not recognize the
+            # option.
+            #
+            # This is not considered a problem, since this is a debugging feature that
+            # should not be used for regular use.
+            for topic in args.debug_topics:
+                if "." not in topic:
+                    topic = "borg.debug." + topic
+                if "repository" in topic:
+                    opts.append("--debug-topic=%s" % topic)
+
+            if "storage_quota" in args and args.storage_quota:
+                opts.append("--storage-quota=%s" % args.storage_quota)
+        env_vars = []
+        if testing:
+            return env_vars + [sys.executable, "-m", "borg", "serve"] + opts + self.extra_test_args
+        else:  # pragma: no cover
+            remote_path = args.remote_path or os.environ.get("BORG_REMOTE_PATH", "borg")
+            remote_path = replace_placeholders(remote_path)
+            return env_vars + [remote_path, "serve"] + opts
+
+    def ssh_cmd(self, location):
+        """return a ssh command line that can be prefixed to a borg command line"""
+        rsh = self._args.rsh or os.environ.get("BORG_RSH", "ssh")
+        args = shlex.split(rsh)
+        if location.port:
+            args += ["-p", str(location.port)]
+        if location.user:
+            args.append(f"{location.user}@{location.host}")
+        else:
+            args.append("%s" % location.host)
+        return args
+
+    def call(self, cmd, args, **kw):
+        for resp in self.call_many(cmd, [args], **kw):
+            return resp
+
+    def call_many(self, cmd, calls, wait=True, is_preloaded=False, async_wait=True):
+        if not calls and cmd != "async_responses":
+            return
+
+        def send_buffer():
+            if self.to_send:
+                try:
+                    written = self.ratelimit.write(self.stdin_fd, self.to_send.peek_front())
+                    self.tx_bytes += written
+                    self.to_send.pop_front(written)
+                except OSError as e:
+                    # io.write might raise EAGAIN even though select indicates
+                    # that the fd should be writable.
+                    # EWOULDBLOCK is added for defensive programming sake.
+                    if e.errno not in [errno.EAGAIN, errno.EWOULDBLOCK]:
+                        raise
+
+        def pop_preload_msgid(chunkid):
+            msgid = self.chunkid_to_msgids[chunkid].pop(0)
+            if not self.chunkid_to_msgids[chunkid]:
+                del self.chunkid_to_msgids[chunkid]
+            return msgid
+
+        def handle_error(unpacked):
+            if "exception_class" not in unpacked:
+                return
+
+            error = unpacked["exception_class"]
+            args = unpacked["exception_args"]
+
+            if error == "Error":
+                raise Error(args[0])
+            elif error == "ErrorWithTraceback":
+                raise ErrorWithTraceback(args[0])
+            elif error == "DoesNotExist":
+                raise Repository3.DoesNotExist(self.location.processed)
+            elif error == "AlreadyExists":
+                raise Repository3.AlreadyExists(self.location.processed)
+            elif error == "CheckNeeded":
+                raise Repository3.CheckNeeded(self.location.processed)
+            elif error == "IntegrityError":
+                raise IntegrityError(args[0])
+            elif error == "PathNotAllowed":
+                raise PathNotAllowed(args[0])
+            elif error == "PathPermissionDenied":
+                raise Repository3.PathPermissionDenied(args[0])
+            elif error == "ParentPathDoesNotExist":
+                raise Repository3.ParentPathDoesNotExist(args[0])
+            elif error == "ObjectNotFound":
+                raise Repository3.ObjectNotFound(args[0], self.location.processed)
+            elif error == "InvalidRPCMethod":
+                raise InvalidRPCMethod(args[0])
+            elif error == "LockTimeout":
+                raise LockTimeout(args[0])
+            elif error == "LockFailed":
+                raise LockFailed(args[0], args[1])
+            elif error == "NotLocked":
+                raise NotLocked(args[0])
+            elif error == "NotMyLock":
+                raise NotMyLock(args[0])
+            else:
+                raise self.RPCError(unpacked)
+
+        calls = list(calls)
+        waiting_for = []
+        maximum_to_send = 0 if wait else self.upload_buffer_size_limit
+        send_buffer()  # Try to send data, as some cases (async_response) will never try to send data otherwise.
+        while wait or calls:
+            if self.shutdown_time and time.monotonic() > self.shutdown_time:
+                # we are shutting this RemoteRepository3 down already, make sure we do not waste
+                # a lot of time in case a lot of async stuff is coming in or remote is gone or slow.
+                logger.debug(
+                    "shutdown_time reached, shutting down with %d waiting_for and %d async_responses.",
+                    len(waiting_for),
+                    len(self.async_responses),
+                )
+                return
+            while waiting_for:
+                try:
+                    unpacked = self.responses.pop(waiting_for[0])
+                    waiting_for.pop(0)
+                    handle_error(unpacked)
+                    yield unpacked[RESULT]
+                    if not waiting_for and not calls:
+                        return
+                except KeyError:
+                    break
+            if cmd == "async_responses":
+                while True:
+                    try:
+                        msgid, unpacked = self.async_responses.popitem()
+                    except KeyError:
+                        # there is nothing left of what we have already received
+                        if async_wait and self.ignore_responses:
+                            # but do not return if we shall wait and there is something left to wait for:
+                            break
+                        else:
+                            return
+                    else:
+                        handle_error(unpacked)
+                        yield unpacked[RESULT]
+            if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT):
+                w_fds = [self.stdin_fd]
+            else:
+                w_fds = []
+            r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1)
+            if x:
+                raise Exception("FD exception occurred")
+            for fd in r:
+                if fd is self.stdout_fd:
+                    data = os.read(fd, BUFSIZE)
+                    if not data:
+                        raise ConnectionClosed()
+                    self.rx_bytes += len(data)
+                    self.unpacker.feed(data)
+                    for unpacked in self.unpacker:
+                        if not isinstance(unpacked, dict):
+                            raise UnexpectedRPCDataFormatFromServer(data)
+
+                        lr_dict = unpacked.get(LOG)
+                        if lr_dict is not None:
+                            # Re-emit remote log messages locally.
+                            _logger = logging.getLogger(lr_dict["name"])
+                            if _logger.isEnabledFor(lr_dict["level"]):
+                                _logger.handle(logging.LogRecord(**lr_dict))
+                            continue
+
+                        msgid = unpacked[MSGID]
+                        if msgid in self.ignore_responses:
+                            self.ignore_responses.remove(msgid)
+                            # async methods never return values, but may raise exceptions.
+                            if "exception_class" in unpacked:
+                                self.async_responses[msgid] = unpacked
+                            else:
+                                # we currently do not have async result values except "None",
+                                # so we do not add them into async_responses.
+                                if unpacked[RESULT] is not None:
+                                    self.async_responses[msgid] = unpacked
+                        else:
+                            self.responses[msgid] = unpacked
+                elif fd is self.stderr_fd:
+                    data = os.read(fd, 32768)
+                    if not data:
+                        raise ConnectionClosed()
+                    self.rx_bytes += len(data)
+                    # deal with incomplete lines (may appear due to block buffering)
+                    if self.stderr_received:
+                        data = self.stderr_received + data
+                        self.stderr_received = b""
+                    lines = data.splitlines(keepends=True)
+                    if lines and not lines[-1].endswith((b"\r", b"\n")):
+                        self.stderr_received = lines.pop()
+                    # now we have complete lines in <lines> and any partial line in self.stderr_received.
+                    _logger = logging.getLogger()
+                    for line in lines:
+                        # borg serve (remote/server side) should not emit stuff on stderr,
+                        # but e.g. the ssh process (local/client side) might output errors there.
+                        assert line.endswith((b"\r", b"\n"))
+                        # something came in on stderr, log it to not lose it.
+                        # decode late, avoid partial utf-8 sequences.
+                        _logger.warning("stderr: " + line.decode().strip())
+            if w:
+                while (
+                    (len(self.to_send) <= maximum_to_send)
+                    and (calls or self.preload_ids)
+                    and len(waiting_for) < MAX_INFLIGHT
+                ):
+                    if calls:
+                        if is_preloaded:
+                            assert cmd == "get", "is_preloaded is only supported for 'get'"
+                            if calls[0]["id"] in self.chunkid_to_msgids:
+                                waiting_for.append(pop_preload_msgid(calls.pop(0)["id"]))
+                        else:
+                            args = calls.pop(0)
+                            if cmd == "get" and args["id"] in self.chunkid_to_msgids:
+                                waiting_for.append(pop_preload_msgid(args["id"]))
+                            else:
+                                self.msgid += 1
+                                waiting_for.append(self.msgid)
+                                self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
+                    if not self.to_send and self.preload_ids:
+                        chunk_id = self.preload_ids.pop(0)
+                        args = {"id": chunk_id}
+                        self.msgid += 1
+                        self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid)
+                        self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: "get", ARGS: args}))
+
+                send_buffer()
+        self.ignore_responses |= set(waiting_for)  # we lose order here
+
+    @api(
+        since=parse_version("1.0.0"),
+        append_only={"since": parse_version("1.0.7"), "previously": False},
+        make_parent_dirs={"since": parse_version("1.1.9"), "previously": False},
+        v1_or_v2={"since": parse_version("2.0.0b8"), "previously": True},  # TODO fix version
+    )
+    def open(
+        self,
+        path,
+        create=False,
+        lock_wait=None,
+        lock=True,
+        exclusive=False,
+        append_only=False,
+        make_parent_dirs=False,
+        v1_or_v2=False,
+    ):
+        """actual remoting is done via self.call in the @api decorator"""
+
+    @api(since=parse_version("2.0.0a3"))
+    def info(self):
+        """actual remoting is done via self.call in the @api decorator"""
+
+    @api(since=parse_version("1.0.0"), max_duration={"since": parse_version("1.2.0a4"), "previously": 0})
+    def check(self, repair=False, max_duration=0):
+        """actual remoting is done via self.call in the @api decorator"""
+
+    @api(
+        since=parse_version("1.0.0"),
+        compact={"since": parse_version("1.2.0a0"), "previously": True, "dontcare": True},
+        threshold={"since": parse_version("1.2.0a8"), "previously": 0.1, "dontcare": True},
+    )
+    def commit(self, compact=True, threshold=0.1):
+        """actual remoting is done via self.call in the @api decorator"""
+
+    @api(since=parse_version("1.0.0"))
+    def rollback(self):
+        """actual remoting is done via self.call in the @api decorator"""
+
+    @api(since=parse_version("1.0.0"))
+    def destroy(self):
+        """actual remoting is done via self.call in the @api decorator"""
+
+    @api(since=parse_version("1.0.0"))
+    def __len__(self):
+        """actual remoting is done via self.call in the @api decorator"""
+
+    @api(
+        since=parse_version("1.0.0"),
+        mask={"since": parse_version("2.0.0b2"), "previously": 0},
+        value={"since": parse_version("2.0.0b2"), "previously": 0},
+    )
+    def list(self, limit=None, marker=None, mask=0, value=0):
+        """actual remoting is done via self.call in the @api decorator"""
+
+    @api(since=parse_version("2.0.0b3"))
+    def scan(self, limit=None, state=None):
+        """actual remoting is done via self.call in the @api decorator"""
+
+    def get(self, id, read_data=True):
+        for resp in self.get_many([id], read_data=read_data):
+            return resp
+
+    def get_many(self, ids, read_data=True, is_preloaded=False):
+        yield from self.call_many("get", [{"id": id, "read_data": read_data} for id in ids], is_preloaded=is_preloaded)
+
+    @api(since=parse_version("1.0.0"))
+    def put(self, id, data, wait=True):
+        """actual remoting is done via self.call in the @api decorator"""
+
+    @api(since=parse_version("1.0.0"))
+    def delete(self, id, wait=True):
+        """actual remoting is done via self.call in the @api decorator"""
+
+    @api(since=parse_version("1.0.0"))
+    def save_key(self, keydata):
+        """actual remoting is done via self.call in the @api decorator"""
+
+    @api(since=parse_version("1.0.0"))
+    def load_key(self):
+        """actual remoting is done via self.call in the @api decorator"""
+
+    @api(since=parse_version("1.0.0"))
+    def break_lock(self):
+        """actual remoting is done via self.call in the @api decorator"""
+
+    def close(self):
+        if self.p or self.sock:
+            self.call("close", {}, wait=True)
+        if self.p:
+            self.p.stdin.close()
+            self.p.stdout.close()
+            self.p.wait()
+            self.p = None
+        if self.sock:
+            try:
+                self.sock.shutdown(socket.SHUT_RDWR)
+            except OSError as e:
+                if e.errno != errno.ENOTCONN:
+                    raise
+            self.sock.close()
+            self.sock = None
+
+    def async_response(self, wait=True):
+        for resp in self.call_many("async_responses", calls=[], wait=True, async_wait=wait):
+            return resp
+
+    def preload(self, ids):
+        self.preload_ids += ids
+
+
+class RepositoryNoCache:
+    """A not caching Repository wrapper, passes through to repository.
+
+    Just to have same API (including the context manager) as RepositoryCache.
+
+    *transform* is a callable taking two arguments, key and raw repository data.
+    The return value is returned from get()/get_many(). By default, the raw
+    repository data is returned.
+    """
+
+    def __init__(self, repository, transform=None):
+        self.repository = repository
+        self.transform = transform or (lambda key, data: data)
+
+    def close(self):
+        pass
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+    def get(self, key, read_data=True):
+        return next(self.get_many([key], read_data=read_data, cache=False))
+
+    def get_many(self, keys, read_data=True, cache=True):
+        for key, data in zip(keys, self.repository.get_many(keys, read_data=read_data)):
+            yield self.transform(key, data)
+
+    def log_instrumentation(self):
+        pass
+
+
+class RepositoryCache(RepositoryNoCache):
+    """
+    A caching Repository wrapper.
+
+    Caches Repository GET operations locally.
+
+    *pack* and *unpack* complement *transform* of the base class.
+    *pack* receives the output of *transform* and should return bytes,
+    which are stored in the cache. *unpack* receives these bytes and
+    should return the initial data (as returned by *transform*).
+    """
+
+    def __init__(self, repository, pack=None, unpack=None, transform=None):
+        super().__init__(repository, transform)
+        self.pack = pack or (lambda data: data)
+        self.unpack = unpack or (lambda data: data)
+        self.cache = set()
+        self.basedir = tempfile.mkdtemp(prefix="borg-cache-")
+        self.query_size_limit()
+        self.size = 0
+        # Instrumentation
+        self.hits = 0
+        self.misses = 0
+        self.slow_misses = 0
+        self.slow_lat = 0.0
+        self.evictions = 0
+        self.enospc = 0
+
+    def query_size_limit(self):
+        available_space = shutil.disk_usage(self.basedir).free
+        self.size_limit = int(min(available_space * 0.25, 2**31))
+
+    def prefixed_key(self, key, complete):
+        # just prefix another byte telling whether this key refers to a complete chunk
+        # or a without-data-metadata-only chunk (see also read_data param).
+        prefix = b"\x01" if complete else b"\x00"
+        return prefix + key
+
+    def key_filename(self, key):
+        return os.path.join(self.basedir, bin_to_hex(key))
+
+    def backoff(self):
+        self.query_size_limit()
+        target_size = int(0.9 * self.size_limit)
+        while self.size > target_size and self.cache:
+            key = self.cache.pop()
+            file = self.key_filename(key)
+            self.size -= os.stat(file).st_size
+            os.unlink(file)
+            self.evictions += 1
+
+    def add_entry(self, key, data, cache, complete):
+        transformed = self.transform(key, data)
+        if not cache:
+            return transformed
+        packed = self.pack(transformed)
+        pkey = self.prefixed_key(key, complete=complete)
+        file = self.key_filename(pkey)
+        try:
+            with open(file, "wb") as fd:
+                fd.write(packed)
+        except OSError as os_error:
+            try:
+                safe_unlink(file)
+            except FileNotFoundError:
+                pass  # open() could have failed as well
+            if os_error.errno == errno.ENOSPC:
+                self.enospc += 1
+                self.backoff()
+            else:
+                raise
+        else:
+            self.size += len(packed)
+            self.cache.add(pkey)
+            if self.size > self.size_limit:
+                self.backoff()
+        return transformed
+
+    def log_instrumentation(self):
+        logger.debug(
+            "RepositoryCache: current items %d, size %s / %s, %d hits, %d misses, %d slow misses (+%.1fs), "
+            "%d evictions, %d ENOSPC hit",
+            len(self.cache),
+            format_file_size(self.size),
+            format_file_size(self.size_limit),
+            self.hits,
+            self.misses,
+            self.slow_misses,
+            self.slow_lat,
+            self.evictions,
+            self.enospc,
+        )
+
+    def close(self):
+        self.log_instrumentation()
+        self.cache.clear()
+        shutil.rmtree(self.basedir)
+
+    def get_many(self, keys, read_data=True, cache=True):
+        # Use different cache keys depending on read_data, so that full chunks and
+        # meta-only chunks are cached separately (see prefixed_key).
+        unknown_keys = [key for key in keys if self.prefixed_key(key, complete=read_data) not in self.cache]
+        repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys, read_data=read_data))
+        for key in keys:
+            pkey = self.prefixed_key(key, complete=read_data)
+            if pkey in self.cache:
+                file = self.key_filename(pkey)
+                with open(file, "rb") as fd:
+                    self.hits += 1
+                    yield self.unpack(fd.read())
+            else:
+                for key_, data in repository_iterator:
+                    if key_ == key:
+                        transformed = self.add_entry(key, data, cache, complete=read_data)
+                        self.misses += 1
+                        yield transformed
+                        break
+                else:
+                    # slow path: eviction during this get_many removed this key from the cache
+                    t0 = time.perf_counter()
+                    data = self.repository.get(key, read_data=read_data)
+                    self.slow_lat += time.perf_counter() - t0
+                    transformed = self.add_entry(key, data, cache, complete=read_data)
+                    self.slow_misses += 1
+                    yield transformed
+        # Consume any pending requests
+        for _ in repository_iterator:
+            pass
+
+
+def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None, transform=None, force_cache=False):
+    """
+    Return a Repository(No)Cache for *repository*.
+
+    If *decrypted_cache* is a repo_objs object, then get and get_many will return a tuple
+    (csize, plaintext) instead of the actual data in the repository. The cache will
+    store decrypted data, which increases CPU efficiency (it avoids repeated decryption
+    and, more importantly, repeated MAC and ID checking of cached objects).
+    Internally, objects are compressed with LZ4.
+    """
+    if decrypted_cache and (pack or unpack or transform):
+        raise ValueError("decrypted_cache and pack/unpack/transform are incompatible")
+    elif decrypted_cache:
+        repo_objs = decrypted_cache
+        # 32 bit csize, 64 bit (8 byte) xxh64, 1 byte ctype, 1 byte clevel
+        cache_struct = struct.Struct("=I8sBB")
+        compressor = Compressor("lz4")
+
+        def pack(data):
+            csize, decrypted = data
+            meta, compressed = compressor.compress({}, decrypted)
+            return cache_struct.pack(csize, xxh64(compressed), meta["ctype"], meta["clevel"]) + compressed
+
+        def unpack(data):
+            data = memoryview(data)
+            csize, checksum, ctype, clevel = cache_struct.unpack(data[: cache_struct.size])
+            compressed = data[cache_struct.size :]
+            if checksum != xxh64(compressed):
+                raise IntegrityError("detected corrupted data in metadata cache")
+            meta = dict(ctype=ctype, clevel=clevel, csize=len(compressed))
+            _, decrypted = compressor.decompress(meta, compressed)
+            return csize, decrypted
+
+        def transform(id_, data):
+            meta, decrypted = repo_objs.parse(id_, data, ro_type=ROBJ_DONTCARE)
+            csize = meta.get("csize", len(data))
+            return csize, decrypted
+
+    if isinstance(repository, RemoteRepository3) or force_cache:
+        return RepositoryCache(repository, pack, unpack, transform)
+    else:
+        return RepositoryNoCache(repository, transform)

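
For orientation, the version gating that the @api decorator enforces (the restriction
checks at the top of this hunk) can be summarized in a minimal, self-contained sketch.
Plain tuples stand in for the parsed version objects, and check_restriction is a
hypothetical helper written for illustration, not part of this patch:

    def check_restriction(name, value, server_version, restriction):
        # mirror of the @api gating: an argument newer than the server is only
        # acceptable if it equals the old default ("previously") or is "dontcare".
        if restriction["since"] <= server_version:
            return  # server is new enough
        if "previously" in restriction and value == restriction["previously"]:
            return  # caller asked for the old default, old servers behave like that anyway
        if restriction.get("dontcare", False):
            return  # harmless if the server ignores it
        raise RuntimeError(f"server too old for {name}={value!r}, need {restriction['since']}")

    # RemoteRepository3.open() gates append_only on server >= (1, 0, 7):
    restriction = {"since": (1, 0, 7), "previously": False}
    check_restriction("append_only", False, (1, 0, 0), restriction)   # ok: old default
    check_restriction("append_only", True, (1, 0, 7), restriction)    # ok: server new enough
    # check_restriction("append_only", True, (1, 0, 0), restriction)  # would raise
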
+ 314 - 0
src/borg/repository3.py

@@ -0,0 +1,314 @@
+import os
+
+from borgstore.store import Store
+from borgstore.store import ObjectNotFound as StoreObjectNotFound
+
+from .constants import *  # NOQA
+from .helpers import Error, ErrorWithTraceback, IntegrityError
+from .helpers import Location
+from .helpers import bin_to_hex, hex_to_bin
+from .logger import create_logger
+from .repoobj import RepoObj
+
+logger = create_logger(__name__)
+
+
+class Repository3:
+    """borgstore based key value store"""
+
+    class AlreadyExists(Error):
+        """A repository already exists at {}."""
+
+        exit_mcode = 10
+
+    class CheckNeeded(ErrorWithTraceback):
+        """Inconsistency detected. Please run "borg check {}"."""
+
+        exit_mcode = 12
+
+    class DoesNotExist(Error):
+        """Repository {} does not exist."""
+
+        exit_mcode = 13
+
+    class InsufficientFreeSpaceError(Error):
+        """Insufficient free space to complete transaction (required: {}, available: {})."""
+
+        exit_mcode = 14
+
+    class InvalidRepository(Error):
+        """{} is not a valid repository. Check repo config."""
+
+        exit_mcode = 15
+
+    class InvalidRepositoryConfig(Error):
+        """{} does not have a valid configuration. Check repo config [{}]."""
+
+        exit_mcode = 16
+
+    class ObjectNotFound(ErrorWithTraceback):
+        """Object with key {} not found in repository {}."""
+
+        exit_mcode = 17
+
+        def __init__(self, id, repo):
+            if isinstance(id, bytes):
+                id = bin_to_hex(id)
+            super().__init__(id, repo)
+
+    class ParentPathDoesNotExist(Error):
+        """The parent path of the repo directory [{}] does not exist."""
+
+        exit_mcode = 18
+
+    class PathAlreadyExists(Error):
+        """There is already something at {}."""
+
+        exit_mcode = 19
+
+    class StorageQuotaExceeded(Error):
+        """The storage quota ({}) has been exceeded ({}). Try deleting some archives."""
+
+        exit_mcode = 20
+
+    class PathPermissionDenied(Error):
+        """Permission denied to {}."""
+
+        exit_mcode = 21
+
+    def __init__(
+        self,
+        path,
+        create=False,
+        exclusive=False,
+        lock_wait=None,
+        lock=True,
+        append_only=False,
+        storage_quota=None,
+        make_parent_dirs=False,
+        send_log_cb=None,
+    ):
+        self.path = os.path.abspath(path)
+        url = "file://%s" % self.path
+        # use a Store with flat config storage and 2-levels-nested data storage
+        self.store = Store(url, levels={"config/": [0], "data/": [2]})
+        self._location = Location(url)
+        self.version = None
+        # Long-running repository methods which emit log or progress output are responsible
+        # for calling ._send_log periodically, so that log and progress output gets transferred
+        # to the borg client in a timely manner, in case we are used via a RemoteRepository.
+        # For local repositories, ._send_log may also be called (it just does nothing then).
+        self._send_log = send_log_cb or (lambda: None)
+        self.do_create = create
+        self.created = False
+        self.acceptable_repo_versions = (3,)
+        self.opened = False
+        self.append_only = append_only  # XXX not implemented / not implementable
+        self.storage_quota = storage_quota  # XXX not implemented
+        self.storage_quota_use = 0  # XXX not implemented
+
+    def __repr__(self):
+        return f"<{self.__class__.__name__} {self.path}>"
+
+    def __enter__(self):
+        if self.do_create:
+            self.do_create = False
+            self.create()
+            self.created = True
+        self.open()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.close()
+
+    @property
+    def id_str(self):
+        return bin_to_hex(self.id)
+
+    def create(self):
+        """Create a new empty repository"""
+        self.store.create()
+        self.store.open()
+        self.store.store("config/readme", REPOSITORY_README.encode())
+        self.version = 3
+        self.store.store("config/version", str(self.version).encode())
+        self.store.store("config/id", bin_to_hex(os.urandom(32)).encode())
+        self.store.close()
+
+    def _set_id(self, id):
+        # for testing: change the id of an existing repository
+        assert self.opened
+        assert isinstance(id, bytes) and len(id) == 32
+        self.id = id
+        self.store.store("config/id", bin_to_hex(id).encode())
+
+    def save_key(self, keydata):
+        # note: saving an empty key means that there is no repokey anymore
+        self.store.store("keys/repokey", keydata)
+
+    def load_key(self):
+        keydata = self.store.load("keys/repokey")
+        # note: if this returns empty bytes, it means there is no repo key
+        return keydata
+
+    def destroy(self):
+        """Destroy the repository"""
+        self.close()
+        self.store.destroy()
+
+    def open(self):
+        self.store.open()
+        readme = self.store.load("config/readme").decode()
+        if readme != REPOSITORY_README:
+            raise self.InvalidRepository(self.path)
+        self.version = int(self.store.load("config/version").decode())
+        if self.version not in self.acceptable_repo_versions:
+            self.close()
+            raise self.InvalidRepositoryConfig(
+                self.path, "repository version %d is not supported by this borg version" % self.version
+            )
+        self.id = hex_to_bin(self.store.load("config/id").decode(), length=32)
+        self.opened = True
+
+    def close(self):
+        if self.opened:
+            self.store.close()
+            self.opened = False
+
+    def info(self):
+        """return some infos about the repo (must be opened first)"""
+        info = dict(
+            id=self.id,
+            version=self.version,
+            storage_quota_use=self.storage_quota_use,
+            storage_quota=self.storage_quota,
+            append_only=self.append_only,
+        )
+        return info
+
+    def commit(self, compact=True, threshold=0.1):
+        pass
+
+    def check(self, repair=False, max_duration=0):
+        """Check repository consistency
+
+        Note: the borgstore based repository has neither segments nor a repository index,
+        so the old segment/index checks do not apply; actual checks for this backend
+        are not implemented yet (see the XXX TODO below).
+        """
+        mode = "full"
+        logger.info("Starting repository check")
+        # XXX TODO
+        logger.info("Finished %s repository check, no problems found.", mode)
+        return True
+
+    def scan_low_level(self, segment=None, offset=None):
+        raise NotImplementedError
+
+    def __len__(self):
+        raise NotImplementedError
+
+    def __contains__(self, id):
+        raise NotImplementedError
+
+    def list(self, limit=None, marker=None, mask=0, value=0):
+        """
+        list <limit> IDs starting from after id <marker> - in whatever order the store returns them.
+
+        if mask and value are given, only return IDs where flags & mask == value (default: all IDs).
+        """
+        infos = self.store.list("data")  # XXX we can only get the full list from the store
+        ids = [hex_to_bin(info.name) for info in infos]
+        if marker is not None:
+            idx = ids.index(marker)
+            ids = ids[idx + 1:]
+        if limit is not None:
+            return ids[:limit]
+        return ids
+
+    def scan(self, limit=None, state=None):
+        """
+        list (the next) <limit> chunk IDs from the repository.
+
+        state can either be None (initially, when starting to scan) or the object
+        returned from a previous scan call (meaning "continue scanning").
+
+        returns: list of chunk ids, state
+        """
+        # we only have store.list() anyway, so just call .list() from here.
+        ids = self.list(limit=limit, marker=state)
+        state = ids[-1] if ids else None
+        return ids, state
+
+    def get(self, id, read_data=True):
+        id_hex = bin_to_hex(id)
+        key = "data/" + id_hex
+        try:
+            if read_data:
+                # read everything
+                return self.store.load(key)
+            else:
+                # RepoObj layout supports separately encrypted metadata and data.
+                # We return enough bytes so the client can decrypt the metadata.
+                meta_len_size = RepoObj.meta_len_hdr.size
+                extra_len = 1024 - meta_len_size  # load a bit more, 1024b, reduces round trips
+                obj = self.store.load(key, size=meta_len_size + extra_len)
+                meta_len = obj[0:meta_len_size]
+                if len(meta_len) != meta_len_size:
+                    raise IntegrityError(
+                        f"Object too small [id {id_hex}]: expected {meta_len_size}, got {len(meta_len)} bytes"
+                    )
+                ml = RepoObj.meta_len_hdr.unpack(meta_len)[0]
+                if ml > extra_len:
+                    # we did not get enough, need to load more, but not all.
+                    # this should be rare, as chunk metadata is rather small usually.
+                    obj = self.store.load(key, size=meta_len_size + ml)
+                meta = obj[meta_len_size:meta_len_size + ml]
+                if len(meta) != ml:
+                    raise IntegrityError(
+                        f"Object too small [id {id_hex}]: expected {ml}, got {len(meta)} bytes"
+                    )
+                return meta_len + meta
+        except StoreObjectNotFound:
+            raise self.ObjectNotFound(id, self.path) from None
+
+    def get_many(self, ids, read_data=True, is_preloaded=False):
+        for id_ in ids:
+            yield self.get(id_, read_data=read_data)
+
+    def put(self, id, data, wait=True):
+        """put a repo object
+
+        Note: when doing calls with wait=False this gets async and caller must
+              deal with async results / exceptions later.
+        """
+        data_size = len(data)
+        if data_size > MAX_DATA_SIZE:
+            raise IntegrityError(f"More than allowed put data [{data_size} > {MAX_DATA_SIZE}]")
+
+        key = "data/" + bin_to_hex(id)
+        self.store.store(key, data)
+
+    def delete(self, id, wait=True):
+        """delete a repo object
+
+        Note: when doing calls with wait=False this gets async and caller must
+              deal with async results / exceptions later.
+        """
+        key = "data/" + bin_to_hex(id)
+        try:
+            self.store.delete(key)
+        except StoreObjectNotFound:
+            raise self.ObjectNotFound(id, self.path) from None
+
+    def async_response(self, wait=True):
+        """Get one async result (only applies to remote repositories).
+
+        async commands (== calls with wait=False, e.g. delete and put) have no results,
+        but may raise exceptions. These async exceptions must get collected later via
+        async_response() calls. Repeat the call until it returns None.
+        The previous calls might either return one (non-None) result or raise an exception.
+        If wait=True is given and there are outstanding responses, it will wait for them
+        to arrive. With wait=False, it will only return already received responses.
+        """
+
+    def preload(self, ids):
+        """Preload objects (only applies to remote repositories)"""
+
+    def break_lock(self):
+        pass

+ 8 - 12
src/borg/testsuite/archiver/__init__.py

@@ -27,8 +27,8 @@ from ...helpers import init_ec_warnings
 from ...logger import flush_logging
 from ...manifest import Manifest
 from ...platform import get_flags
-from ...remote import RemoteRepository
-from ...repository import Repository
+from ...remote3 import RemoteRepository3
+from ...repository3 import Repository3
 from .. import has_lchflags, is_utime_fully_supported, have_fuse_mtime_ns, st_mtime_ns_round, no_selinux
 from .. import changedir
 from .. import are_symlinks_supported, are_hardlinks_supported, are_fifos_supported
@@ -169,7 +169,7 @@ def create_src_archive(archiver, name, ts=None):
 
 
 def open_archive(repo_path, name):
-    repository = Repository(repo_path, exclusive=True)
+    repository = Repository3(repo_path, exclusive=True)
     with repository:
         manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
         archive = Archive(manifest, name)
@@ -178,9 +178,9 @@ def open_archive(repo_path, name):
 
 def open_repository(archiver):
     if archiver.get_kind() == "remote":
-        return RemoteRepository(Location(archiver.repository_location))
+        return RemoteRepository3(Location(archiver.repository_location))
     else:
-        return Repository(archiver.repository_path, exclusive=True)
+        return Repository3(archiver.repository_path, exclusive=True)
 
 
 def create_regular_file(input_path, name, size=0, contents=None):
@@ -256,17 +256,13 @@ def create_test_files(input_path, create_hardlinks=True):
 
 
 def _extract_repository_id(repo_path):
-    with Repository(repo_path) as repository:
+    with Repository3(repo_path) as repository:
         return repository.id
 
 
 def _set_repository_id(repo_path, id):
-    config = ConfigParser(interpolation=None)
-    config.read(os.path.join(repo_path, "config"))
-    config.set("repository", "id", bin_to_hex(id))
-    with open(os.path.join(repo_path, "config"), "w") as fd:
-        config.write(fd)
-    with Repository(repo_path) as repository:
+    with Repository3(repo_path) as repository:
+        repository._set_id(id)
         return repository.id
 
 

+ 0 - 130
src/borg/testsuite/archiver/bypass_lock_option.py

@@ -1,130 +0,0 @@
-import pytest
-
-from ...constants import *  # NOQA
-from ...helpers import EXIT_ERROR
-from ...locking import LockFailed
-from ...remote import RemoteRepository
-from .. import llfuse
-from . import cmd, create_src_archive, RK_ENCRYPTION, read_only, fuse_mount
-
-
-def test_readonly_check(archiver):
-    cmd(archiver, "rcreate", RK_ENCRYPTION)
-    create_src_archive(archiver, "test")
-
-    with read_only(archiver.repository_path):
-        # verify that command normally doesn't work with read-only repo
-        if archiver.FORK_DEFAULT:
-            cmd(archiver, "check", "--verify-data", exit_code=EXIT_ERROR)
-        else:
-            with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo:
-                cmd(archiver, "check", "--verify-data")
-            if isinstance(excinfo.value, RemoteRepository.RPCError):
-                assert excinfo.value.exception_class == "LockFailed"
-        # verify that command works with read-only repo when using --bypass-lock
-        cmd(archiver, "check", "--verify-data", "--bypass-lock")
-
-
-def test_readonly_diff(archiver):
-    cmd(archiver, "rcreate", RK_ENCRYPTION)
-    create_src_archive(archiver, "a")
-    create_src_archive(archiver, "b")
-
-    with read_only(archiver.repository_path):
-        # verify that command normally doesn't work with read-only repo
-        if archiver.FORK_DEFAULT:
-            cmd(archiver, "diff", "a", "b", exit_code=EXIT_ERROR)
-        else:
-            with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo:
-                cmd(archiver, "diff", "a", "b")
-            if isinstance(excinfo.value, RemoteRepository.RPCError):
-                assert excinfo.value.exception_class == "LockFailed"
-        # verify that command works with read-only repo when using --bypass-lock
-        cmd(archiver, "diff", "a", "b", "--bypass-lock")
-
-
-def test_readonly_export_tar(archiver):
-    cmd(archiver, "rcreate", RK_ENCRYPTION)
-    create_src_archive(archiver, "test")
-
-    with read_only(archiver.repository_path):
-        # verify that command normally doesn't work with read-only repo
-        if archiver.FORK_DEFAULT:
-            cmd(archiver, "export-tar", "test", "test.tar", exit_code=EXIT_ERROR)
-        else:
-            with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo:
-                cmd(archiver, "export-tar", "test", "test.tar")
-            if isinstance(excinfo.value, RemoteRepository.RPCError):
-                assert excinfo.value.exception_class == "LockFailed"
-        # verify that command works with read-only repo when using --bypass-lock
-        cmd(archiver, "export-tar", "test", "test.tar", "--bypass-lock")
-
-
-def test_readonly_extract(archiver):
-    cmd(archiver, "rcreate", RK_ENCRYPTION)
-    create_src_archive(archiver, "test")
-
-    with read_only(archiver.repository_path):
-        # verify that command normally doesn't work with read-only repo
-        if archiver.FORK_DEFAULT:
-            cmd(archiver, "extract", "test", exit_code=EXIT_ERROR)
-        else:
-            with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo:
-                cmd(archiver, "extract", "test")
-            if isinstance(excinfo.value, RemoteRepository.RPCError):
-                assert excinfo.value.exception_class == "LockFailed"
-        # verify that command works with read-only repo when using --bypass-lock
-        cmd(archiver, "extract", "test", "--bypass-lock")
-
-
-def test_readonly_info(archiver):
-    cmd(archiver, "rcreate", RK_ENCRYPTION)
-    create_src_archive(archiver, "test")
-    with read_only(archiver.repository_path):
-        # verify that command normally doesn't work with read-only repo
-        if archiver.FORK_DEFAULT:
-            cmd(archiver, "rinfo", exit_code=EXIT_ERROR)
-        else:
-            with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo:
-                cmd(archiver, "rinfo")
-            if isinstance(excinfo.value, RemoteRepository.RPCError):
-                assert excinfo.value.exception_class == "LockFailed"
-        # verify that command works with read-only repo when using --bypass-lock
-        cmd(archiver, "rinfo", "--bypass-lock")
-
-
-def test_readonly_list(archiver):
-    cmd(archiver, "rcreate", RK_ENCRYPTION)
-    create_src_archive(archiver, "test")
-    with read_only(archiver.repository_path):
-        # verify that command normally doesn't work with read-only repo
-        if archiver.FORK_DEFAULT:
-            cmd(archiver, "rlist", exit_code=EXIT_ERROR)
-        else:
-            with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo:
-                cmd(archiver, "rlist")
-            if isinstance(excinfo.value, RemoteRepository.RPCError):
-                assert excinfo.value.exception_class == "LockFailed"
-        # verify that command works with read-only repo when using --bypass-lock
-        cmd(archiver, "rlist", "--bypass-lock")
-
-
-@pytest.mark.skipif(not llfuse, reason="llfuse not installed")
-def test_readonly_mount(archiver):
-    cmd(archiver, "rcreate", RK_ENCRYPTION)
-    create_src_archive(archiver, "test")
-    with read_only(archiver.repository_path):
-        # verify that command normally doesn't work with read-only repo
-        if archiver.FORK_DEFAULT:
-            with fuse_mount(archiver, exit_code=EXIT_ERROR):
-                pass
-        else:
-            with pytest.raises((LockFailed, RemoteRepository.RPCError)) as excinfo:
-                # self.fuse_mount always assumes fork=True, so for this test we have to set fork=False manually
-                with fuse_mount(archiver, fork=False):
-                    pass
-            if isinstance(excinfo.value, RemoteRepository.RPCError):
-                assert excinfo.value.exception_class == "LockFailed"
-        # verify that command works with read-only repo when using --bypass-lock
-        with fuse_mount(archiver, None, "--bypass-lock"):
-            pass

+ 3 - 5
src/borg/testsuite/archiver/check_cmd.py

@@ -8,7 +8,7 @@ from ...archive import ChunkBuffer
 from ...constants import *  # NOQA
 from ...helpers import bin_to_hex, msgpack
 from ...manifest import Manifest
-from ...repository import Repository
+from ...repository3 import Repository3
 from . import cmd, src_file, create_src_archive, open_archive, generate_archiver_tests, RK_ENCRYPTION
 
 pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary")  # NOQA
@@ -28,12 +28,10 @@ def test_check_usage(archivers, request):
     output = cmd(archiver, "check", "-v", "--progress", exit_code=0)
     assert "Starting repository check" in output
     assert "Starting archive consistency check" in output
-    assert "Checking segments" in output
 
     output = cmd(archiver, "check", "-v", "--repository-only", exit_code=0)
     assert "Starting repository check" in output
     assert "Starting archive consistency check" not in output
-    assert "Checking segments" not in output
 
     output = cmd(archiver, "check", "-v", "--archives-only", exit_code=0)
     assert "Starting repository check" not in output
@@ -348,7 +346,7 @@ def test_extra_chunks(archivers, request):
         pytest.skip("only works locally")
     check_cmd_setup(archiver)
     cmd(archiver, "check", exit_code=0)
-    with Repository(archiver.repository_location, exclusive=True) as repository:
+    with Repository3(archiver.repository_location, exclusive=True) as repository:
         repository.put(b"01234567890123456789012345678901", b"xxxx")
         repository.commit(compact=False)
     output = cmd(archiver, "check", "-v", exit_code=0)  # orphans are not considered warnings anymore
@@ -391,7 +389,7 @@ def test_empty_repository(archivers, request):
     if archiver.get_kind() == "remote":
         pytest.skip("only works locally")
     check_cmd_setup(archiver)
-    with Repository(archiver.repository_location, exclusive=True) as repository:
+    with Repository3(archiver.repository_location, exclusive=True) as repository:
         for id_ in repository.list():
             repository.delete(id_)
         repository.commit(compact=False)

+ 12 - 12
src/borg/testsuite/archiver/checks.py

@@ -9,8 +9,8 @@ from ...constants import *  # NOQA
 from ...helpers import Location, get_security_dir, bin_to_hex
 from ...helpers import EXIT_ERROR
 from ...manifest import Manifest, MandatoryFeatureUnsupported
-from ...remote import RemoteRepository, PathNotAllowed
-from ...repository import Repository
+from ...remote3 import RemoteRepository3, PathNotAllowed
+from ...repository3 import Repository3
 from .. import llfuse
 from .. import changedir
 from . import cmd, _extract_repository_id, open_repository, check_cache, create_test_files
@@ -25,7 +25,7 @@ def get_security_directory(repo_path):
 
 
 def add_unknown_feature(repo_path, operation):
-    with Repository(repo_path, exclusive=True) as repository:
+    with Repository3(repo_path, exclusive=True) as repository:
         manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
         manifest.config["feature_flags"] = {operation.value: {"mandatory": ["unknown-feature"]}}
         manifest.write()
@@ -272,7 +272,7 @@ def test_unknown_mandatory_feature_in_cache(archivers, request):
     remote_repo = archiver.get_kind() == "remote"
     print(cmd(archiver, "rcreate", RK_ENCRYPTION))
 
-    with Repository(archiver.repository_path, exclusive=True) as repository:
+    with Repository3(archiver.repository_path, exclusive=True) as repository:
         if remote_repo:
             repository._location = Location(archiver.repository_location)
         manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
@@ -299,7 +299,7 @@ def test_unknown_mandatory_feature_in_cache(archivers, request):
         if is_localcache:
             assert called
 
-    with Repository(archiver.repository_path, exclusive=True) as repository:
+    with Repository3(archiver.repository_path, exclusive=True) as repository:
         if remote_repo:
             repository._location = Location(archiver.repository_location)
         manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
@@ -346,26 +346,26 @@ def test_env_use_chunks_archive(archivers, request, monkeypatch):
 def test_remote_repo_restrict_to_path(remote_archiver):
     original_location, repo_path = remote_archiver.repository_location, remote_archiver.repository_path
     # restricted to repo directory itself:
-    with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", repo_path]):
+    with patch.object(RemoteRepository3, "extra_test_args", ["--restrict-to-path", repo_path]):
         cmd(remote_archiver, "rcreate", RK_ENCRYPTION)
     # restricted to repo directory itself, fail for other directories with same prefix:
-    with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", repo_path]):
+    with patch.object(RemoteRepository3, "extra_test_args", ["--restrict-to-path", repo_path]):
         with pytest.raises(PathNotAllowed):
             remote_archiver.repository_location = original_location + "_0"
             cmd(remote_archiver, "rcreate", RK_ENCRYPTION)
     # restricted to a completely different path:
-    with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", "/foo"]):
+    with patch.object(RemoteRepository3, "extra_test_args", ["--restrict-to-path", "/foo"]):
         with pytest.raises(PathNotAllowed):
             remote_archiver.repository_location = original_location + "_1"
             cmd(remote_archiver, "rcreate", RK_ENCRYPTION)
     path_prefix = os.path.dirname(repo_path)
     # restrict to repo directory's parent directory:
-    with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-path", path_prefix]):
+    with patch.object(RemoteRepository3, "extra_test_args", ["--restrict-to-path", path_prefix]):
         remote_archiver.repository_location = original_location + "_2"
         cmd(remote_archiver, "rcreate", RK_ENCRYPTION)
     # restrict to repo directory's parent directory and another directory:
     with patch.object(
-        RemoteRepository, "extra_test_args", ["--restrict-to-path", "/foo", "--restrict-to-path", path_prefix]
+        RemoteRepository3, "extra_test_args", ["--restrict-to-path", "/foo", "--restrict-to-path", path_prefix]
     ):
         remote_archiver.repository_location = original_location + "_3"
         cmd(remote_archiver, "rcreate", RK_ENCRYPTION)
@@ -374,10 +374,10 @@ def test_remote_repo_restrict_to_path(remote_archiver):
 def test_remote_repo_restrict_to_repository(remote_archiver):
     repo_path = remote_archiver.repository_path
     # restricted to repo directory itself:
-    with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-repository", repo_path]):
+    with patch.object(RemoteRepository3, "extra_test_args", ["--restrict-to-repository", repo_path]):
         cmd(remote_archiver, "rcreate", RK_ENCRYPTION)
     parent_path = os.path.join(repo_path, "..")
-    with patch.object(RemoteRepository, "extra_test_args", ["--restrict-to-repository", parent_path]):
+    with patch.object(RemoteRepository3, "extra_test_args", ["--restrict-to-repository", parent_path]):
         with pytest.raises(PathNotAllowed):
             cmd(remote_archiver, "rcreate", RK_ENCRYPTION)
 

+ 0 - 64
src/borg/testsuite/archiver/config_cmd.py

@@ -1,64 +0,0 @@
-import os
-import pytest
-
-from ...constants import *  # NOQA
-from . import RK_ENCRYPTION, create_test_files, cmd, generate_archiver_tests
-from ...helpers import CommandError, Error
-
-pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,binary")  # NOQA
-
-
-def test_config(archivers, request):
-    archiver = request.getfixturevalue(archivers)
-    create_test_files(archiver.input_path)
-    os.unlink("input/flagfile")
-    cmd(archiver, "rcreate", RK_ENCRYPTION)
-    output = cmd(archiver, "config", "--list")
-    assert "[repository]" in output
-    assert "version" in output
-    assert "segments_per_dir" in output
-    assert "storage_quota" in output
-    assert "append_only" in output
-    assert "additional_free_space" in output
-    assert "id" in output
-    assert "last_segment_checked" not in output
-
-    if archiver.FORK_DEFAULT:
-        output = cmd(archiver, "config", "last_segment_checked", exit_code=2)
-        assert "No option " in output
-    else:
-        with pytest.raises(Error):
-            cmd(archiver, "config", "last_segment_checked")
-
-    cmd(archiver, "config", "last_segment_checked", "123")
-    output = cmd(archiver, "config", "last_segment_checked")
-    assert output == "123" + os.linesep
-    output = cmd(archiver, "config", "--list")
-    assert "last_segment_checked" in output
-    cmd(archiver, "config", "--delete", "last_segment_checked")
-
-    for cfg_key, cfg_value in [("additional_free_space", "2G"), ("repository.append_only", "1")]:
-        output = cmd(archiver, "config", cfg_key)
-        assert output == "0" + os.linesep
-        cmd(archiver, "config", cfg_key, cfg_value)
-        output = cmd(archiver, "config", cfg_key)
-        assert output == cfg_value + os.linesep
-        cmd(archiver, "config", "--delete", cfg_key)
-        if archiver.FORK_DEFAULT:
-            cmd(archiver, "config", cfg_key, exit_code=2)
-        else:
-            with pytest.raises(Error):
-                cmd(archiver, "config", cfg_key)
-
-    cmd(archiver, "config", "--list", "--delete", exit_code=2)
-    if archiver.FORK_DEFAULT:
-        expected_ec = CommandError().exit_code
-        cmd(archiver, "config", exit_code=expected_ec)
-    else:
-        with pytest.raises(CommandError):
-            cmd(archiver, "config")
-    if archiver.FORK_DEFAULT:
-        cmd(archiver, "config", "invalid-option", exit_code=2)
-    else:
-        with pytest.raises(Error):
-            cmd(archiver, "config", "invalid-option")

+ 0 - 18
src/borg/testsuite/archiver/corruption.py

@@ -13,24 +13,6 @@ from ...hashindex import ChunkIndex
 from ...cache import LocalCache
 
 
-def test_check_corrupted_repository(archiver):
-    cmd(archiver, "rcreate", RK_ENCRYPTION)
-    create_src_archive(archiver, "test")
-    cmd(archiver, "extract", "test", "--dry-run")
-    cmd(archiver, "check")
-
-    name = sorted(os.listdir(os.path.join(archiver.tmpdir, "repository", "data", "0")), reverse=True)[1]
-    with open(os.path.join(archiver.tmpdir, "repository", "data", "0", name), "r+b") as fd:
-        fd.seek(100)
-        fd.write(b"XXXX")
-
-    if archiver.FORK_DEFAULT:
-        cmd(archiver, "check", exit_code=1)
-    else:
-        with pytest.raises(Error):
-            cmd(archiver, "check")
-
-
 def corrupt_archiver(archiver):
     create_test_files(archiver.input_path)
     cmd(archiver, "rcreate", RK_ENCRYPTION)

+ 2 - 2
src/borg/testsuite/archiver/create_cmd.py

@@ -16,7 +16,7 @@ from ...cache import get_cache_impl
 from ...constants import *  # NOQA
 from ...manifest import Manifest
 from ...platform import is_cygwin, is_win32, is_darwin
-from ...repository import Repository
+from ...repository3 import Repository3
 from ...helpers import CommandError, BackupPermissionError
 from .. import has_lchflags
 from .. import changedir
@@ -668,7 +668,7 @@ def test_create_dry_run(archivers, request):
     cmd(archiver, "rcreate", RK_ENCRYPTION)
     cmd(archiver, "create", "--dry-run", "test", "input")
     # Make sure no archive has been created
-    with Repository(archiver.repository_path) as repository:
+    with Repository3(archiver.repository_path) as repository:
         manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
     assert len(manifest.archives) == 0
 

+ 3 - 3
src/borg/testsuite/archiver/delete_cmd.py

@@ -1,7 +1,7 @@
 from ...archive import Archive
 from ...constants import *  # NOQA
 from ...manifest import Manifest
-from ...repository import Repository
+from ...repository3 import Repository3
 from . import cmd, create_regular_file, src_file, create_src_archive, generate_archiver_tests, RK_ENCRYPTION
 
 pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary")  # NOQA
@@ -47,7 +47,7 @@ def test_delete_force(archivers, request):
     archiver = request.getfixturevalue(archivers)
     cmd(archiver, "rcreate", "--encryption=none")
     create_src_archive(archiver, "test")
-    with Repository(archiver.repository_path, exclusive=True) as repository:
+    with Repository3(archiver.repository_path, exclusive=True) as repository:
         manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
         archive = Archive(manifest, "test")
         for item in archive.iter_items():
@@ -69,7 +69,7 @@ def test_delete_double_force(archivers, request):
     archiver = request.getfixturevalue(archivers)
     cmd(archiver, "rcreate", "--encryption=none")
     create_src_archive(archiver, "test")
-    with Repository(archiver.repository_path, exclusive=True) as repository:
+    with Repository3(archiver.repository_path, exclusive=True) as repository:
         manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
         archive = Archive(manifest, "test")
         id = archive.metadata.items[0]

+ 7 - 7
src/borg/testsuite/archiver/key_cmds.py

@@ -9,7 +9,7 @@ from ...crypto.keymanager import RepoIdMismatch, NotABorgKeyFile
 from ...helpers import CommandError
 from ...helpers import bin_to_hex, hex_to_bin
 from ...helpers import msgpack
-from ...repository import Repository
+from ...repository3 import Repository3
 from .. import key
 from . import RK_ENCRYPTION, KF_ENCRYPTION, cmd, _extract_repository_id, _set_repository_id, generate_archiver_tests
 
@@ -129,7 +129,7 @@ def test_key_export_repokey(archivers, request):
 
     assert export_contents.startswith("BORG_KEY " + bin_to_hex(repo_id) + "\n")
 
-    with Repository(archiver.repository_path) as repository:
+    with Repository3(archiver.repository_path) as repository:
         repo_key = AESOCBRepoKey(repository)
         repo_key.load(None, Passphrase.env_passphrase())
 
@@ -138,12 +138,12 @@ def test_key_export_repokey(archivers, request):
 
     assert repo_key.crypt_key == backup_key.crypt_key
 
-    with Repository(archiver.repository_path) as repository:
+    with Repository3(archiver.repository_path) as repository:
         repository.save_key(b"")
 
     cmd(archiver, "key", "import", export_file)
 
-    with Repository(archiver.repository_path) as repository:
+    with Repository3(archiver.repository_path) as repository:
         repo_key2 = AESOCBRepoKey(repository)
         repo_key2.load(None, Passphrase.env_passphrase())
 
@@ -302,7 +302,7 @@ def test_init_defaults_to_argon2(archivers, request):
     """https://github.com/borgbackup/borg/issues/747#issuecomment-1076160401"""
     archiver = request.getfixturevalue(archivers)
     cmd(archiver, "rcreate", RK_ENCRYPTION)
-    with Repository(archiver.repository_path) as repository:
+    with Repository3(archiver.repository_path) as repository:
         key = msgpack.unpackb(binascii.a2b_base64(repository.load_key()))
         assert key["algorithm"] == "argon2 chacha20-poly1305"
 
@@ -313,7 +313,7 @@ def test_change_passphrase_does_not_change_algorithm_argon2(archivers, request):
     os.environ["BORG_NEW_PASSPHRASE"] = "newpassphrase"
     cmd(archiver, "key", "change-passphrase")
 
-    with Repository(archiver.repository_path) as repository:
+    with Repository3(archiver.repository_path) as repository:
         key = msgpack.unpackb(binascii.a2b_base64(repository.load_key()))
         assert key["algorithm"] == "argon2 chacha20-poly1305"
 
@@ -323,6 +323,6 @@ def test_change_location_does_not_change_algorithm_argon2(archivers, request):
     cmd(archiver, "rcreate", KF_ENCRYPTION)
     cmd(archiver, "key", "change-location", "repokey")
 
-    with Repository(archiver.repository_path) as repository:
+    with Repository3(archiver.repository_path) as repository:
         key = msgpack.unpackb(binascii.a2b_base64(repository.load_key()))
         assert key["algorithm"] == "argon2 chacha20-poly1305"

+ 2 - 2
src/borg/testsuite/archiver/rcompress_cmd.py

@@ -1,7 +1,7 @@
 import os
 
 from ...constants import *  # NOQA
-from ...repository import Repository
+from ...repository3 import Repository3
 from ...manifest import Manifest
 from ...compress import ZSTD, ZLIB, LZ4, CNONE
 from ...helpers import bin_to_hex
@@ -12,7 +12,7 @@ from . import create_regular_file, cmd, RK_ENCRYPTION
 def test_rcompress(archiver):
     def check_compression(ctype, clevel, olevel):
         """check if all the chunks in the repo are compressed/obfuscated like expected"""
-        repository = Repository(archiver.repository_path, exclusive=True)
+        repository = Repository3(archiver.repository_path, exclusive=True)
         with repository:
             manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
             state = None

+ 0 - 29
src/borg/testsuite/archiver/rcreate_cmd.py

@@ -6,28 +6,11 @@ import pytest
 from ...helpers.errors import Error, CancelledByUser
 from ...constants import *  # NOQA
 from ...crypto.key import FlexiKey
-from ...repository import Repository
 from . import cmd, generate_archiver_tests, RK_ENCRYPTION, KF_ENCRYPTION
 
 pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary")  # NOQA
 
 
-def test_rcreate_parent_dirs(archivers, request):
-    archiver = request.getfixturevalue(archivers)
-    if archiver.EXE:
-        pytest.skip("does not raise Exception, but sets rc==2")
-    remote_repo = archiver.get_kind() == "remote"
-    parent_path = os.path.join(archiver.tmpdir, "parent1", "parent2")
-    repository_path = os.path.join(parent_path, "repository")
-    archiver.repository_location = ("ssh://__testsuite__" + repository_path) if remote_repo else repository_path
-    with pytest.raises(Repository.ParentPathDoesNotExist):
-        # normal borg rcreate does NOT create missing parent dirs
-        cmd(archiver, "rcreate", "--encryption=none")
-    # but if told so, it does:
-    cmd(archiver, "rcreate", "--encryption=none", "--make-parent-dirs")
-    assert os.path.exists(parent_path)
-
-
 def test_rcreate_interrupt(archivers, request):
     archiver = request.getfixturevalue(archivers)
     if archiver.EXE:
@@ -51,18 +34,6 @@ def test_rcreate_requires_encryption_option(archivers, request):
     cmd(archiver, "rcreate", exit_code=2)
 
 
-def test_rcreate_nested_repositories(archivers, request):
-    archiver = request.getfixturevalue(archivers)
-    cmd(archiver, "rcreate", RK_ENCRYPTION)
-    archiver.repository_location += "/nested"
-    if archiver.FORK_DEFAULT:
-        expected_ec = Repository.AlreadyExists().exit_code
-        cmd(archiver, "rcreate", RK_ENCRYPTION, exit_code=expected_ec)
-    else:
-        with pytest.raises(Repository.AlreadyExists):
-            cmd(archiver, "rcreate", RK_ENCRYPTION)
-
-
 def test_rcreate_refuse_to_overwrite_keyfile(archivers, request, monkeypatch):
     #  BORG_KEY_FILE=something borg rcreate should quit if "something" already exists.
     #  See: https://github.com/borgbackup/borg/pull/6046
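
The two deleted tests exercised `Repository.ParentPathDoesNotExist` and `Repository.AlreadyExists`, error classes tied to the old segment-based repository; equivalent coverage for the borgstore backend is not part of this commit. Creating and reopening a `Repository3` follows the pattern of the new fixture and `reopen()` helper in `src/borg/testsuite/repository3.py`; a sketch, with an illustrative path:

    from borg.repository3 import Repository3

    repo_path = "/tmp/repo3-demo"  # illustrative location
    with Repository3(repo_path, exclusive=True, create=True) as repository:
        pass  # created and opened
    with Repository3(repo_path, exclusive=True, create=False) as repository:
        pass  # reopened later, as reopen() does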

+ 2 - 2
src/borg/testsuite/archiver/rename_cmd.py

@@ -1,6 +1,6 @@
 from ...constants import *  # NOQA
 from ...manifest import Manifest
-from ...repository import Repository
+from ...repository3 import Repository3
 from . import cmd, create_regular_file, generate_archiver_tests, RK_ENCRYPTION
 
 pytest_generate_tests = lambda metafunc: generate_archiver_tests(metafunc, kinds="local,remote,binary")  # NOQA
@@ -21,7 +21,7 @@ def test_rename(archivers, request):
     cmd(archiver, "extract", "test.3", "--dry-run")
     cmd(archiver, "extract", "test.4", "--dry-run")
     # Make sure both archives have been renamed
-    with Repository(archiver.repository_path) as repository:
+    with Repository3(archiver.repository_path) as repository:
         manifest = Manifest.load(repository, Manifest.NO_OPERATION_CHECK)
     assert len(manifest.archives) == 2
     assert "test.3" in manifest.archives

+ 1 - 1
src/borg/testsuite/archiver/return_codes.py

@@ -5,7 +5,7 @@ from . import cmd_fixture, changedir  # NOQA
 
 
 def test_return_codes(cmd_fixture, tmpdir):
-    repo = tmpdir.mkdir("repo")
+    repo = tmpdir / "repo"  # borg creates the directory
     input = tmpdir.mkdir("input")
     output = tmpdir.mkdir("output")
     input.join("test_file").write("content")

+ 0 - 18
src/borg/testsuite/archiver/rinfo_cmd.py

@@ -35,21 +35,3 @@ def test_info_json(archivers, request):
     stats = cache["stats"]
     assert all(isinstance(o, int) for o in stats.values())
     assert all(key in stats for key in ("total_chunks", "total_size", "total_unique_chunks", "unique_size"))
-
-
-def test_info_on_repository_with_storage_quota(archivers, request):
-    archiver = request.getfixturevalue(archivers)
-    create_regular_file(archiver.input_path, "file1", contents=randbytes(1000 * 1000))
-    cmd(archiver, "rcreate", RK_ENCRYPTION, "--storage-quota=1G")
-    cmd(archiver, "create", "test", "input")
-    info_repo = cmd(archiver, "rinfo")
-    assert "Storage quota: 1.00 MB used out of 1.00 GB" in info_repo
-
-
-def test_info_on_repository_without_storage_quota(archivers, request):
-    archiver = request.getfixturevalue(archivers)
-    create_regular_file(archiver.input_path, "file1", contents=randbytes(1000 * 1000))
-    cmd(archiver, "rcreate", RK_ENCRYPTION)
-    cmd(archiver, "create", "test", "input")
-    info_repo = cmd(archiver, "rinfo")
-    assert "Storage quota: 1.00 MB used" in info_repo

+ 3 - 3
src/borg/testsuite/cache.py

@@ -12,7 +12,7 @@ from ..cache import AdHocCache
 from ..crypto.key import AESOCBRepoKey
 from ..hashindex import ChunkIndex, CacheSynchronizer
 from ..manifest import Manifest
-from ..repository import Repository
+from ..repository3 import Repository3
 
 
 class TestCacheSynchronizer:
@@ -164,7 +164,7 @@ class TestAdHocCache:
     @pytest.fixture
     def repository(self, tmpdir):
         self.repository_location = os.path.join(str(tmpdir), "repository")
-        with Repository(self.repository_location, exclusive=True, create=True) as repository:
+        with Repository3(self.repository_location, exclusive=True, create=True) as repository:
             repository.put(H(1), b"1234")
             repository.put(Manifest.MANIFEST_ID, b"5678")
             yield repository
@@ -201,7 +201,7 @@ class TestAdHocCache:
         assert cache.seen_chunk(H(5)) == 1
         cache.chunk_decref(H(5), 1, Statistics())
         assert not cache.seen_chunk(H(5))
-        with pytest.raises(Repository.ObjectNotFound):
+        with pytest.raises(Repository3.ObjectNotFound):
             repository.get(H(5))
 
     def test_files_cache(self, cache):

+ 2 - 2
src/borg/testsuite/repoobj.py

@@ -3,14 +3,14 @@ import pytest
 from ..constants import ROBJ_FILE_STREAM, ROBJ_MANIFEST, ROBJ_ARCHIVE_META
 from ..crypto.key import PlaintextKey
 from ..helpers.errors import IntegrityError
-from ..repository import Repository
+from ..repository3 import Repository3
 from ..repoobj import RepoObj, RepoObj1
 from ..compress import LZ4
 
 
 @pytest.fixture
 def repository(tmpdir):
-    return Repository(tmpdir, create=True)
+    return Repository3(tmpdir, create=True)
 
 
 @pytest.fixture

+ 290 - 0
src/borg/testsuite/repository3.py

@@ -0,0 +1,290 @@
+import logging
+import os
+import sys
+from typing import Optional
+
+import pytest
+
+from ..helpers import Location
+from ..helpers import IntegrityError
+from ..platformflags import is_win32
+from ..remote3 import RemoteRepository3, InvalidRPCMethod, PathNotAllowed
+from ..repository3 import Repository3, MAX_DATA_SIZE
+from ..repoobj import RepoObj
+from .hashindex import H
+
+
+@pytest.fixture()
+def repository(tmp_path):
+    repository_location = os.fspath(tmp_path / "repository")
+    yield Repository3(repository_location, exclusive=True, create=True)
+
+
+@pytest.fixture()
+def remote_repository(tmp_path):
+    if is_win32:
+        pytest.skip("Remote repository does not yet work on Windows.")
+    repository_location = Location("ssh://__testsuite__" + os.fspath(tmp_path / "repository"))
+    yield RemoteRepository3(repository_location, exclusive=True, create=True)
+
+
+def pytest_generate_tests(metafunc):
+    # Generates tests that run on both local and remote repos
+    if "repo_fixtures" in metafunc.fixturenames:
+        metafunc.parametrize("repo_fixtures", ["repository", "remote_repository"])
+
+
+def get_repository_from_fixture(repo_fixtures, request):
+    # returns the repo object from the fixture for tests that run on both local and remote repos
+    return request.getfixturevalue(repo_fixtures)
+
+
+def reopen(repository, exclusive: Optional[bool] = True, create=False):
+    if isinstance(repository, Repository3):
+        if repository.opened:
+            raise RuntimeError("Repo must be closed before a reopen. Cannot support nested repository contexts.")
+        return Repository3(repository.path, exclusive=exclusive, create=create)
+
+    if isinstance(repository, RemoteRepository3):
+        if repository.p is not None or repository.sock is not None:
+            raise RuntimeError("Remote repo must be closed before a reopen. Cannot support nested repository contexts.")
+        return RemoteRepository3(repository.location, exclusive=exclusive, create=create)
+
+    raise TypeError(
+        f"Invalid argument type. Expected 'Repository3' or 'RemoteRepository3', received '{type(repository).__name__}'."
+    )
+
+
+def fchunk(data, meta=b""):
+    # format chunk: create a raw chunk that has valid RepoObj layout, but does not use encryption or compression.
+    meta_len = RepoObj.meta_len_hdr.pack(len(meta))
+    assert isinstance(data, bytes)
+    chunk = meta_len + meta + data
+    return chunk
+
+
+def pchunk(chunk):
+    # parse chunk: parse data and meta from a raw chunk made by fchunk
+    meta_len_size = RepoObj.meta_len_hdr.size
+    meta_len = chunk[:meta_len_size]
+    meta_len = RepoObj.meta_len_hdr.unpack(meta_len)[0]
+    meta = chunk[meta_len_size : meta_len_size + meta_len]
+    data = chunk[meta_len_size + meta_len :]
+    return data, meta
+
+
+def pdchunk(chunk):
+    # parse only data from a raw chunk made by fchunk
+    return pchunk(chunk)[0]
+
+
+def test_basic_operations(repo_fixtures, request):
+    with get_repository_from_fixture(repo_fixtures, request) as repository:
+        for x in range(100):
+            repository.put(H(x), fchunk(b"SOMEDATA"))
+        key50 = H(50)
+        assert pdchunk(repository.get(key50)) == b"SOMEDATA"
+        repository.delete(key50)
+        with pytest.raises(Repository3.ObjectNotFound):
+            repository.get(key50)
+    with reopen(repository) as repository:
+        with pytest.raises(Repository3.ObjectNotFound):
+            repository.get(key50)
+        for x in range(100):
+            if x == 50:
+                continue
+            assert pdchunk(repository.get(H(x))) == b"SOMEDATA"
+
+
+def test_read_data(repo_fixtures, request):
+    with get_repository_from_fixture(repo_fixtures, request) as repository:
+        meta, data = b"meta", b"data"
+        meta_len = RepoObj.meta_len_hdr.pack(len(meta))
+        chunk_complete = meta_len + meta + data
+        chunk_short = meta_len + meta
+        repository.put(H(0), chunk_complete)
+        assert repository.get(H(0)) == chunk_complete
+        assert repository.get(H(0), read_data=True) == chunk_complete
+        assert repository.get(H(0), read_data=False) == chunk_short
+
+
+def test_consistency(repo_fixtures, request):
+    with get_repository_from_fixture(repo_fixtures, request) as repository:
+        repository.put(H(0), fchunk(b"foo"))
+        assert pdchunk(repository.get(H(0))) == b"foo"
+        repository.put(H(0), fchunk(b"foo2"))
+        assert pdchunk(repository.get(H(0))) == b"foo2"
+        repository.put(H(0), fchunk(b"bar"))
+        assert pdchunk(repository.get(H(0))) == b"bar"
+        repository.delete(H(0))
+        with pytest.raises(Repository3.ObjectNotFound):
+            repository.get(H(0))
+
+
+def test_list(repo_fixtures, request):
+    with get_repository_from_fixture(repo_fixtures, request) as repository:
+        for x in range(100):
+            repository.put(H(x), fchunk(b"SOMEDATA"))
+        repo_list = repository.list()
+        assert len(repo_list) == 100
+        first_half = repository.list(limit=50)
+        assert len(first_half) == 50
+        assert first_half == repo_list[:50]
+        second_half = repository.list(marker=first_half[-1])
+        assert len(second_half) == 50
+        assert second_half == repo_list[50:]
+        assert len(repository.list(limit=50)) == 50
+
+
+def test_scan(repo_fixtures, request):
+    with get_repository_from_fixture(repo_fixtures, request) as repository:
+        for x in range(100):
+            repository.put(H(x), fchunk(b"SOMEDATA"))
+        ids, _ = repository.scan()
+        assert len(ids) == 100
+        first_half, state = repository.scan(limit=50)
+        assert len(first_half) == 50
+        assert first_half == ids[:50]
+        second_half, _ = repository.scan(state=state)
+        assert len(second_half) == 50
+        assert second_half == ids[50:]
+
+
+def test_max_data_size(repo_fixtures, request):
+    with get_repository_from_fixture(repo_fixtures, request) as repository:
+        max_data = b"x" * (MAX_DATA_SIZE - RepoObj.meta_len_hdr.size)
+        repository.put(H(0), fchunk(max_data))
+        assert pdchunk(repository.get(H(0))) == max_data
+        with pytest.raises(IntegrityError):
+            repository.put(H(1), fchunk(max_data + b"x"))
+
+
+def check(repository, repo_path, repair=False, status=True):
+    assert repository.check(repair=repair) == status
+    # Make sure no tmp files are left behind
+    tmp_files = [name for name in os.listdir(repo_path) if "tmp" in name]
+    assert tmp_files == [], "Found tmp files"
+
+
+def _get_mock_args():
+    class MockArgs:
+        remote_path = "borg"
+        umask = 0o077
+        debug_topics = []
+        rsh = None
+
+        def __contains__(self, item):
+            # to behave like argparse.Namespace
+            return hasattr(self, item)
+
+    return MockArgs()
+
+
+def test_remote_invalid_rpc(remote_repository):
+    with remote_repository:
+        with pytest.raises(InvalidRPCMethod):
+            remote_repository.call("__init__", {})
+
+
+def test_remote_rpc_exception_transport(remote_repository):
+    with remote_repository:
+        s1 = "test string"
+
+        try:
+            remote_repository.call("inject_exception", {"kind": "DoesNotExist"})
+        except Repository3.DoesNotExist as e:
+            assert len(e.args) == 1
+            assert e.args[0] == remote_repository.location.processed
+
+        try:
+            remote_repository.call("inject_exception", {"kind": "AlreadyExists"})
+        except Repository3.AlreadyExists as e:
+            assert len(e.args) == 1
+            assert e.args[0] == remote_repository.location.processed
+
+        try:
+            remote_repository.call("inject_exception", {"kind": "CheckNeeded"})
+        except Repository3.CheckNeeded as e:
+            assert len(e.args) == 1
+            assert e.args[0] == remote_repository.location.processed
+
+        try:
+            remote_repository.call("inject_exception", {"kind": "IntegrityError"})
+        except IntegrityError as e:
+            assert len(e.args) == 1
+            assert e.args[0] == s1
+
+        try:
+            remote_repository.call("inject_exception", {"kind": "PathNotAllowed"})
+        except PathNotAllowed as e:
+            assert len(e.args) == 1
+            assert e.args[0] == "foo"
+
+        try:
+            remote_repository.call("inject_exception", {"kind": "ObjectNotFound"})
+        except Repository3.ObjectNotFound as e:
+            assert len(e.args) == 2
+            assert e.args[0] == s1
+            assert e.args[1] == remote_repository.location.processed
+
+        try:
+            remote_repository.call("inject_exception", {"kind": "InvalidRPCMethod"})
+        except InvalidRPCMethod as e:
+            assert len(e.args) == 1
+            assert e.args[0] == s1
+
+        try:
+            remote_repository.call("inject_exception", {"kind": "divide"})
+        except RemoteRepository3.RPCError as e:
+            assert e.unpacked
+            assert e.get_message() == "ZeroDivisionError: integer division or modulo by zero\n"
+            assert e.exception_class == "ZeroDivisionError"
+            assert len(e.exception_full) > 0
+
+
+def test_remote_ssh_cmd(remote_repository):
+    with remote_repository:
+        args = _get_mock_args()
+        remote_repository._args = args
+        assert remote_repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "example.com"]
+        assert remote_repository.ssh_cmd(Location("ssh://user@example.com/foo")) == ["ssh", "user@example.com"]
+        assert remote_repository.ssh_cmd(Location("ssh://user@example.com:1234/foo")) == [
+            "ssh",
+            "-p",
+            "1234",
+            "user@example.com",
+        ]
+        os.environ["BORG_RSH"] = "ssh --foo"
+        assert remote_repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "--foo", "example.com"]
+
+
+def test_remote_borg_cmd(remote_repository):
+    with remote_repository:
+        assert remote_repository.borg_cmd(None, testing=True) == [sys.executable, "-m", "borg", "serve"]
+        args = _get_mock_args()
+        # XXX without next line we get spurious test fails when using pytest-xdist, root cause unknown:
+        logging.getLogger().setLevel(logging.INFO)
+        # note: test logger is on info log level, so --info gets added automagically
+        assert remote_repository.borg_cmd(args, testing=False) == ["borg", "serve", "--info"]
+        args.remote_path = "borg-0.28.2"
+        assert remote_repository.borg_cmd(args, testing=False) == ["borg-0.28.2", "serve", "--info"]
+        args.debug_topics = ["something_client_side", "repository_compaction"]
+        assert remote_repository.borg_cmd(args, testing=False) == [
+            "borg-0.28.2",
+            "serve",
+            "--info",
+            "--debug-topic=borg.debug.repository_compaction",
+        ]
+        args = _get_mock_args()
+        args.storage_quota = 0
+        assert remote_repository.borg_cmd(args, testing=False) == ["borg", "serve", "--info"]
+        args.storage_quota = 314159265
+        assert remote_repository.borg_cmd(args, testing=False) == [
+            "borg",
+            "serve",
+            "--info",
+            "--storage-quota=314159265",
+        ]
+        args.rsh = "ssh -i foo"
+        remote_repository._args = args
+        assert remote_repository.ssh_cmd(Location("ssh://example.com/foo")) == ["ssh", "-i", "foo", "example.com"]
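
The `fchunk`/`pchunk` helpers above pin down the minimal RepoObj wire layout these tests rely on: a packed meta-length header, the metadata bytes, then the data. The following sketch shows a round-trip through those helpers plus the marker-based `list()` pagination from `test_list`; it assumes `repository` is an already-open `Repository3` and reuses `fchunk`, `pchunk`, `pdchunk`, and `H()` from the testsuite:

    # Sketch only; consistent with test_basic_operations and test_list above.
    chunk = fchunk(b"payload", meta=b"meta")
    assert pchunk(chunk) == (b"payload", b"meta")

    repository.put(H(0), chunk)
    assert pdchunk(repository.get(H(0))) == b"payload"

    # fetch all ids in batches of 50, passing the last id seen as marker
    ids, marker = [], None
    while True:
        batch = repository.list(limit=50, marker=marker)
        if not batch:
            break
        ids.extend(batch)
        marker = batch[-1]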

+ 1 - 1
tox.ini

@@ -42,7 +42,7 @@ deps =
     pytest
     mypy
     pkgconfig
-commands = mypy
+commands = mypy --ignore-missing-imports
 
 [testenv:docs]
 changedir = docs
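
`--ignore-missing-imports` silences the missing-stub error for every untyped dependency, which is the quick fix the commit message describes for borgstore. A narrower alternative (not what this commit does) would be a per-module override in mypy's config file, e.g.:

    # mypy.ini / setup.cfg sketch: ignore missing stubs for borgstore only
    [mypy-borgstore.*]
    ignore_missing_imports = True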