浏览代码

locking3: store-based repo locking

Features:
- exclusive and non-exclusive locks
- acquire timeout
- lock auto-expiry (after 30mins of inactivity), lock refresh
- use tz-aware datetimes (in utc timezone) in locks

Also:
- document lock acquisition rules in the src
- increased default BORG_LOCK_WAIT to 10s
- better document with-lock test

Stale locks are ignored and automatically deleted.
Default: stale == 30 Minutes old.

lock.refresh() can be called frequently to prevent an acquired lock from becoming stale.
It does not do much if the last real refresh was recent.
Only after stale/2 has passed does it check and refresh the lock in the store.

Update the repository3 code to call refresh frequently:
- get/put/list/scan
- inside check loop
Thomas Waldmann 10 月之前
父节点
当前提交
72d0caeb6b

+ 1 - 1
src/borg/archiver/_common.py

@@ -510,7 +510,7 @@ def define_common_options(add_common_option):
         metavar="SECONDS",
         dest="lock_wait",
         type=int,
-        default=int(os.environ.get("BORG_LOCK_WAIT", 1)),
+        default=int(os.environ.get("BORG_LOCK_WAIT", 10)),
         action=Highlander,
         help="wait at most SECONDS for acquiring a repository/cache lock (default: %(default)d).",
     )

+ 0 - 23
src/borg/archiver/lock_cmds.py

@@ -5,7 +5,6 @@ from ._common import with_repository
 from ..cache import Cache
 from ..constants import *  # NOQA
 from ..helpers import prepare_subprocess_env, set_ec, CommandError
-from ..manifest import Manifest
 
 from ..logger import create_logger
 
@@ -16,20 +15,6 @@ class LocksMixIn:
     @with_repository(manifest=False, exclusive=True)
     def do_with_lock(self, args, repository):
         """run a user specified command with the repository lock held"""
-        # for a new server, this will immediately take an exclusive lock.
-        # to support old servers, that do not have "exclusive" arg in open()
-        # RPC API, we also do it the old way:
-        # re-write manifest to start a repository transaction - this causes a
-        # lock upgrade to exclusive for remote (and also for local) repositories.
-        # by using manifest=False in the decorator, we avoid having to require
-        # the encryption key (and can operate just with encrypted data).
-        data = repository.get(Manifest.MANIFEST_ID)
-        repository.put(Manifest.MANIFEST_ID, data)
-        # usually, a 0 byte (open for writing) segment file would be visible in the filesystem here.
-        # we write and close this file, to rather have a valid segment file on disk, before invoking the subprocess.
-        # we can only do this for local repositories (with .io), though:
-        if hasattr(repository, "io"):
-            repository.io.close_segment()
         env = prepare_subprocess_env(system=True)
         try:
             # we exit with the return code we get from the subprocess
@@ -37,14 +22,6 @@ class LocksMixIn:
             set_ec(rc)
         except (FileNotFoundError, OSError, ValueError) as e:
             raise CommandError(f"Error while trying to run '{args.command}': {e}")
-        finally:
-            # we need to commit the "no change" operation we did to the manifest
-            # because it created a new segment file in the repository. if we would
-            # roll back, the same file would be later used otherwise (for other content).
-            # that would be bad if somebody uses rsync with ignore-existing (or
-            # any other mechanism relying on existing segment data not changing).
-            # see issue #1867.
-            repository.commit(compact=False)
 
     @with_repository(lock=False, manifest=False)
     def do_break_lock(self, args, repository):

+ 222 - 0
src/borg/locking3.py

@@ -0,0 +1,222 @@
+import datetime
+import json
+import random
+import time
+
+from borgstore.store import ObjectNotFound
+
+from . import platform
+from .checksums import xxh64
+from .helpers import Error, ErrorWithTraceback, bin_to_hex
+from .logger import create_logger
+
+logger = create_logger(__name__)
+
+
+class LockError(Error):
+    """Failed to acquire the lock {}."""
+
+    # NOTE(review): the docstring above contains a "{}" placeholder, so it looks like it is the
+    # message template that gets filled from the exception args - keep it verbatim (confirm in helpers.Error).
+    # Base class of the lock errors that derive from Error (LockTimeout in this module).
+    exit_mcode = 70
+
+
+class LockErrorT(ErrorWithTraceback):
+    """Failed to acquire the lock {}."""
+
+    # Base class of the lock errors that derive from ErrorWithTraceback
+    # (LockFailed, NotLocked and NotMyLock in this module).
+    # NOTE(review): the docstring looks like a message template ("{}" placeholder) - keep it verbatim.
+    exit_mcode = 71
+
+
+class LockFailed(LockErrorT):
+    """Failed to create/acquire the lock {} ({})."""
+
+    # Raised by Lock.acquire() with two args: the store and the reason ("timeout"),
+    # matching the two "{}" placeholders of the docstring (which looks like the message template).
+    # The with-lock test expects this value as process exit code when the lock can not be acquired.
+    exit_mcode = 72
+
+
+class LockTimeout(LockError):
+    """Failed to create/acquire the lock {} (timeout)."""
+
+    # NOTE(review): not raised anywhere in this module - Lock.acquire() signals a timeout
+    # via LockFailed(store, "timeout") instead.
+    exit_mcode = 73
+
+
+class NotLocked(LockErrorT):
+    """Failed to release the lock {} (was not locked)."""
+
+    # Raised by Lock.release() when no lock owned by the releasing id is found in the store.
+    exit_mcode = 74
+
+
+class NotMyLock(LockErrorT):
+    """Failed to release the lock {} (was/is locked, but not by me)."""
+
+    # NOTE(review): not raised anywhere in this module; presumably kept for callers that
+    # expect this exception type to exist - confirm before removing.
+    exit_mcode = 75
+
+
+class Lock:
+    """
+    A Lock for a resource that can be accessed in a shared or exclusive way.
+    Typically, write access to a resource needs an exclusive lock (1 writer,
+    no one is allowed reading) and read access to a resource needs a shared
+    lock (multiple readers are allowed).
+
+    If possible, try to use the contextmanager here like::
+
+        with Lock(...) as lock:
+            ...
+
+    This makes sure the lock is released again if the block is left, no
+    matter how (e.g. if an exception occurred).
+    """
+
+    def __init__(self, store, exclusive=False, sleep=None, timeout=1.0, stale=30*60, id=None):
+        self.store = store
+        self.is_exclusive = exclusive
+        self.sleep = sleep
+        self.timeout = timeout
+        self.race_recheck_delay = 0.01  # local: 0.01, network/slow remote: >= 1.0
+        self.other_locks_go_away_delay = 0.1  # local: 0.1, network/slow remote: >= 1.0
+        self.retry_delay_min = 1.0
+        self.retry_delay_max = 5.0
+        self.stale_td = datetime.timedelta(seconds=stale)  # ignore/delete it if older
+        self.refresh_td = datetime.timedelta(seconds=stale//2)  # don't refresh it if younger
+        self.last_refresh_dt = None
+        self.id = id or platform.get_process_id()
+        assert len(self.id) == 3
+
+    def __enter__(self):
+        return self.acquire()
+
+    def __exit__(self, *exc):
+        self.release()
+
+    def __repr__(self):
+        return f"<{self.__class__.__name__}: {self.id!r}>"
+
+    def _create_lock(self, *, exclusive=None):
+        assert exclusive is not None
+        now = datetime.datetime.now(datetime.timezone.utc)
+        timestamp = now.isoformat(timespec="milliseconds")
+        lock = dict(exclusive=exclusive, hostid=self.id[0], processid=self.id[1], threadid=self.id[2], time=timestamp)
+        value = json.dumps(lock).encode("utf-8")
+        key = bin_to_hex(xxh64(value))
+        self.store.store(f"locks/{key}", value)
+        self.last_refresh_dt = now
+        return key
+
+    def _delete_lock(self, key, *, ignore_not_found=False):
+        try:
+            self.store.delete(f"locks/{key}")
+        except ObjectNotFound:
+            if not ignore_not_found:
+                raise
+
+    def _get_locks(self):
+        now = datetime.datetime.now(datetime.timezone.utc)
+        locks = {}
+        try:
+            infos = list(self.store.list("locks"))
+        except ObjectNotFound:
+            return {}
+        for info in infos:
+            key = info.name
+            content = self.store.load(f"locks/{key}")
+            lock = json.loads(content.decode("utf-8"))
+            dt = datetime.datetime.fromisoformat(lock["time"])
+            stale = dt < now - self.stale_td
+            if stale:
+                # ignore it and delete it (even if it is not from us)
+                self._delete_lock(key, ignore_not_found=True)
+            else:
+                lock["key"] = key
+                lock["dt"] = dt
+                locks[key] = lock
+        return locks
+
+    def _find_locks(self, *, only_exclusive=False, only_mine=False):
+        locks = self._get_locks()
+        found_locks = []
+        for key in locks:
+            lock = locks[key]
+            if (not only_exclusive or lock["exclusive"]) and (not only_mine or (lock["hostid"], lock["processid"], lock["threadid"]) == self.id):
+                found_locks.append(lock)
+        return found_locks
+
+    def acquire(self):
+        # goal
+        # for exclusive lock: there must be only 1 exclusive lock and no other (exclusive or non-exclusive) locks.
+        # for non-exclusive lock: there can be multiple n-e locks, but there must not exist an exclusive lock.
+        started = time.monotonic()
+        while time.monotonic() - started < self.timeout:
+            exclusive_locks = self._find_locks(only_exclusive=True)
+            if len(exclusive_locks) == 0:
+                # looks like there are no exclusive locks, create our lock.
+                key = self._create_lock(exclusive=self.is_exclusive)
+                # obviously we have a race condition here: other client(s) might have created exclusive
+                # lock(s) at the same time in parallel. thus we have to check again.
+                time.sleep(self.race_recheck_delay)  # give other clients time to notice our exclusive lock, stop creating theirs
+                exclusive_locks = self._find_locks(only_exclusive=True)
+                if self.is_exclusive:
+                    if len(exclusive_locks) == 1 and exclusive_locks[0]["key"] == key:
+                        # success, we are the only exclusive lock! wait until the non-exclusive locks go away:
+                        while time.monotonic() - started < self.timeout:
+                            locks = self._find_locks(only_exclusive=False)
+                            if len(locks) == 1 and locks[0]["key"] == key:
+                                # success, we are alone!
+                                return self
+                            time.sleep(self.other_locks_go_away_delay)
+                        break  # timeout
+                    else:
+                        # take back our lock as some other client(s) also created exclusive lock(s).
+                        self._delete_lock(key, ignore_not_found=True)
+                else:  # not is_exclusive
+                    if len(exclusive_locks) == 0:
+                        # success, noone else created an exclusive lock meanwhile!
+                        # We don't care for other non-exclusive locks.
+                        return self
+                    else:
+                        # take back our lock as some other client(s) also created exclusive lock(s).
+                        self._delete_lock(key, ignore_not_found=True)
+            # wait a random bit before retrying
+            time.sleep(self.retry_delay_min + (self.retry_delay_max - self.retry_delay_min) * random.random())
+        # timeout
+        raise LockFailed(str(self.store), "timeout")
+
+    def release(self):
+        locks = self._find_locks(only_mine=True)
+        if not locks:
+            raise NotLocked(str(self.store))
+        assert len(locks) == 1
+        self._delete_lock(locks[0]["key"], ignore_not_found=True)
+        self.last_refresh_dt = None
+
+    def got_exclusive_lock(self):
+        locks = self._find_locks(only_mine=True, only_exclusive=True)
+        return len(locks) == 1
+
+    def break_lock(self):
+        """break ALL locks (not just ours)"""
+        locks = self._get_locks()
+        for key in locks:
+            self._delete_lock(key, ignore_not_found=True)
+        self.last_refresh_dt = None
+
+    def migrate_lock(self, old_id, new_id):
+        """migrate the lock ownership from old_id to new_id"""
+        assert self.id == old_id
+        assert len(new_id) == 3
+        old_locks = self._find_locks(only_mine=True)
+        assert len(old_locks) == 1
+        self.id = new_id
+        self._create_lock(exclusive=old_locks[0]["exclusive"])
+        self._delete_lock(old_locks[0]["key"])
+        now = datetime.datetime.now(datetime.timezone.utc)
+        self.last_refresh_dt = now
+
+    def refresh(self):
+        """refresh the lock - call this frequently, but not later than every <stale> seconds"""
+        now = datetime.datetime.now(datetime.timezone.utc)
+        if self.last_refresh_dt is not None and now > self.last_refresh_dt + self.refresh_td:
+            old_locks = self._find_locks(only_mine=True)
+            assert len(old_locks) == 1
+            old_lock = old_locks[0]
+            if old_lock["dt"] < now - self.refresh_td:
+                self._create_lock(exclusive=old_lock["exclusive"])
+                self._delete_lock(old_lock["key"])
+                self.last_refresh_dt = now

+ 1 - 1
src/borg/remote3.py

@@ -577,7 +577,7 @@ class RemoteRepository3:
         location,
         create=False,
         exclusive=False,
-        lock_wait=None,
+        lock_wait=1.0,
         lock=True,
         append_only=False,
         make_parent_dirs=False,

+ 26 - 4
src/borg/repository3.py

@@ -8,6 +8,7 @@ from .constants import *  # NOQA
 from .helpers import Error, ErrorWithTraceback, IntegrityError
 from .helpers import Location
 from .helpers import bin_to_hex, hex_to_bin
+from .locking3 import Lock
 from .logger import create_logger
 from .repoobj import RepoObj
 
@@ -82,7 +83,7 @@ class Repository3:
         path,
         create=False,
         exclusive=False,
-        lock_wait=None,
+        lock_wait=1.0,
         lock=True,
         append_only=False,
         storage_quota=None,
@@ -107,6 +108,10 @@ class Repository3:
         self.append_only = append_only  # XXX not implemented / not implementable
         self.storage_quota = storage_quota  # XXX not implemented
         self.storage_quota_use = 0  # XXX not implemented
+        self.lock = None
+        self.do_lock = lock
+        self.lock_wait = lock_wait
+        self.exclusive = exclusive
 
     def __repr__(self):
         return f"<{self.__class__.__name__} {self.path}>"
@@ -116,7 +121,7 @@ class Repository3:
             self.do_create = False
             self.create()
             self.created = True
-        self.open()
+        self.open(exclusive=bool(self.exclusive), lock_wait=self.lock_wait, lock=self.do_lock)
         return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
@@ -143,6 +148,10 @@ class Repository3:
         self.id = id
         self.store.store("config/id", bin_to_hex(id).encode())
 
+    def _lock_refresh(self):
+        if self.lock is not None:
+            self.lock.refresh()
+
     def save_key(self, keydata):
         # note: saving an empty key means that there is no repokey anymore
         self.store.store("keys/repokey", keydata)
@@ -157,8 +166,13 @@ class Repository3:
         self.close()
         self.store.destroy()
 
-    def open(self):
+    def open(self, *, exclusive, lock_wait=None, lock=True):
+        assert lock_wait is not None
         self.store.open()
+        if lock:
+            self.lock = Lock(self.store, exclusive, timeout=lock_wait).acquire()
+        else:
+            self.lock = None
         readme = self.store.load("config/readme").decode()
         if readme != REPOSITORY_README:
             raise self.InvalidRepository(self.path)
@@ -173,6 +187,9 @@ class Repository3:
 
     def close(self):
         if self.opened:
+            if self.lock:
+                self.lock.release()
+                self.lock = None
             self.store.close()
             self.opened = False
 
@@ -197,6 +214,7 @@ class Repository3:
         objs_checked = objs_errors = 0
         infos = self.store.list("data")
         for info in infos:
+            self._lock_refresh()
             obj_corrupted = False
             key = "data/%s" % info.name
             obj = self.store.load(key)
@@ -241,6 +259,7 @@ class Repository3:
 
         if mask and value are given, only return IDs where flags & mask == value (default: all IDs).
         """
+        self._lock_refresh()
         infos = self.store.list("data")  # XXX we can only get the full list from the store
         ids = [hex_to_bin(info.name) for info in infos]
         if marker is not None:
@@ -266,6 +285,7 @@ class Repository3:
         return ids, state
 
     def get(self, id, read_data=True):
+        self._lock_refresh()
         id_hex = bin_to_hex(id)
         key = "data/" + id_hex
         try:
@@ -308,6 +328,7 @@ class Repository3:
         Note: when doing calls with wait=False this gets async and caller must
               deal with async results / exceptions later.
         """
+        self._lock_refresh()
         data_size = len(data)
         if data_size > MAX_DATA_SIZE:
             raise IntegrityError(f"More than allowed put data [{data_size} > {MAX_DATA_SIZE}]")
@@ -321,6 +342,7 @@ class Repository3:
         Note: when doing calls with wait=False this gets async and caller must
               deal with async results / exceptions later.
         """
+        self._lock_refresh()
         key = "data/" + bin_to_hex(id)
         try:
             self.store.delete(key)
@@ -342,4 +364,4 @@ class Repository3:
         """Preload objects (only applies to remote repositories)"""
 
     def break_lock(self):
-        pass
+        Lock(self.store).break_lock()

+ 29 - 6
src/borg/testsuite/archiver/lock_cmds.py

@@ -1,4 +1,6 @@
 import os
+import subprocess
+import time
 
 from ...constants import *  # NOQA
 from . import cmd, generate_archiver_tests, RK_ENCRYPTION
@@ -13,12 +15,33 @@ def test_break_lock(archivers, request):
     cmd(archiver, "break-lock")
 
 
-def test_with_lock(archivers, request):
-    archiver = request.getfixturevalue(archivers)
-    cmd(archiver, "rcreate", RK_ENCRYPTION)
-    lock_path = os.path.join(archiver.repository_path, "lock.exclusive")
-    command = "python3", "-c", 'import os, sys; sys.exit(42 if os.path.exists("%s") else 23)' % lock_path
-    cmd(archiver, "with-lock", *command, fork=True, exit_code=42)
+def test_with_lock(tmp_path):
+    """
+    Run two "borg with-lock" processes against the same repository:
+    the first one holds the lock while its command sleeps, the second one must
+    fail to acquire the lock within --lock-wait and must not run its command.
+    """
+    repo_path = tmp_path / "repo"
+    env = os.environ.copy()
+    env["BORG_REPO"] = "file://" + str(repo_path)
+    command0 = "python3", "-m", "borg", "rcreate", "--encryption=none"
+    # timings must be adjusted so that command1 keeps running while command2 tries to get the lock,
+    # so that lock acquisition for command2 fails as the test expects it.
+    lock_wait, execution_time, startup_wait = 2, 4, 1
+    assert lock_wait < execution_time - startup_wait
+    command1 = "python3", "-c", f'import time; print("first command - acquires the lock"); time.sleep({execution_time})'
+    command2 = "python3", "-c", 'print("second command - should never get executed")'
+    borgwl = "python3", "-m", "borg", "with-lock", f"--lock-wait={lock_wait}"
+    popen_options = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, env=env)
+    # create the repository first, both with-lock processes operate on it.
+    subprocess.run(command0, env=env, check=True, text=True, capture_output=True)
+    assert repo_path.exists()
+    with subprocess.Popen([*borgwl, *command1], **popen_options) as p1:
+        time.sleep(startup_wait)  # wait until p1 is running
+        # now try to get another lock on the same repository:
+        with subprocess.Popen([*borgwl, *command2], **popen_options) as p2:
+            out, err_out = p2.communicate()
+            assert "second command" not in out  # command2 is "locked out"
+            assert "Failed to create/acquire the lock" in err_out
+            assert p2.returncode == 72  # LockFailed (timeout): could not acquire the lock, p1 already has it
+        out, err_out = p1.communicate()
+        assert "first command" in out  # command1 was executed and had the lock
+        assert not err_out
+        assert p1.returncode == 0
 
 
 def test_with_lock_non_existent_command(archivers, request):

+ 90 - 0
src/borg/testsuite/locking3.py

@@ -0,0 +1,90 @@
+import time
+
+import pytest
+
+from borgstore.store import Store
+
+from ..locking3 import (
+    Lock,
+    LockFailed,
+    NotLocked,
+)
+
+ID1 = "foo", 1, 1
+ID2 = "bar", 2, 2
+
+
+@pytest.fixture()
+def lockstore(tmpdir):
+    store = Store("file://" + str(tmpdir / "lockstore"))
+    store.create()
+    with store:
+        yield store
+    store.destroy()
+
+
+class TestLock:
+    """Tests for the store-based Lock, using two different owner ids (ID1, ID2)."""
+
+    def test_cm(self, lockstore):
+        """the context manager acquires the requested kind of lock"""
+        with Lock(lockstore, exclusive=True, id=ID1) as lock:
+            assert lock.got_exclusive_lock()
+        with Lock(lockstore, exclusive=False, id=ID1) as lock:
+            assert not lock.got_exclusive_lock()
+
+    def test_got_exclusive_lock(self, lockstore):
+        """got_exclusive_lock() is only true between acquire() and release()"""
+        lock = Lock(lockstore, exclusive=True, id=ID1)
+        assert not lock.got_exclusive_lock()
+        lock.acquire()
+        assert lock.got_exclusive_lock()
+        lock.release()
+        assert not lock.got_exclusive_lock()
+
+    def test_exclusive_lock(self, lockstore):
+        """an exclusive lock can not be acquired while another (exclusive or shared) lock exists"""
+        # there must not be 2 exclusive locks
+        with Lock(lockstore, exclusive=True, id=ID1):
+            with pytest.raises(LockFailed):
+                Lock(lockstore, exclusive=True, id=ID2).acquire()
+        # acquiring an exclusive lock will time out if the non-exclusive does not go away
+        with Lock(lockstore, exclusive=False, id=ID1):
+            with pytest.raises(LockFailed):
+                Lock(lockstore, exclusive=True, id=ID2).acquire()
+
+    def test_double_nonexclusive_lock_succeeds(self, lockstore):
+        """multiple shared locks of different owners can coexist"""
+        with Lock(lockstore, exclusive=False, id=ID1):
+            with Lock(lockstore, exclusive=False, id=ID2):
+                pass
+
+    def test_not_locked(self, lockstore):
+        """releasing a lock that was never acquired raises NotLocked"""
+        lock = Lock(lockstore, exclusive=True, id=ID1)
+        with pytest.raises(NotLocked):
+            lock.release()
+        lock = Lock(lockstore, exclusive=False, id=ID1)
+        with pytest.raises(NotLocked):
+            lock.release()
+
+    def test_break_lock(self, lockstore):
+        """after break_lock(), anybody can acquire an exclusive lock again"""
+        lock = Lock(lockstore, exclusive=True, id=ID1).acquire()
+        lock.break_lock()
+        with Lock(lockstore, exclusive=True, id=ID2):
+            pass
+        with Lock(lockstore, exclusive=True, id=ID1):
+            pass
+
+    def test_lock_refresh_stale_removal(self, lockstore):
+        """refresh() replaces the lock only after stale/2, unrefreshed locks get removed after stale"""
+        # stale after 2s, refreshable after 1s
+        lock = Lock(lockstore, exclusive=True, id=ID1, stale=2)
+        lock.acquire()
+        lock_keys_a00 = set(lock._get_locks())
+        time.sleep(0.5)
+        lock.refresh()  # shouldn't change locks, existing lock too young
+        lock_keys_a05 = set(lock._get_locks())
+        time.sleep(0.6)
+        lock.refresh()  # that should refresh the lock!
+        lock_keys_b00 = set(lock._get_locks())
+        time.sleep(2.1)
+        lock_keys_b21 = set(lock._get_locks())  # now the lock should be stale & gone.
+        assert lock_keys_a00 == lock_keys_a05  # was too young, no refresh done
+        assert len(lock_keys_a00) == 1
+        assert lock_keys_a00 != lock_keys_b00  # refresh done, new lock has different key
+        assert len(lock_keys_b00) == 1
+        assert len(lock_keys_b21) == 0  # stale lock was ignored
+        assert len(list(lock.store.list("locks"))) == 0  # stale lock was removed from store