
migrate locks to child PID when daemonize is used

also:

increase the platform API version due to the change in get_process_id behaviour.

(cherry picked from commit 6f94949a36607f977bdb8ac835e98647896e4b7a)
(cherry picked from commit 5bad764637ece004cccce6273d5028b97148c9dc)
Thomas Waldmann, 7 years ago
commit 09e3a02fbc
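
In practice the new calling convention looks roughly like the sketch below, modelled on the fuse.py and testsuite changes in this commit. The lock path is a hypothetical scratch location and Lock is used directly instead of a full Repository, so this illustrates the flow rather than borg's actual mount code:

import os
import tempfile

from borg.helpers import daemonize           # now returns (old_id, new_id)
from borg.locking import Lock
from borg.platform import get_process_id

# hypothetical lock location: the parent directory must exist,
# the lock path itself must not exist yet
lockpath = os.path.join(tempfile.mkdtemp(), 'lock')

lock = Lock(lockpath, id=get_process_id(), exclusive=True).acquire()

old_id, new_id = daemonize()                 # forks and detaches; we continue as the child
lock.migrate_lock(old_id, new_id)            # re-register the lock under the child's PID
# ... long-running work (e.g. serving a FUSE mount) ...
lock.release()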

src/borg/fuse.py  +5 -1

@@ -22,6 +22,7 @@ from .hashindex import FuseVersionsIndex
 from .helpers import daemonize, hardlinkable, signal_handler, format_file_size
 from .item import Item
 from .lrucache import LRUCache
+from .remote import RemoteRepository
 
 # Does this version of llfuse support ns precision?
 have_fuse_xtime_ns = hasattr(llfuse.EntryAttributes, 'st_mtime_ns')
@@ -285,7 +286,10 @@ class FuseOperations(llfuse.Operations):
         self._create_filesystem()
         llfuse.init(self, mountpoint, options)
         if not foreground:
-            daemonize()
+            old_id, new_id = daemonize()
+            if not isinstance(self.repository_uncached, RemoteRepository):
+                # local repo and the locking process' PID just changed, migrate it:
+                self.repository_uncached.migrate_lock(old_id, new_id)
 
         # If the file system crashes, we do not want to umount because in that
         # case the mountpoint suddenly appears to become empty. This can have

src/borg/helpers.py  +7 -1

@@ -139,7 +139,7 @@ def check_extension_modules():
         raise ExtensionModuleError
     if borg.crypto.low_level.API_VERSION != '1.1_02':
         raise ExtensionModuleError
-    if platform.API_VERSION != platform.OS_API_VERSION != '1.1_01':
+    if platform.API_VERSION != platform.OS_API_VERSION != '1.1_02':
         raise ExtensionModuleError
     if item.API_VERSION != '1.1_02':
         raise ExtensionModuleError
@@ -1183,7 +1183,11 @@ def make_path_safe(path):
 
 def daemonize():
     """Detach process from controlling terminal and run in background
+
+    Returns: old and new get_process_id tuples
     """
+    from .platform import get_process_id
+    old_id = get_process_id()
     pid = os.fork()
     if pid:
         os._exit(0)
@@ -1199,6 +1203,8 @@ def daemonize():
     os.dup2(fd, 0)
     os.dup2(fd, 1)
     os.dup2(fd, 2)
+    new_id = get_process_id()
+    return old_id, new_id
 
 
 class StableDict(dict):

src/borg/locking.py  +40 -0

@@ -202,6 +202,16 @@ class ExclusiveLock:
                 os.unlink(os.path.join(self.path, name))
             os.rmdir(self.path)
 
+    def migrate_lock(self, old_id, new_id):
+        """migrate the lock ownership from old_id to new_id"""
+        assert self.id == old_id
+        new_unique_name = os.path.join(self.path, "%s.%d-%x" % new_id)
+        if self.is_locked() and self.by_me():
+            with open(new_unique_name, "wb"):
+                pass
+            os.unlink(self.unique_name)
+        self.id, self.unique_name = new_id, new_unique_name
+
 
 class LockRoster:
     """
@@ -271,6 +281,25 @@ class LockRoster:
         roster[key] = list(list(e) for e in elements)
         self.save(roster)
 
+    def migrate_lock(self, key, old_id, new_id):
+        """migrate the lock ownership from old_id to new_id"""
+        assert self.id == old_id
+        # need to temporarily switch off stale lock killing as we want to
+        # rather migrate than kill them (at least the one made by old_id).
+        killing, self.kill_stale_locks = self.kill_stale_locks, False
+        try:
+            try:
+                self.modify(key, REMOVE)
+            except KeyError:
+                # entry was not there, so no need to add a new one, but still update our id
+                self.id = new_id
+            else:
+                # old entry removed, update our id and add an updated entry
+                self.id = new_id
+                self.modify(key, ADD)
+        finally:
+            self.kill_stale_locks = killing
+
 
 class Lock:
     """
@@ -373,3 +402,14 @@ class Lock:
     def break_lock(self):
         self._roster.remove()
         self._lock.break_lock()
+
+    def migrate_lock(self, old_id, new_id):
+        assert self.id == old_id
+        self.id = new_id
+        if self.is_exclusive:
+            self._lock.migrate_lock(old_id, new_id)
+            self._roster.migrate_lock(EXCLUSIVE, old_id, new_id)
+        else:
+            with self._lock:
+                self._lock.migrate_lock(old_id, new_id)
+                self._roster.migrate_lock(SHARED, old_id, new_id)

src/borg/platform/base.py  +1 -1

@@ -15,7 +15,7 @@ platform API: that way platform APIs provided by the platform-specific support m
 are correctly composed into the base functionality.
 """
 
-API_VERSION = '1.1_01'
+API_VERSION = '1.1_02'
 
 fdatasync = getattr(os, 'fdatasync', os.fsync)
 

src/borg/platform/darwin.pyx  +1 -1

@@ -4,7 +4,7 @@ from ..helpers import user2uid, group2gid
 from ..helpers import safe_decode, safe_encode
 from .posix import swidth
 
-API_VERSION = '1.1_01'
+API_VERSION = '1.1_02'
 
 cdef extern from "sys/acl.h":
     ctypedef struct _acl_t:

src/borg/platform/freebsd.pyx  +1 -1

@@ -4,7 +4,7 @@ from ..helpers import posix_acl_use_stored_uid_gid
 from ..helpers import safe_encode, safe_decode
 from .posix import swidth
 
-API_VERSION = '1.1_01'
+API_VERSION = '1.1_02'
 
 cdef extern from "errno.h":
     int errno

src/borg/platform/linux.pyx  +1 -1

@@ -13,7 +13,7 @@ from .posix import swidth
 from libc cimport errno
 from libc.stdint cimport int64_t
 
-API_VERSION = '1.1_01'
+API_VERSION = '1.1_02'
 
 cdef extern from "sys/types.h":
     int ACL_TYPE_ACCESS

src/borg/platform/posix.pyx  +5 -7

@@ -18,23 +18,21 @@ def swidth(s):
         return str_len
 
 
-# only determine the PID and hostname once.
-# for FUSE mounts, we fork a child process that needs to release
-# the lock made by the parent, so it needs to use the same PID for that.
-_pid = os.getpid()
+# for performance reasons, only determine the hostname once.
 # XXX this sometimes requires live internet access for issuing a DNS query in the background.
 _hostname = '%s@%s' % (socket.getfqdn(), uuid.getnode())
 
 
 def get_process_id():
     """
-    Return identification tuple (hostname, pid, thread_id) for 'us'. If this is a FUSE process, then the PID will be
-    that of the parent, not the forked FUSE child.
+    Return identification tuple (hostname, pid, thread_id) for 'us'.
+    This always returns the current pid, which might be different from before, e.g. if daemonize() was used.
 
     Note: Currently thread_id is *always* zero.
     """
     thread_id = 0
-    return _hostname, _pid, thread_id
+    pid = os.getpid()
+    return _hostname, pid, thread_id
 
 
 def process_alive(host, pid, thread):
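
The posix.pyx change above is the reason for the platform API bump: the process id is no longer captured once at import time. A small stand-alone sketch (plain stdlib, not borg code) of why a cached PID goes stale after a fork such as the one daemonize() performs:

import os

_cached_pid = os.getpid()        # old behaviour: PID captured once, at import time

pid = os.fork()
if pid == 0:
    # child: the cached value still names the parent; lock files derived from it
    # would not belong to this process, hence the need to migrate or re-read the id
    assert _cached_pid != os.getpid()
    os._exit(0)
os.waitpid(pid, 0)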

src/borg/repository.py  +5 -0

@@ -346,6 +346,11 @@ class Repository:
     def break_lock(self):
         Lock(os.path.join(self.path, 'lock')).break_lock()
 
+    def migrate_lock(self, old_id, new_id):
+        # note: only needed for local repos
+        if self.lock is not None:
+            self.lock.migrate_lock(old_id, new_id)
+
     def open(self, path, exclusive, lock_wait=None, lock=True):
         self.path = path
         if not os.path.isdir(path):

src/borg/testsuite/locking.py  +41 -0

@@ -3,6 +3,7 @@ import time
 
 import pytest
 
+from ..helpers import daemonize
 from ..platform import get_process_id, process_alive
 from ..locking import TimeoutTimer, ExclusiveLock, Lock, LockRoster, \
                       ADD, REMOVE, SHARED, EXCLUSIVE, LockTimeout, NotLocked, NotMyLock
@@ -76,6 +77,19 @@ class TestExclusiveLock:
             with pytest.raises(LockTimeout):
                 ExclusiveLock(lockpath, id=our_id, kill_stale_locks=True, timeout=0.1).acquire()
 
+    def test_migrate_lock(self, lockpath):
+        old_id, new_id = ID1, ID2
+        assert old_id[1] != new_id[1]  # different PIDs (like when doing daemonize())
+        lock = ExclusiveLock(lockpath, id=old_id).acquire()
+        assert lock.id == old_id  # lock is for old id / PID
+        old_unique_name = lock.unique_name
+        assert lock.by_me()  # we have the lock
+        lock.migrate_lock(old_id, new_id)  # fix the lock
+        assert lock.id == new_id  # lock corresponds to the new id / PID
+        new_unique_name = lock.unique_name
+        assert lock.by_me()  # we still have the lock
+        assert old_unique_name != new_unique_name  # locking filename is different now
+
 
 class TestLock:
     def test_shared(self, lockpath):
@@ -155,6 +169,22 @@ class TestLock:
             with pytest.raises(LockTimeout):
                 Lock(lockpath, id=our_id, kill_stale_locks=True, timeout=0.1).acquire()
 
+    def test_migrate_lock(self, lockpath):
+        old_id, new_id = ID1, ID2
+        assert old_id[1] != new_id[1]  # different PIDs (like when doing daemonize())
+
+        lock = Lock(lockpath, id=old_id, exclusive=True).acquire()
+        assert lock.id == old_id
+        lock.migrate_lock(old_id, new_id)  # fix the lock
+        assert lock.id == new_id
+        lock.release()
+
+        lock = Lock(lockpath, id=old_id, exclusive=False).acquire()
+        assert lock.id == old_id
+        lock.migrate_lock(old_id, new_id)  # fix the lock
+        assert lock.id == new_id
+        lock.release()
+
 
 @pytest.fixture()
 def rosterpath(tmpdir):
@@ -207,3 +237,14 @@ class TestLockRoster:
         other_killer_roster = LockRoster(rosterpath, kill_stale_locks=True)
         # Did not kill us, since we're alive
         assert other_killer_roster.get(SHARED) == {our_id, cant_know_if_dead_id}
+
+    def test_migrate_lock(self, rosterpath):
+        old_id, new_id = ID1, ID2
+        assert old_id[1] != new_id[1]  # different PIDs (like when doing daemonize())
+        roster = LockRoster(rosterpath, id=old_id)
+        assert roster.id == old_id
+        roster.modify(SHARED, ADD)
+        assert roster.get(SHARED) == {old_id}
+        roster.migrate_lock(SHARED, old_id, new_id)  # fix the lock
+        assert roster.id == new_id
+        assert roster.get(SHARED) == {new_id}