
Merge branch '1.0-maint' into merge-1.0-maint

Thomas Waldmann, 8 years ago
commit dbe862f3d8

+ 3 - 3
Vagrantfile

@@ -61,9 +61,9 @@ def packages_darwin
     # install all the (security and other) updates
     sudo softwareupdate --install --all
     # get osxfuse 3.x pre-release code from github:
-    curl -s -L https://github.com/osxfuse/osxfuse/releases/download/osxfuse-3.3.3/osxfuse-3.3.3.dmg >osxfuse.dmg
+    curl -s -L https://github.com/osxfuse/osxfuse/releases/download/osxfuse-3.4.1/osxfuse-3.4.1.dmg >osxfuse.dmg
     MOUNTDIR=$(echo `hdiutil mount osxfuse.dmg | tail -1 | awk '{$1="" ; print $0}'` | xargs -0 echo) \
-    && sudo installer -pkg "${MOUNTDIR}/Extras/FUSE for OS X 3.3.3.pkg" -target /
+    && sudo installer -pkg "${MOUNTDIR}/Extras/FUSE for macOS 3.4.1.pkg" -target /
     sudo chown -R vagrant /usr/local  # brew must be able to create stuff here
     ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"
     brew update
@@ -72,7 +72,7 @@ def packages_darwin
     brew install xz  # required for python lzma module
     brew install fakeroot
     brew install git
-    brew install pkgconfig
+    brew install pkg-config
     touch ~vagrant/.bash_profile ; chown vagrant ~vagrant/.bash_profile
   EOF
 end

+ 64 - 0
docs/changes.rst

@@ -123,6 +123,70 @@ Other changes:
   - ChunkBuffer: add test for leaving partial chunk in buffer, fixes #945
 
 
+Version 1.0.7 (not released yet)
+--------------------------------
+
+Security fixes:
+
+- fix security issue with remote repository access, #1428
+
+
+Version 1.0.7rc1 (2016-08-05)
+-----------------------------
+
+Bug fixes:
+
+- fix repo lock deadlocks (related to lock upgrade), #1220
+- catch unpacker exceptions, resync, #1351
+- fix borg break-lock ignoring BORG_REPO env var, #1324
+- files cache performance fixes (fixes unnecessary re-reading/chunking/
+  hashing of unmodified files for some use cases):
+
+  - fix unintended file cache eviction, #1430
+  - implement BORG_FILES_CACHE_TTL, update FAQ, raise default TTL from 10
+    to 20, #1338
+- FUSE:
+
+  - cache partially read data chunks (performance), #965, #966
+  - always create a root dir, #1125
+- use an OrderedDict for helptext, making the build reproducible, #1346
+- RemoteRepository init: always call close on exceptions, #1370 (cosmetic)
+- ignore stdout/stderr broken pipe errors (cosmetic), #1116
+
+New features:
+
+- better borg version management support (especially useful for borg servers
+  wanting to offer multiple borg versions and for clients wanting to choose
+  a specific server borg version), #1392:
+
+  - add BORG_VERSION environment variable before executing "borg serve" via ssh
+  - add new placeholder {borgversion}
+  - substitute placeholders in --remote-path
+
+- borg init --append-only option (makes using the more secure append-only mode
+  more convenient; when used remotely, this requires borg 1.0.7+ on the borg
+  server, too), #1291.
+
+Other changes:
+
+- Vagrantfile:
+
+  - darwin64: upgrade to FUSE for macOS 3.4.1 (aka osxfuse), #1378
+  - xenial64: use user "ubuntu", not "vagrant" (as usual), #1331
+- tests:
+
+  - fix fuse tests on OS X, #1433
+- docs:
+
+  - FAQ: add recommendation to back up using stable filesystem names
+  - FAQ about glibc compatibility added, #491, glibc-check improved
+  - FAQ: 'A' unchanged file; remove ambiguous entry age sentence.
+  - OS X: install pkg-config to build with FUSE support, fixes #1400
+  - add notes about shell/sudo pitfalls with env. vars, #1380
+  - added platform feature matrix
+- implement borg debug-dump-repo-objs
+
+
 Version 1.0.6 (2016-07-12)
 --------------------------
 

+ 48 - 0
docs/faq.rst

@@ -341,6 +341,35 @@ those files are reported as being added when, really, chunks are
 already used.
 
 
+It always chunks all my files, even unchanged ones!
+---------------------------------------------------
+
+|project_name| maintains a files cache where it remembers the mtime, size and
+inode of files. When |project_name| does a new backup and starts processing a
+file, it first checks whether the file has changed (compared to the values
+stored in the files cache). If the values are the same, the file is assumed
+unchanged and thus its contents won't get chunked (again).
+
+|project_name| can't keep an infinite history of files, of course, so entries
+in the files cache have a "maximum time to live", which is set via the
+environment variable BORG_FILES_CACHE_TTL (and defaults to 20).
+Every time you do a backup (on the same machine, using the same user), the
+age of each cache entry whose file was not "seen" is incremented by 1; once
+an entry's age reaches BORG_FILES_CACHE_TTL, it is removed from the cache.
+
+So, for example, if you do daily backups of 26 different data sets A, B,
+C, ..., Z on one machine (using the default TTL), the entries for A's files
+will already have been forgotten when you back up A again the next day, and
+that backup will be slow because it chunks all the files again. If you set
+BORG_FILES_CACHE_TTL to at least 26 (or maybe even a small multiple of that),
+it will be much faster.
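+
+The aging works roughly like this simplified sketch (illustrative names, not
+borg's actual code)::
+
+    # run once per backup over every entry in the files cache
+    def age_files_cache(files_cache, ttl):
+        for path, entry in list(files_cache.items()):
+            if entry.seen_in_this_backup:
+                entry.age = 0               # file was seen: fresh again
+            else:
+                entry.age += 1              # not seen: one backup run older
+                if entry.age >= ttl:
+                    del files_cache[path]   # expired: forget this file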
+
+Another possible reason is that files don't always have the same path; for
+example, if you mount a filesystem without stable mount points for each backup,
+the directory where the filesystem is mounted differs every time and
+|project_name| assumes the files are different.
+
+
 Is there a way to limit bandwidth with |project_name|?
 ------------------------------------------------------
 
@@ -386,6 +415,25 @@ If you can reproduce the issue with the proven filesystem, please file an
 issue in the |project_name| issue tracker about that.
 
 
+Requirements for the borg single-file binary, esp. (g)libc?
+-----------------------------------------------------------
+
+We try to build the binary on old, but still supported, systems to keep the
+minimum requirement for the (g)libc low. The (g)libc can't be bundled into
+the binary as it needs to fit your kernel and OS, but Python and all other
+required libraries will be bundled into the binary.
+
+If your system fulfills the minimum (g)libc requirement (see the README that
+is released with the binary), there should be no problem. If you are slightly
+below the required version, it may still be worth a try: due to the dynamic
+loading (or not loading) of some shared libraries, it might still work,
+depending on which libraries are actually loaded and used.
+
+The borg git repository contains scripts/glibc_check.py, which can determine
+(based on the versioned symbols the binaries want to link against) whether a
+set of given (Linux) binaries works with a given glibc version.
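+
+Example (the usage line from the script's docstring)::
+
+    glibc_check.py 2.11 bin [bin ...]
+
+An exit code of 0 means "yes, compatible", 1 means "no".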
+
+
 Why was Borg forked from Attic?
 -------------------------------
 

+ 1 - 0
docs/installation.rst

@@ -260,6 +260,7 @@ Assuming you have installed homebrew_, the following steps will install all the
 dependencies::
 
     brew install python3 lz4 openssl
+    brew install pkg-config                            # optional, for FUSE support
     pip3 install virtualenv
 
 For FUSE support to mount the backup archives, you need at least version 3.0 of

+ 27 - 0
docs/quickstart.rst

@@ -149,6 +149,33 @@ certain number of old archives:
     borg prune -v --prefix '{hostname}-' \
         --keep-daily=7 --keep-weekly=4 --keep-monthly=6
 
+Pitfalls with shell variables and environment variables
+-------------------------------------------------------
+
+This applies to all environment variables you want borg to see, not just
+``BORG_PASSPHRASE``. The short explanation is: always ``export`` your variable,
+and use single quotes if you're unsure of the details of your shell's expansion
+behavior. E.g.::
+
+    export BORG_PASSPHRASE='complicated & long'
+
+This is because ``export`` exposes variables to subprocesses, which borg may be
+one of. More on ``export`` can be found in the "ENVIRONMENT" section of the
+bash(1) man page.
+
+Beware of how ``sudo`` interacts with environment variables. For example, you
+may be surprised that the following ``export`` has no effect on your command::
+
+    export BORG_PASSPHRASE='complicated & long'
+    sudo ./yourborgwrapper.sh  # still prompts for password
+
+For more information, see the sudo(8) man page. Hint: see ``env_keep`` in
+sudoers(5), or try the ``sudo BORG_PASSPHRASE='yourphrase' borg`` syntax.
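+
+For example, a sudoers entry that lets ``BORG_PASSPHRASE`` through to borg
+could look like this (check sudoers(5) for your setup before relying on it)::
+
+    Defaults env_keep += "BORG_PASSPHRASE"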
+
+.. Tip::
+    To debug what your borg process is actually seeing, find its PID
+    (``ps aux|grep borg``) and then look into ``/proc/<PID>/environ``.
+
 .. _backup_compression:
 
 Backup compression

+ 3 - 0
docs/usage.rst

@@ -83,6 +83,9 @@ General:
     BORG_REMOTE_PATH
         When set, use the given path/filename as remote path (default is "borg").
         Using ``--remote-path PATH`` commandline option overrides the environment variable.
+    BORG_FILES_CACHE_TTL
+        When set to a numeric value, this determines the maximum "time to live" for the files cache
+        entries (default: 20). The files cache is used to quickly determine whether a file is unchanged.
     TMPDIR
         where temporary files are stored (might need a lot of temporary space for some operations)
 

+ 3 - 1
scripts/glibc_check.py

@@ -2,7 +2,9 @@
 """
 Check if all given binaries work with the given glibc version.
 
-check_glibc.py 2.11 bin [bin ...]
+glibc_check.py 2.11 bin [bin ...]
+
+rc = 0 means "yes", rc = 1 means "no".
 """
 
 import re

+ 20 - 6
src/borg/archive.py

@@ -889,6 +889,9 @@ def valid_msgpacked_dict(d, keys_serialized):
 class RobustUnpacker:
     """A restartable/robust version of the streaming msgpack unpacker
     """
+    class UnpackerCrashed(Exception):
+        """raised if the unpacker crashed"""
+
     def __init__(self, validator, item_keys):
         super().__init__()
         self.item_keys = [msgpack.packb(name.encode()) for name in item_keys]
@@ -911,6 +914,14 @@ class RobustUnpacker:
         return self
 
     def __next__(self):
+        def unpack_next():
+            try:
+                return next(self._unpacker)
+            except (TypeError, ValueError) as err:
+                # transform exceptions that might be raised when feeding
+                # msgpack with invalid data into a more specific exception
+                raise self.UnpackerCrashed(str(err))
+
         if self._resync:
             data = b''.join(self._buffered_data)
             while self._resync:
@@ -923,17 +934,17 @@ class RobustUnpacker:
                 self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                 self._unpacker.feed(data)
                 try:
-                    item = next(self._unpacker)
+                    item = unpack_next()
+                except (self.UnpackerCrashed, StopIteration):
+                    # as long as we are resyncing, we also ignore StopIteration
+                    pass
+                else:
                     if self.validator(item):
                         self._resync = False
                         return item
-                # Ignore exceptions that might be raised when feeding
-                # msgpack with invalid data
-                except (TypeError, ValueError, StopIteration):
-                    pass
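+                # no valid item found at this position: skip one byte and retry (resync)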
                 data = data[1:]
         else:
-            return next(self._unpacker)
+            return unpack_next()
 
 
 class ArchiveChecker:
@@ -1192,6 +1203,9 @@ class ArchiveChecker:
                                 yield Item(internal_dict=item)
                             else:
                                 report('Did not get expected metadata dict when unpacking item metadata', chunk_id, i)
+                    except RobustUnpacker.UnpackerCrashed:
+                        report('Unpacker crashed while unpacking item metadata, trying to resync...', chunk_id, i)
+                        unpacker.resync()
                     except Exception:
                         report('Exception while unpacking item metadata', chunk_id, i)
                         raise

+ 54 - 11
src/borg/archiver.py

@@ -80,8 +80,8 @@ def with_repository(fake=False, create=False, lock=True, exclusive=False, manife
             if argument(args, fake):
                 return method(self, args, repository=None, **kwargs)
             elif location.proto == 'ssh':
-                repository = RemoteRepository(location, create=create, lock_wait=self.lock_wait, lock=lock,
-                                              append_only=append_only, args=args)
+                repository = RemoteRepository(location, create=create, exclusive=argument(args, exclusive),
+                                              lock_wait=self.lock_wait, lock=lock, append_only=append_only, args=args)
             else:
                 repository = Repository(location.path, create=create, exclusive=argument(args, exclusive),
                                         lock_wait=self.lock_wait, lock=lock,
@@ -191,7 +191,7 @@ class Archiver:
             pass
         return self.exit_code
 
-    @with_repository(exclusive='repair', manifest=False)
+    @with_repository(exclusive=True, manifest=False)
     def do_check(self, args, repository):
         """Check repository consistency"""
         if args.repair:
@@ -235,7 +235,7 @@ class Archiver:
         key_new.change_passphrase()  # option to change key protection passphrase, save
         return EXIT_SUCCESS
 
-    @with_repository(fake='dry_run')
+    @with_repository(fake='dry_run', exclusive=True)
     def do_create(self, args, repository, manifest=None, key=None):
         """Create new archive"""
         matcher = PatternMatcher(fallback=True)
@@ -848,7 +848,7 @@ class Archiver:
             print(str(cache))
         return self.exit_code
 
-    @with_repository()
+    @with_repository(exclusive=True)
     def do_prune(self, args, repository, manifest, key):
         """Prune repository archives according to specified rules"""
         if not any((args.secondly, args.minutely, args.hourly, args.daily,
@@ -982,9 +982,12 @@ class Archiver:
             cache.commit()
             return self.exit_code
 
-    @with_repository(manifest=False)
+    @with_repository(manifest=False, exclusive=True)
     def do_with_lock(self, args, repository):
         """run a user specified command with the repository lock held"""
+        # for a new server, this will immediately take an exclusive lock.
+        # to support old servers that do not have the "exclusive" arg in the
+        # open() RPC API, we also do it the old way:
         # re-write manifest to start a repository transaction - this causes a
         # lock upgrade to exclusive for remote (and also for local) repositories.
         # by using manifest=False in the decorator, we avoid having to require
@@ -1011,6 +1014,28 @@ class Archiver:
         print('Done.')
         return EXIT_SUCCESS
 
+    @with_repository()
+    def do_debug_dump_repo_objs(self, args, repository, manifest, key):
+        """dump (decrypted, decompressed) repo objects"""
+        marker = None
+        i = 0
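+        # fetch object IDs in batches of up to 10000; 'marker' resumes the
+        # listing where the previous batch ended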
+        while True:
+            result = repository.list(limit=10000, marker=marker)
+            if not result:
+                break
+            marker = result[-1]
+            for id in result:
+                cdata = repository.get(id)
+                give_id = id if id != Manifest.MANIFEST_ID else None
+                _, data = key.decrypt(give_id, cdata)
+                filename = '%06d_%s.obj' % (i, bin_to_hex(id))
+                print('Dumping', filename)
+                with open(filename, 'wb') as fd:
+                    fd.write(data)
+                i += 1
+        print('Done.')
+        return EXIT_SUCCESS
+
     @with_repository(manifest=False)
     def do_debug_get_obj(self, args, repository):
         """get object contents from the repository and write it into file"""
@@ -1030,7 +1055,7 @@ class Archiver:
                 print("object %s fetched." % hex_id)
         return EXIT_SUCCESS
 
-    @with_repository(manifest=False)
+    @with_repository(manifest=False, exclusive=True)
     def do_debug_put_obj(self, args, repository):
         """put file(s) contents into the repository"""
         for path in args.paths:
@@ -1042,7 +1067,7 @@ class Archiver:
         repository.commit()
         return EXIT_SUCCESS
 
-    @with_repository(manifest=False)
+    @with_repository(manifest=False, exclusive=True)
     def do_debug_delete_obj(self, args, repository):
         """delete the objects with the given IDs from the repo"""
         modified = False
@@ -1159,7 +1184,8 @@ class Archiver:
             EOF
             $ borg create --exclude-from exclude.txt backup /\n\n''')
     helptext['placeholders'] = textwrap.dedent('''
-        Repository (or Archive) URLs and --prefix values support these placeholders:
+        Repository (or Archive) URLs, --prefix and --remote-path values support these
+        placeholders:
 
         {hostname}
 
@@ -1185,7 +1211,11 @@ class Archiver:
 
             The current process ID.
 
+        {borgversion}
+
+            The version of borg.
+
         Examples::
 
             borg create /path/to/repo::{hostname}-{user}-{utcnow} ...
             borg create /path/to/repo::{hostname}-{now:%Y-%m-%d_%H:%M:%S} ...
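+            borg create --remote-path borg-{borgversion} /path/to/repo::{hostname}-{utcnow} ...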
@@ -1468,7 +1498,7 @@ class Archiver:
         checkpoints and treated in special ways.
 
         In the archive name, you may use the following format tags:
-        {now}, {utcnow}, {fqdn}, {hostname}, {user}, {pid}, {uuid4}
+        {now}, {utcnow}, {fqdn}, {hostname}, {user}, {pid}, {uuid4}, {borgversion}
 
         To speed up pulling backups over sshfs and similar network file systems which do
         not provide correct inode information the --ignore-inode flag can be used. This
@@ -2131,6 +2161,19 @@ class Archiver:
                                type=location_validator(archive=True),
                                help='archive to dump')
 
+        debug_dump_repo_objs_epilog = textwrap.dedent("""
+        This command dumps raw (but decrypted and decompressed) repo objects to files.
+        """)
+        subparser = subparsers.add_parser('debug-dump-repo-objs', parents=[common_parser], add_help=False,
+                                          description=self.do_debug_dump_repo_objs.__doc__,
+                                          epilog=debug_dump_repo_objs_epilog,
+                                          formatter_class=argparse.RawDescriptionHelpFormatter,
+                                          help='dump repo objects (debug)')
+        subparser.set_defaults(func=self.do_debug_dump_repo_objs)
+        subparser.add_argument('location', metavar='REPOSITORY',
+                               type=location_validator(archive=False),
+                               help='repo to dump')
+
         debug_get_obj_epilog = textwrap.dedent("""
         This command gets an object from the repository.
         """)

+ 9 - 6
src/borg/cache.py

@@ -18,7 +18,7 @@ from .helpers import format_file_size
 from .helpers import yes
 from .item import Item
 from .key import PlaintextKey
-from .locking import UpgradableLock
+from .locking import Lock
 from .platform import SaveFile
 from .remote import cache_if_remote
 
@@ -44,7 +44,7 @@ class Cache:
     @staticmethod
     def break_lock(repository, path=None):
         path = path or os.path.join(get_cache_dir(), repository.id_str)
-        UpgradableLock(os.path.join(path, 'lock'), exclusive=True).break_lock()
+        Lock(os.path.join(path, 'lock'), exclusive=True).break_lock()
 
     @staticmethod
     def destroy(repository, path=None):
@@ -172,7 +172,7 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
     def open(self, lock_wait=None):
         if not os.path.isdir(self.path):
             raise Exception('%s Does not look like a Borg cache' % self.path)
-        self.lock = UpgradableLock(os.path.join(self.path, 'lock'), exclusive=True, timeout=lock_wait).acquire()
+        self.lock = Lock(os.path.join(self.path, 'lock'), exclusive=True, timeout=lock_wait).acquire()
         self.rollback()
 
     def close(self):
@@ -213,12 +213,15 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
         if not self.txn_active:
             return
         if self.files is not None:
+            ttl = int(os.environ.get('BORG_FILES_CACHE_TTL', 20))
             with SaveFile(os.path.join(self.path, 'files'), binary=True) as fd:
                 for path_hash, item in self.files.items():
-                    # Discard cached files with the newest mtime to avoid
-                    # issues with filesystem snapshots and mtime precision
+                    # Only keep files seen in this backup that are older than newest mtime seen in this backup -
+                    # this is to avoid issues with filesystem snapshots and mtime granularity.
+                    # Also keep files from older backups that have not reached BORG_FILES_CACHE_TTL yet.
                     entry = FileCacheEntry(*msgpack.unpackb(item))
-                    if entry.age < 10 and bigint_to_int(entry.mtime) < self._newest_mtime:
+                    if entry.age == 0 and bigint_to_int(entry.mtime) < self._newest_mtime or \
+                       entry.age > 0 and entry.age < ttl:
                         msgpack.pack((path_hash, entry), fd)
         self.config.set('cache', 'manifest', self.manifest.id_str)
         self.config.set('cache', 'timestamp', self.manifest.timestamp)

+ 13 - 14
src/borg/fuse.py

@@ -11,6 +11,7 @@ import llfuse
 import msgpack
 
 from .logger import create_logger
+from .lrucache import LRUCache
 logger = create_logger()
 
 from .archive import Archive
@@ -68,17 +69,13 @@ class FuseOperations(llfuse.Operations):
         data_cache_capacity = int(os.environ.get('BORG_MOUNT_DATA_CACHE_ENTRIES', os.cpu_count() or 1))
         logger.debug('mount data cache capacity: %d chunks', data_cache_capacity)
         self.data_cache = LRUCache(capacity=data_cache_capacity, dispose=lambda _: None)
+        self._create_dir(parent=1)  # first call, create root dir (inode == 1)
         if archive:
             self.process_archive(archive)
         else:
-            # Create root inode
-            self.parent[1] = self.allocate_inode()
-            self.items[1] = self.default_dir
             for archive_name in manifest.archives:
                 # Create archive placeholder inode
-                archive_inode = self.allocate_inode()
-                self.items[archive_inode] = self.default_dir
-                self.parent[archive_inode] = 1
+                archive_inode = self._create_dir(parent=1)
                 self.contents[1][os.fsencode(archive_name)] = archive_inode
                 self.pending_archives[archive_inode] = Archive(repository, key, manifest, archive_name)
 
@@ -108,6 +105,14 @@ class FuseOperations(llfuse.Operations):
         finally:
             llfuse.close(umount)
 
+    def _create_dir(self, parent):
+        """Create a directory inode below *parent*, return its inode number.
+        """
+        ino = self.allocate_inode()
+        self.items[ino] = self.default_dir
+        self.parent[ino] = parent
+        return ino
+
     def process_archive(self, archive, prefix=[]):
         """Build fuse inode hierarchy from archive metadata
         """
@@ -132,11 +137,6 @@ class FuseOperations(llfuse.Operations):
                 num_segments = len(segments)
                 parent = 1
                 for i, segment in enumerate(segments, 1):
-                    # Insert a default root inode if needed
-                    if self._inode_count == 0 and segment:
-                        archive_inode = self.allocate_inode()
-                        self.items[archive_inode] = self.default_dir
-                        self.parent[archive_inode] = parent
                     # Leaf segment?
                     if i == num_segments:
                         if 'source' in item and stat.S_ISREG(item.mode):
@@ -152,9 +152,7 @@ class FuseOperations(llfuse.Operations):
                     elif segment in self.contents[parent]:
                         parent = self.contents[parent][segment]
                     else:
-                        inode = self.allocate_inode()
-                        self.items[inode] = self.default_dir
-                        self.parent[inode] = parent
+                        inode = self._create_dir(parent)
                         if segment:
                             self.contents[parent][segment] = inode
                         parent = inode
@@ -282,6 +280,7 @@ class FuseOperations(llfuse.Operations):
                     # evict fully read chunk from cache
                     del self.data_cache[id]
             else:
+                # chunk not in the data cache: fetch and decrypt it now
                 _, data = self.key.decrypt(id, self.repository.get(id))
                 if offset + n < len(data):
                     # chunk was only partially read, cache it

+ 1 - 0
src/borg/helpers.py

@@ -576,6 +576,7 @@ def replace_placeholders(text):
         'utcnow': current_time.utcnow(),
         'user': uid2user(os.getuid(), os.getuid()),
         'uuid4': str(uuid.uuid4()),
+        'borgversion': borg_version,
     }
     return format_line(text, data)
 

+ 8 - 3
src/borg/locking.py

@@ -217,7 +217,7 @@ class LockRoster:
         self.save(roster)
 
 
-class UpgradableLock:
+class Lock:
     """
     A Lock for a resource that can be accessed in a shared or exclusive way.
     Typically, write access to a resource needs an exclusive lock (1 writer,
@@ -226,7 +226,7 @@ class UpgradableLock:
 
     If possible, try to use the contextmanager here like::
 
-        with UpgradableLock(...) as lock:
+        with Lock(...) as lock:
             ...
 
     This makes sure the lock is released again if the block is left, no
@@ -242,7 +242,7 @@ class UpgradableLock:
         self._roster = LockRoster(path + '.roster', id=id)
         # an exclusive lock, used for:
         # - holding while doing roster queries / updates
-        # - holding while the UpgradableLock itself is exclusive
+        # - holding while the Lock instance itself is exclusive
         self._lock = ExclusiveLock(path + '.exclusive', id=id, timeout=timeout)
 
     def __enter__(self):
@@ -299,6 +299,8 @@ class UpgradableLock:
                 self._roster.modify(SHARED, REMOVE)
 
     def upgrade(self):
+        # WARNING: if multiple read-lockers want to upgrade, it will deadlock, because they
+        # will all wait until the other read locks go away - and that won't happen.
         if not self.is_exclusive:
             self.acquire(exclusive=True, remove=SHARED)
 
@@ -306,6 +308,9 @@ class UpgradableLock:
         if self.is_exclusive:
             self.acquire(exclusive=False, remove=EXCLUSIVE)
 
+    def got_exclusive_lock(self):
+        return self.is_exclusive and self._lock.is_locked() and self._lock.by_me()
+
     def break_lock(self):
         self._roster.remove()
         self._lock.break_lock()

+ 21 - 19
src/borg/remote.py

@@ -15,6 +15,7 @@ from .helpers import Error, IntegrityError
 from .helpers import get_home_dir
 from .helpers import sysinfo
 from .helpers import bin_to_hex
+from .helpers import replace_placeholders
 from .repository import Repository
 
 RPC_PROTOCOL_VERSION = 2
@@ -117,7 +118,7 @@ class RepositoryServer:  # pragma: no cover
     def negotiate(self, versions):
         return RPC_PROTOCOL_VERSION
 
-    def open(self, path, create=False, lock_wait=None, lock=True, append_only=False):
+    def open(self, path, create=False, lock_wait=None, lock=True, exclusive=None, append_only=False):
         path = os.fsdecode(path)
         if path.startswith('/~'):
             path = os.path.join(get_home_dir(), path[2:])
@@ -128,7 +129,9 @@ class RepositoryServer:  # pragma: no cover
                     break
             else:
                 raise PathNotAllowed(path)
-        self.repository = Repository(path, create, lock_wait=lock_wait, lock=lock, append_only=self.append_only or append_only)
+        self.repository = Repository(path, create, lock_wait=lock_wait, lock=lock,
+                                     append_only=self.append_only or append_only,
+                                     exclusive=exclusive)
         self.repository.__enter__()  # clean exit handled by serve() method
         return self.repository.id
 
@@ -144,7 +147,7 @@ class RemoteRepository:
     class NoAppendOnlyOnServer(Error):
         """Server does not support --append-only."""
 
-    def __init__(self, location, create=False, lock_wait=None, lock=True, append_only=False, args=None):
+    def __init__(self, location, create=False, exclusive=False, lock_wait=None, lock=True, append_only=False, args=None):
         self.location = self._location = location
         self.preload_ids = []
         self.msgid = 0
@@ -163,6 +166,7 @@ class RemoteRepository:
             # that the system's ssh binary picks up (non-matching) libraries from there
             env.pop('LD_LIBRARY_PATH', None)
         env.pop('BORG_PASSPHRASE', None)  # security: do not give secrets to subprocess
+        env['BORG_VERSION'] = __version__
         self.p = Popen(borg_cmd, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env)
         self.stdin_fd = self.p.stdin.fileno()
         self.stdout_fd = self.p.stdout.fileno()
@@ -174,22 +178,19 @@ class RemoteRepository:
         self.x_fds = [self.stdin_fd, self.stdout_fd, self.stderr_fd]
 
         try:
-            version = self.call('negotiate', RPC_PROTOCOL_VERSION)
-        except ConnectionClosed:
-            raise ConnectionClosedWithHint('Is borg working on the server?') from None
-        if version != RPC_PROTOCOL_VERSION:
-            raise Exception('Server insisted on using unsupported protocol version %d' % version)
-        try:
-            # Because of protocol versions, only send append_only if necessary
-            if append_only:
-                try:
-                    self.id = self.call('open', self.location.path, create, lock_wait, lock, append_only)
-                except self.RPCError as err:
-                    if err.remote_type == 'TypeError':
-                        raise self.NoAppendOnlyOnServer() from err
-                    else:
-                        raise
-            else:
+            try:
+                version = self.call('negotiate', RPC_PROTOCOL_VERSION)
+            except ConnectionClosed:
+                raise ConnectionClosedWithHint('Is borg working on the server?') from None
+            if version != RPC_PROTOCOL_VERSION:
+                raise Exception('Server insisted on using unsupported protocol version %d' % version)
+            try:
+                self.id = self.call('open', self.location.path, create, lock_wait, lock, exclusive, append_only)
+            except self.RPCError as err:
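+                # a remote TypeError means the server runs an older borg whose
+                # open() does not know the 'exclusive' / 'append_only' arguments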
+                if err.remote_type != 'TypeError':
+                    raise
+                if append_only:
+                    raise self.NoAppendOnlyOnServer()
                 self.id = self.call('open', self.location.path, create, lock_wait, lock)
         except Exception:
             self.close()
@@ -243,6 +244,7 @@ class RemoteRepository:
             return [sys.executable, '-m', 'borg.archiver', 'serve'] + opts + self.extra_test_args
         else:  # pragma: no cover
             remote_path = args.remote_path or os.environ.get('BORG_REMOTE_PATH', 'borg')
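+            # expand placeholders (e.g. {borgversion}) in the remote path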
+            remote_path = replace_placeholders(remote_path)
             return [remote_path, 'serve'] + opts
 
     def ssh_cmd(self, location):

+ 25 - 12
src/borg/repository.py

@@ -21,7 +21,7 @@ from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size
 from .helpers import Location
 from .helpers import ProgressIndicatorPercent
 from .helpers import bin_to_hex
-from .locking import UpgradableLock, LockError, LockErrorT
+from .locking import Lock, LockError, LockErrorT
 from .lrucache import LRUCache
 from .platform import SaveFile, SyncFile, sync_dir
 
@@ -129,7 +129,7 @@ class Repository:
         if self.do_create:
             self.do_create = False
             self.create(self.path)
-        self.open(self.path, self.exclusive, lock_wait=self.lock_wait, lock=self.do_lock)
+        self.open(self.path, bool(self.exclusive), lock_wait=self.lock_wait, lock=self.do_lock)
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
@@ -221,14 +221,14 @@ class Repository:
         return self.get_index_transaction_id()
 
     def break_lock(self):
-        UpgradableLock(os.path.join(self.path, 'lock')).break_lock()
+        Lock(os.path.join(self.path, 'lock')).break_lock()
 
     def open(self, path, exclusive, lock_wait=None, lock=True):
         self.path = path
         if not os.path.isdir(path):
             raise self.DoesNotExist(path)
         if lock:
-            self.lock = UpgradableLock(os.path.join(path, 'lock'), exclusive, timeout=lock_wait).acquire()
+            self.lock = Lock(os.path.join(path, 'lock'), exclusive, timeout=lock_wait).acquire()
         else:
             self.lock = None
         self.config = ConfigParser(interpolation=None)
@@ -282,14 +282,23 @@ class Repository:
 
     def prepare_txn(self, transaction_id, do_cleanup=True):
         self._active_txn = True
-        try:
-            self.lock.upgrade()
-        except (LockError, LockErrorT):
-            # if upgrading the lock to exclusive fails, we do not have an
-            # active transaction. this is important for "serve" mode, where
-            # the repository instance lives on - even if exceptions happened.
-            self._active_txn = False
-            raise
+        if not self.lock.got_exclusive_lock():
+            if self.exclusive is not None:
+                # self.exclusive is either True or False, thus a new client is active here.
+                # if it is False and we get here, the caller did not use exclusive=True although
+                # it is needed for a write operation. if it is True and we get here, something else
+                # went very wrong, because we should have an exclusive lock, but we don't.
+                raise AssertionError("bug in code, exclusive lock should exist here")
+            # if we are here, this is an old client talking to a new server (expecting lock upgrade).
+            # or we are replaying segments and might need a lock upgrade for that.
+            try:
+                self.lock.upgrade()
+            except (LockError, LockErrorT):
+                # if upgrading the lock to exclusive fails, we do not have an
+                # active transaction. this is important for "serve" mode, where
+                # the repository instance lives on - even if exceptions happened.
+                self._active_txn = False
+                raise
         if not self.index or transaction_id is None:
             try:
                 self.index = self.open_index(transaction_id, False)
@@ -449,6 +458,9 @@ class Repository:
         complete_xfer(intermediate=False)
 
     def replay_segments(self, index_transaction_id, segments_transaction_id):
+        # fake an old client, so that in case we do not have an exclusive lock yet, prepare_txn will upgrade the lock:
+        remember_exclusive = self.exclusive
+        self.exclusive = None
         self.prepare_txn(index_transaction_id, do_cleanup=False)
         try:
             segment_count = sum(1 for _ in self.io.segment_iterator())
@@ -464,6 +476,7 @@ class Repository:
             pi.finish()
             self.write_index()
         finally:
+            self.exclusive = remember_exclusive
             self.rollback()
 
     def _update_index(self, segment, objects, report=None):

+ 26 - 3
src/borg/testsuite/archiver.py

@@ -242,7 +242,7 @@ class ArchiverTestCaseBase(BaseTestCase):
         self.cmd('create', self.repository_location + '::' + name, src_dir)
 
     def open_archive(self, name):
-        repository = Repository(self.repository_path)
+        repository = Repository(self.repository_path, exclusive=True)
         with repository:
             manifest, key = Manifest.load(repository)
             archive = Archive(repository, key, manifest, name)
@@ -1334,8 +1334,20 @@ class ArchiverTestCase(ArchiverTestCaseBase):
 
     @unittest.skipUnless(has_llfuse, 'llfuse not installed')
     def test_fuse(self):
+        def has_noatime(some_file):
+            atime_before = os.stat(some_file).st_atime_ns
+            try:
+                os.close(os.open(some_file, flags_noatime))
+            except PermissionError:
+                return False
+            else:
+                atime_after = os.stat(some_file).st_atime_ns
+                noatime_used = flags_noatime != flags_normal
+                return noatime_used and atime_before == atime_after
+
         self.cmd('init', self.repository_location)
         self.create_test_files()
+        have_noatime = has_noatime('input/file1')
         self.cmd('create', self.repository_location + '::archive', 'input')
         self.cmd('create', self.repository_location + '::archive2', 'input')
         if has_lchflags:
@@ -1359,7 +1371,8 @@ class ArchiverTestCase(ArchiverTestCaseBase):
             assert sti1.st_uid == sto1.st_uid
             assert sti1.st_gid == sto1.st_gid
             assert sti1.st_size == sto1.st_size
-            assert sti1.st_atime == sto1.st_atime
+            if have_noatime:
+                assert sti1.st_atime == sto1.st_atime
             assert sti1.st_ctime == sto1.st_ctime
             assert sti1.st_mtime == sto1.st_mtime
             # note: there is another hardlink to this, see below
@@ -1472,6 +1485,16 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         assert len(output_dir) > 0 and output_dir[0].startswith('000000_')
         assert 'Done.' in output
 
+    def test_debug_dump_repo_objs(self):
+        self.create_test_files()
+        self.cmd('init', self.repository_location)
+        self.cmd('create', self.repository_location + '::test', 'input')
+        with changedir('output'):
+            output = self.cmd('debug-dump-repo-objs', self.repository_location)
+        output_dir = sorted(os.listdir('output'))
+        assert len(output_dir) > 0 and output_dir[0].startswith('000000_')
+        assert 'Done.' in output
+
     def test_debug_put_get_delete_obj(self):
         self.cmd('init', self.repository_location)
         data = b'some data'
@@ -1842,7 +1865,7 @@ class ArchiverCheckTestCase(ArchiverTestCaseBase):
 
     def test_extra_chunks(self):
         self.cmd('check', self.repository_location, exit_code=0)
-        with Repository(self.repository_location) as repository:
+        with Repository(self.repository_location, exclusive=True) as repository:
             repository.put(b'01234567890123456789012345678901', b'xxxx')
             repository.commit()
         self.cmd('check', self.repository_location, exit_code=1)

+ 23 - 15
src/borg/testsuite/locking.py

@@ -2,7 +2,7 @@ import time
 
 import pytest
 
-from ..locking import get_id, TimeoutTimer, ExclusiveLock, UpgradableLock, LockRoster, \
+from ..locking import get_id, TimeoutTimer, ExclusiveLock, Lock, LockRoster, \
                       ADD, REMOVE, SHARED, EXCLUSIVE, LockTimeout
 
 
@@ -58,52 +58,60 @@ class TestExclusiveLock:
                 ExclusiveLock(lockpath, id=ID2, timeout=0.1).acquire()
 
 
-class TestUpgradableLock:
+class TestLock:
     def test_shared(self, lockpath):
-        lock1 = UpgradableLock(lockpath, exclusive=False, id=ID1).acquire()
-        lock2 = UpgradableLock(lockpath, exclusive=False, id=ID2).acquire()
+        lock1 = Lock(lockpath, exclusive=False, id=ID1).acquire()
+        lock2 = Lock(lockpath, exclusive=False, id=ID2).acquire()
         assert len(lock1._roster.get(SHARED)) == 2
         assert len(lock1._roster.get(EXCLUSIVE)) == 0
         lock1.release()
         lock2.release()
 
     def test_exclusive(self, lockpath):
-        with UpgradableLock(lockpath, exclusive=True, id=ID1) as lock:
+        with Lock(lockpath, exclusive=True, id=ID1) as lock:
             assert len(lock._roster.get(SHARED)) == 0
             assert len(lock._roster.get(EXCLUSIVE)) == 1
 
     def test_upgrade(self, lockpath):
-        with UpgradableLock(lockpath, exclusive=False) as lock:
+        with Lock(lockpath, exclusive=False) as lock:
             lock.upgrade()
             lock.upgrade()  # NOP
             assert len(lock._roster.get(SHARED)) == 0
             assert len(lock._roster.get(EXCLUSIVE)) == 1
 
     def test_downgrade(self, lockpath):
-        with UpgradableLock(lockpath, exclusive=True) as lock:
+        with Lock(lockpath, exclusive=True) as lock:
             lock.downgrade()
             lock.downgrade()  # NOP
             assert len(lock._roster.get(SHARED)) == 1
             assert len(lock._roster.get(EXCLUSIVE)) == 0
 
+    def test_got_exclusive_lock(self, lockpath):
+        lock = Lock(lockpath, exclusive=True, id=ID1)
+        assert not lock.got_exclusive_lock()
+        lock.acquire()
+        assert lock.got_exclusive_lock()
+        lock.release()
+        assert not lock.got_exclusive_lock()
+
     def test_break(self, lockpath):
-        lock = UpgradableLock(lockpath, exclusive=True, id=ID1).acquire()
+        lock = Lock(lockpath, exclusive=True, id=ID1).acquire()
         lock.break_lock()
         assert len(lock._roster.get(SHARED)) == 0
         assert len(lock._roster.get(EXCLUSIVE)) == 0
-        with UpgradableLock(lockpath, exclusive=True, id=ID2):
+        with Lock(lockpath, exclusive=True, id=ID2):
             pass
 
     def test_timeout(self, lockpath):
-        with UpgradableLock(lockpath, exclusive=False, id=ID1):
+        with Lock(lockpath, exclusive=False, id=ID1):
             with pytest.raises(LockTimeout):
-                UpgradableLock(lockpath, exclusive=True, id=ID2, timeout=0.1).acquire()
-        with UpgradableLock(lockpath, exclusive=True, id=ID1):
+                Lock(lockpath, exclusive=True, id=ID2, timeout=0.1).acquire()
+        with Lock(lockpath, exclusive=True, id=ID1):
             with pytest.raises(LockTimeout):
-                UpgradableLock(lockpath, exclusive=False, id=ID2, timeout=0.1).acquire()
-        with UpgradableLock(lockpath, exclusive=True, id=ID1):
+                Lock(lockpath, exclusive=False, id=ID2, timeout=0.1).acquire()
+        with Lock(lockpath, exclusive=True, id=ID1):
             with pytest.raises(LockTimeout):
-                UpgradableLock(lockpath, exclusive=True, id=ID2, timeout=0.1).acquire()
+                Lock(lockpath, exclusive=True, id=ID2, timeout=0.1).acquire()
 
 
 @pytest.fixture()

+ 42 - 19
src/borg/testsuite/repository.py

@@ -11,17 +11,23 @@ import pytest
 from ..hashindex import NSIndex
 from ..helpers import Location
 from ..helpers import IntegrityError
-from ..locking import UpgradableLock, LockFailed
+from ..locking import Lock, LockFailed
 from ..remote import RemoteRepository, InvalidRPCMethod, ConnectionClosedWithHint, handle_remote_line
 from ..repository import Repository, LoggedIO, MAGIC
 from . import BaseTestCase
 
 
+UNSPECIFIED = object()  # for default values where we can't use None
+
+
 class RepositoryTestCaseBase(BaseTestCase):
     key_size = 32
+    exclusive = True
 
-    def open(self, create=False):
-        return Repository(os.path.join(self.tmppath, 'repository'), create=create)
+    def open(self, create=False, exclusive=UNSPECIFIED):
+        if exclusive is UNSPECIFIED:
+            exclusive = self.exclusive
+        return Repository(os.path.join(self.tmppath, 'repository'), exclusive=exclusive, create=create)
 
     def setUp(self):
         self.tmppath = tempfile.mkdtemp()
@@ -32,10 +38,10 @@ class RepositoryTestCaseBase(BaseTestCase):
         self.repository.close()
         shutil.rmtree(self.tmppath)
 
-    def reopen(self):
+    def reopen(self, exclusive=UNSPECIFIED):
         if self.repository:
             self.repository.close()
-        self.repository = self.open()
+        self.repository = self.open(exclusive=exclusive)
 
     def add_keys(self):
         self.repository.put(b'00000000000000000000000000000000', b'foo')
@@ -201,17 +207,6 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase):
             self.assert_equal(len(self.repository), 3)
             self.assert_equal(self.repository.check(), True)
 
-    def test_replay_of_readonly_repository(self):
-        self.add_keys()
-        for name in os.listdir(self.repository.path):
-            if name.startswith('index.'):
-                os.unlink(os.path.join(self.repository.path, name))
-        with patch.object(UpgradableLock, 'upgrade', side_effect=LockFailed) as upgrade:
-            self.reopen()
-            with self.repository:
-                self.assert_raises(LockFailed, lambda: len(self.repository))
-                upgrade.assert_called_once_with()
-
     def test_crash_before_write_index(self):
         self.add_keys()
         self.repository.write_index = None
@@ -224,6 +219,32 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase):
             self.assert_equal(len(self.repository), 3)
             self.assert_equal(self.repository.check(), True)
 
+    def test_replay_lock_upgrade_old(self):
+        self.add_keys()
+        for name in os.listdir(self.repository.path):
+            if name.startswith('index.'):
+                os.unlink(os.path.join(self.repository.path, name))
+        with patch.object(Lock, 'upgrade', side_effect=LockFailed) as upgrade:
+            self.reopen(exclusive=None)  # simulate old client that always does lock upgrades
+            with self.repository:
+                # the repo is only locked by a shared read lock, but to replay segments,
+                # we need an exclusive write lock - check if the lock gets upgraded.
+                self.assert_raises(LockFailed, lambda: len(self.repository))
+                upgrade.assert_called_once_with()
+
+    def test_replay_lock_upgrade(self):
+        self.add_keys()
+        for name in os.listdir(self.repository.path):
+            if name.startswith('index.'):
+                os.unlink(os.path.join(self.repository.path, name))
+        with patch.object(Lock, 'upgrade', side_effect=LockFailed) as upgrade:
+            self.reopen(exclusive=False)  # current client usually does not do lock upgrade, except for replay
+            with self.repository:
+                # the repo is only locked by a shared read lock, but to replay segments,
+                # we need an exclusive write lock - check if the lock gets upgraded.
+                self.assert_raises(LockFailed, lambda: len(self.repository))
+                upgrade.assert_called_once_with()
+
     def test_crash_before_deleting_compacted_segments(self):
         self.add_keys()
         self.repository.io.delete_segment = None
@@ -247,7 +268,7 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase):
 
 class RepositoryAppendOnlyTestCase(RepositoryTestCaseBase):
     def open(self, create=False):
-        return Repository(os.path.join(self.tmppath, 'repository'), create=create, append_only=True)
+        return Repository(os.path.join(self.tmppath, 'repository'), exclusive=True, create=create, append_only=True)
 
     def test_destroy_append_only(self):
         # Can't destroy append only repo (via the API)
@@ -474,7 +495,8 @@ class RepositoryCheckTestCase(RepositoryTestCaseBase):
 class RemoteRepositoryTestCase(RepositoryTestCase):
 
     def open(self, create=False):
-        return RemoteRepository(Location('__testsuite__:' + os.path.join(self.tmppath, 'repository')), create=create)
+        return RemoteRepository(Location('__testsuite__:' + os.path.join(self.tmppath, 'repository')),
+                                exclusive=True, create=create)
 
     def test_invalid_rpc(self):
         self.assert_raises(InvalidRPCMethod, lambda: self.repository.call('__init__', None))
@@ -503,7 +525,8 @@ class RemoteRepositoryTestCase(RepositoryTestCase):
 class RemoteRepositoryCheckTestCase(RepositoryCheckTestCase):
 
     def open(self, create=False):
-        return RemoteRepository(Location('__testsuite__:' + os.path.join(self.tmppath, 'repository')), create=create)
+        return RemoteRepository(Location('__testsuite__:' + os.path.join(self.tmppath, 'repository')),
+                                exclusive=True, create=create)
 
     def test_crash_before_compact(self):
         # skip this test, we can't mock-patch a Repository class in another process!

+ 1 - 1
src/borg/testsuite/upgrader.py

@@ -24,7 +24,7 @@ def repo_valid(path):
     :param path: the path to the repository
     :returns: if borg can check the repository
     """
-    with Repository(str(path), create=False) as repository:
+    with Repository(str(path), exclusive=True, create=False) as repository:
         # can't check raises() because check() handles the error
         return repository.check()
 

+ 3 - 4
src/borg/upgrader.py

@@ -9,7 +9,7 @@ logger = logging.getLogger(__name__)
 from .helpers import get_home_dir, get_keys_dir, get_cache_dir
 from .helpers import ProgressIndicatorPercent
 from .key import KeyfileKey, KeyfileNotFoundError
-from .locking import UpgradableLock
+from .locking import Lock
 from .repository import Repository, MAGIC
 
 ATTIC_MAGIC = b'ATTICSEG'
@@ -40,7 +40,7 @@ class AtticRepositoryUpgrader(Repository):
                     shutil.copytree(self.path, backup, copy_function=os.link)
             logger.info("opening attic repository with borg and converting")
             # now lock the repo, after we have made the copy
-            self.lock = UpgradableLock(os.path.join(self.path, 'lock'), exclusive=True, timeout=1.0).acquire()
+            self.lock = Lock(os.path.join(self.path, 'lock'), exclusive=True, timeout=1.0).acquire()
             segments = [filename for i, filename in self.io.segment_iterator()]
             try:
                 keyfile = self.find_attic_keyfile()
@@ -49,8 +49,7 @@ class AtticRepositoryUpgrader(Repository):
             else:
                 self.convert_keyfiles(keyfile, dryrun)
         # partial open: just hold on to the lock
-        self.lock = UpgradableLock(os.path.join(self.path, 'lock'),
-                                   exclusive=True).acquire()
+        self.lock = Lock(os.path.join(self.path, 'lock'), exclusive=True).acquire()
         try:
             self.convert_cache(dryrun)
             self.convert_repo_index(dryrun=dryrun, inplace=inplace)