瀏覽代碼

fix race condition in lock migration, fixes #4953

- add a daemonizing() ctx manager

The foreground borg mount process (local repo) survives until the lock
migration (performed by the background) is finished, so the lock to be
migrated is never stale, and the race condition is gone.

- add a test case revealing that locking is not safe during daemonization (borg mount)

- amend printing in testsuite.archiver
Thomas Portmann 5 年之前
父節點
當前提交
ef9fdcf992
共有 4 個文件被更改,包括 249 次插入33 次删除
  1. 8 5
      src/borg/fuse.py
  2. 104 8
      src/borg/helpers/process.py
  3. 45 20
      src/borg/testsuite/__init__.py
  4. 92 0
      src/borg/testsuite/archiver.py

+ 8 - 5
src/borg/fuse.py

@@ -18,7 +18,7 @@ from .crypto.low_level import blake2b_128
 from .archiver import Archiver
 from .archive import Archive
 from .hashindex import FuseVersionsIndex
-from .helpers import daemonize, hardlinkable, signal_handler, format_file_size
+from .helpers import daemonize, daemonizing, hardlinkable, signal_handler, format_file_size
 from .helpers import msgpack
 from .item import Item
 from .lrucache import LRUCache
@@ -510,10 +510,13 @@ class FuseOperations(llfuse.Operations, FuseBackend):
         self._create_filesystem()
         llfuse.init(self, mountpoint, options)
         if not foreground:
-            old_id, new_id = daemonize()
-            if not isinstance(self.repository_uncached, RemoteRepository):
-                # local repo and the locking process' PID just changed, migrate it:
-                self.repository_uncached.migrate_lock(old_id, new_id)
+            if isinstance(self.repository_uncached, RemoteRepository):
+                daemonize()
+            else:
+                with daemonizing() as (old_id, new_id):
+                    # local repo: the locking process' PID is changing, migrate it:
+                    logger.debug('fuse: mount local repo, going to background: migrating lock.')
+                    self.repository_uncached.migrate_lock(old_id, new_id)
 
         # If the file system crashes, we do not want to umount because in that
         # case the mountpoint suddenly appears to become empty. This can have

+ 104 - 8
src/borg/helpers/process.py

@@ -6,6 +6,8 @@ import shlex
 import signal
 import subprocess
 import sys
+import time
+import traceback
 
 from .. import __version__
 
@@ -13,17 +15,23 @@ from ..platformflags import is_win32, is_linux, is_freebsd, is_darwin
 from ..logger import create_logger
 logger = create_logger()
 
+from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_SIGNAL_BASE
 
-def daemonize():
-    """Detach process from controlling terminal and run in background
 
-    Returns: old and new get_process_id tuples
-    """
+@contextlib.contextmanager
+def _daemonize():
     from ..platform import get_process_id
     old_id = get_process_id()
     pid = os.fork()
     if pid:
-        os._exit(0)
+        exit_code = EXIT_SUCCESS
+        try:
+            yield old_id, None
+        except _ExitCodeException as e:
+            exit_code = e.exit_code
+        finally:
+            logger.debug('Daemonizing: Foreground process (%s, %s, %s) is now dying.' % old_id)
+            os._exit(exit_code)
     os.setsid()
     pid = os.fork()
     if pid:
@@ -31,13 +39,101 @@ def daemonize():
     os.chdir('/')
     os.close(0)
     os.close(1)
-    os.close(2)
     fd = os.open(os.devnull, os.O_RDWR)
     os.dup2(fd, 0)
     os.dup2(fd, 1)
-    os.dup2(fd, 2)
     new_id = get_process_id()
-    return old_id, new_id
+    try:
+        yield old_id, new_id
+    finally:
+        # Close / redirect stderr to /dev/null only now
+        # for the case that we want to log something before yield returns.
+        os.close(2)
+        os.dup2(fd, 2)
+
+
+def daemonize():
+    """Detach process from controlling terminal and run in background
+
+    Returns: old and new get_process_id tuples
+    """
+    with _daemonize() as (old_id, new_id):
+        return old_id, new_id
+
+
+@contextlib.contextmanager
+def daemonizing(*, timeout=5):
+    """Like daemonize(), but as context manager.
+
+    The with-body is executed in the background process,
+    while the foreground process survives until the body is left
+    or the given timeout is exceeded. In the latter case a warning is
+    reported by the foreground.
+    Context variable is (old_id, new_id) get_process_id tuples.
+    An exception raised in the body is reported by the foreground
+    as a warning as well as propagated outside the body in the background.
+    In case of a warning, the foreground exits with exit code EXIT_WARNING
+    instead of EXIT_SUCCESS.
+    """
+    with _daemonize() as (old_id, new_id):
+        if new_id is None:
+            # The original / parent process, waiting for a signal to die.
+            logger.debug('Daemonizing: Foreground process (%s, %s, %s) is waiting for background process...' % old_id)
+            exit_code = EXIT_SUCCESS
+            # Indeed, SIGHUP and SIGTERM handlers should have been set on archiver.run(). Just in case...
+            with signal_handler('SIGINT', raising_signal_handler(KeyboardInterrupt)), \
+                 signal_handler('SIGHUP', raising_signal_handler(SigHup)), \
+                 signal_handler('SIGTERM', raising_signal_handler(SigTerm)):
+                try:
+                    if timeout > 0:
+                        time.sleep(timeout)
+                except SigTerm:
+                    # Normal termination; expected from grandchild, see 'os.kill()' below
+                    pass
+                except SigHup:
+                    # Background wants to indicate a problem; see 'os.kill()' below,
+                    # log message will come from grandchild.
+                    exit_code = EXIT_WARNING
+                except KeyboardInterrupt:
+                    # Manual termination.
+                    logger.debug('Daemonizing: Foreground process (%s, %s, %s) received SIGINT.' % old_id)
+                    exit_code = EXIT_SIGNAL_BASE + 2
+                except BaseException as e:
+                    # Just in case...
+                    logger.warning('Daemonizing: Foreground process received an exception while waiting:\n' +
+                                   ''.join(traceback.format_exception(e.__class__, e, e.__traceback__)))
+                    exit_code = EXIT_WARNING
+                else:
+                    logger.warning('Daemonizing: Background process did not respond (timeout). Is it alive?')
+                    exit_code = EXIT_WARNING
+                finally:
+                    # Don't call with-body, but die immediately!
+                    # return would be sufficient, but we want to pass the exit code.
+                    raise _ExitCodeException(exit_code)
+
+        # The background / grandchild process.
+        sig_to_foreground = signal.SIGTERM
+        logger.debug('Daemonizing: Background process (%s, %s, %s) is starting...' % new_id)
+        try:
+            yield old_id, new_id
+        except BaseException as e:
+            sig_to_foreground = signal.SIGHUP
+            logger.warning('Daemonizing: Background process raised an exception while starting:\n' +
+                           ''.join(traceback.format_exception(e.__class__, e, e.__traceback__)))
+            raise e
+        else:
+            logger.debug('Daemonizing: Background process (%s, %s, %s) has started.' % new_id)
+        finally:
+            try:
+                os.kill(old_id[1], sig_to_foreground)
+            except BaseException as e:
+                logger.error('Daemonizing: Trying to kill the foreground process raised an exception:\n' +
+                             ''.join(traceback.format_exception(e.__class__, e, e.__traceback__)))
+
+
+class _ExitCodeException(BaseException):
+    def __init__(self, exit_code):
+        self.exit_code = exit_code
 
 
 class SignalException(BaseException):

+ 45 - 20
src/borg/testsuite/__init__.py

@@ -238,30 +238,55 @@ class BaseTestCase(unittest.TestCase):
             self._assert_dirs_equal_cmp(sub_diff, ignore_flags=ignore_flags, ignore_xattrs=ignore_xattrs, ignore_ns=ignore_ns)
 
     @contextmanager
-    def fuse_mount(self, location, mountpoint=None, *options, **kwargs):
+    def fuse_mount(self, location, mountpoint=None, *options, fork=True, os_fork=False, **kwargs):
+        # For a successful mount, `fork = True` is required for
+        # the borg mount daemon to work properly or the tests
+        # will just freeze. Therefore, if argument `fork` is not
+        # specified, the default value is `True`, regardless of
+        # `FORK_DEFAULT`. However, leaving the possibilty to run
+        # the command with `fork = False` is still necessary for
+        # testing for mount failures, for example attempting to
+        # mount a read-only repo.
+        #    `os_fork = True` is needed for testing (the absence of)
+        # a race condition of the Lock during lock migration when
+        # borg mount (local repo) is daemonizing (#4953). This is another
+        # example where we need `fork = False`, because the test case
+        # needs an OS fork, not a spawning of the fuse mount.
+        # `fork = False` is implied if `os_fork = True`.
         if mountpoint is None:
             mountpoint = tempfile.mkdtemp()
         else:
             os.mkdir(mountpoint)
-        if 'fork' not in kwargs:
-            # For a successful mount, `fork = True` is required for
-            # the borg mount daemon to work properly or the tests
-            # will just freeze. Therefore, if argument `fork` is not
-            # specified, the default value is `True`, regardless of
-            # `FORK_DEFAULT`. However, leaving the possibilty to run
-            # the command with `fork = False` is still necessary for
-            # testing for mount failures, for example attempting to
-            # mount a read-only repo.
-            kwargs['fork'] = True
-        self.cmd('mount', location, mountpoint, *options, **kwargs)
-        if kwargs.get('exit_code', EXIT_SUCCESS) == EXIT_ERROR:
-            # If argument `exit_code = EXIT_ERROR`, then this call
-            # is testing the behavior of an unsuccessful mount and
-            # we must not continue, as there is no mount to work
-            # with. The test itself has already failed or succeeded
-            # with the call to `self.cmd`, above.
-            yield
-            return
+        args = ['mount', location, mountpoint] + list(options)
+        if os_fork:
+            # Do not spawn, but actually (OS) fork.
+            if os.fork() == 0:
+                # The child process.
+                # Decouple from parent and fork again.
+                # Otherwise, it becomes a zombie and pretends to be alive.
+                os.setsid()
+                if os.fork() > 0:
+                    os._exit(0)
+                # The grandchild process.
+                try:
+                    self.cmd(*args, fork=False, **kwargs)  # borg mount not spawning.
+                finally:
+                    # This should never be reached, since it daemonizes,
+                    # and the grandchild process exits before cmd() returns.
+                    # However, just in case...
+                    print('Fatal: borg mount did not daemonize properly. Force exiting.',
+                          file=sys.stderr, flush=True)
+                    os._exit(0)
+        else:
+            self.cmd(*args, fork=fork, **kwargs)
+            if kwargs.get('exit_code', EXIT_SUCCESS) == EXIT_ERROR:
+                # If argument `exit_code = EXIT_ERROR`, then this call
+                # is testing the behavior of an unsuccessful mount and
+                # we must not continue, as there is no mount to work
+                # with. The test itself has already failed or succeeded
+                # with the call to `self.cmd`, above.
+                yield
+                return
         self.wait_for_mountstate(mountpoint, mounted=True)
         yield
         umount(mountpoint)

+ 92 - 0
src/borg/testsuite/archiver.py

@@ -2503,6 +2503,94 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         with self.fuse_mount(self.repository_location, mountpoint, '--prefix=nope'):
             assert sorted(os.listdir(os.path.join(mountpoint))) == []
 
+    @unittest.skipUnless(has_llfuse, 'llfuse not installed')
+    def test_migrate_lock_alive(self):
+        """Both old_id and new_id must not be stale during lock migration / daemonization."""
+        from functools import wraps
+        import pickle
+        import traceback
+
+        # Check results are communicated from the borg mount background process
+        # to the pytest process by means of a serialized dict object stored in this file.
+        assert_data_file = os.path.join(self.tmpdir, 'migrate_lock_assert_data.pickle')
+
+        # Decorates Lock.migrate_lock() with process_alive() checks before and after.
+        # (We don't want to mix testing code into runtime.)
+        def write_assert_data(migrate_lock):
+            @wraps(migrate_lock)
+            def wrapper(self, old_id, new_id):
+                wrapper.num_calls += 1
+                assert_data = {
+                    'num_calls': wrapper.num_calls,
+                    'old_id': old_id,
+                    'new_id': new_id,
+                    'before': {
+                        'old_id_alive': platform.process_alive(*old_id),
+                        'new_id_alive': platform.process_alive(*new_id)},
+                    'exception': None,
+                    'exception.extr_tb': None,
+                    'after': {
+                        'old_id_alive': None,
+                        'new_id_alive': None}}
+                try:
+                    with open(assert_data_file, 'wb') as _out:
+                        pickle.dump(assert_data, _out)
+                except:
+                    pass
+                try:
+                    return migrate_lock(self, old_id, new_id)
+                except BaseException as e:
+                    assert_data['exception'] = e
+                    assert_data['exception.extr_tb'] = traceback.extract_tb(e.__traceback__)
+                finally:
+                    assert_data['after'].update({
+                        'old_id_alive': platform.process_alive(*old_id),
+                        'new_id_alive': platform.process_alive(*new_id)})
+                    try:
+                        with open(assert_data_file, 'wb') as _out:
+                            pickle.dump(assert_data, _out)
+                    except:
+                        pass
+            wrapper.num_calls = 0
+            return wrapper
+
+        # Decorate
+        borg.locking.Lock.migrate_lock = write_assert_data(borg.locking.Lock.migrate_lock)
+        try:
+            self.cmd('init', '--encryption=none', self.repository_location)
+            self.create_src_archive('arch')
+            mountpoint = os.path.join(self.tmpdir, 'mountpoint')
+            # In order that the decoration is kept for the borg mount process, we must not spawn, but actually fork;
+            # not to be confused with the forking in borg.helpers.daemonize() which is done as well.
+            with self.fuse_mount(self.repository_location, mountpoint, os_fork=True):
+                pass
+            with open(assert_data_file, 'rb') as _in:
+                assert_data = pickle.load(_in)
+            print('\nLock.migrate_lock(): assert_data = %r.' % (assert_data, ), file=sys.stderr, flush=True)
+            exception = assert_data['exception']
+            if exception is not None:
+                extracted_tb = assert_data['exception.extr_tb']
+                print(
+                    'Lock.migrate_lock() raised an exception:\n',
+                    'Traceback (most recent call last):\n',
+                    *traceback.format_list(extracted_tb),
+                    *traceback.format_exception(exception.__class__, exception, None),
+                    sep='', end='', file=sys.stderr, flush=True)
+
+            assert assert_data['num_calls'] == 1, "Lock.migrate_lock() must be called exactly once."
+            assert exception is None, "Lock.migrate_lock() may not raise an exception."
+
+            assert_data_before = assert_data['before']
+            assert assert_data_before['old_id_alive'], "old_id must be alive (=must not be stale) when calling Lock.migrate_lock()."
+            assert assert_data_before['new_id_alive'], "new_id must be alive (=must not be stale) when calling Lock.migrate_lock()."
+
+            assert_data_after = assert_data['after']
+            assert assert_data_after['old_id_alive'], "old_id must be alive (=must not be stale) when Lock.migrate_lock() has returned."
+            assert assert_data_after['new_id_alive'], "new_id must be alive (=must not be stale) when Lock.migrate_lock() has returned."
+        finally:
+            # Undecorate
+            borg.locking.Lock.migrate_lock = borg.locking.Lock.migrate_lock.__wrapped__
+
     def verify_aes_counter_uniqueness(self, method):
         seen = set()  # Chunks already seen
         used = set()  # counter values already used
@@ -3570,6 +3658,10 @@ class RemoteArchiverTestCase(ArchiverTestCase):
     def test_config(self):
         pass
 
+    @unittest.skip('only works locally')
+    def test_migrate_lock_alive(self):
+        pass
+
     def test_strip_components_doesnt_leak(self):
         self.cmd('init', '--encryption=repokey', self.repository_location)
         self.create_regular_file('dir/file', contents=b"test file contents 1")