
Merge pull request #4954 from alfredo08154711/issue-lockmigr

Fixed lock migration, issue #4953
TW 4 years ago
parent commit 9b8ba5f6f7

+ 8 - 5
src/borg/fuse.py

@@ -18,7 +18,7 @@ from .crypto.low_level import blake2b_128
 from .archiver import Archiver
 from .archive import Archive
 from .hashindex import FuseVersionsIndex
-from .helpers import daemonize, hardlinkable, signal_handler, format_file_size
+from .helpers import daemonize, daemonizing, hardlinkable, signal_handler, format_file_size
 from .helpers import msgpack
 from .item import Item
 from .lrucache import LRUCache
@@ -510,10 +510,13 @@ class FuseOperations(llfuse.Operations, FuseBackend):
         self._create_filesystem()
         llfuse.init(self, mountpoint, options)
         if not foreground:
-            old_id, new_id = daemonize()
-            if not isinstance(self.repository_uncached, RemoteRepository):
-                # local repo and the locking process' PID just changed, migrate it:
-                self.repository_uncached.migrate_lock(old_id, new_id)
+            if isinstance(self.repository_uncached, RemoteRepository):
+                daemonize()
+            else:
+                with daemonizing() as (old_id, new_id):
+                    # local repo: the locking process' PID is changing, migrate it:
+                    logger.debug('fuse: mount local repo, going to background: migrating lock.')
+                    self.repository_uncached.migrate_lock(old_id, new_id)
 
         # If the file system crashes, we do not want to umount because in that
         # case the mountpoint suddenly appears to become empty. This can have
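
Context for the fuse.py change: borg identifies a lock owner by a (hostname, pid, thread_id) tuple (see get_process_id()), and daemonizing forks, so the PID in that identity changes and the repository lock must be re-homed via migrate_lock(). The issue fixed here (#4953) was that the old foreground process could already be gone while the lock still named it as owner, making the lock look stale mid-migration. A minimal sketch of the idea, using a hypothetical PidLock class rather than borg's actual locking API:

import os

class PidLock:
    # Toy exclusive lock that records its owner's PID (illustrative only).
    def __init__(self):
        self.owner = None

    def acquire(self):
        self.owner = os.getpid()

    def is_stale(self):
        # Another process treats the lock as stale if its owner has died.
        try:
            os.kill(self.owner, 0)      # signal 0: existence check only
            return False
        except ProcessLookupError:
            return True

    def migrate(self, old_pid, new_pid):
        # After daemonizing, the recorded owner is the (dying) foreground
        # process; re-home the lock onto the surviving background process.
        assert self.owner == old_pid
        self.owner = new_pid

With the new daemonizing() context manager, the foreground stays alive until the with-body has run, so there is no window in which old_id is already stale before new_id has taken over the lock.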

+ 104 - 8
src/borg/helpers/process.py

@@ -6,6 +6,8 @@ import shlex
 import signal
 import subprocess
 import sys
+import time
+import traceback
 
 from .. import __version__
 
@@ -13,17 +15,23 @@ from ..platformflags import is_win32, is_linux, is_freebsd, is_darwin
 from ..logger import create_logger
 logger = create_logger()
 
+from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_SIGNAL_BASE
 
-def daemonize():
-    """Detach process from controlling terminal and run in background
 
-    Returns: old and new get_process_id tuples
-    """
+@contextlib.contextmanager
+def _daemonize():
     from ..platform import get_process_id
     old_id = get_process_id()
     pid = os.fork()
     if pid:
-        os._exit(0)
+        exit_code = EXIT_SUCCESS
+        try:
+            yield old_id, None
+        except _ExitCodeException as e:
+            exit_code = e.exit_code
+        finally:
+            logger.debug('Daemonizing: Foreground process (%s, %s, %s) is now dying.' % old_id)
+            os._exit(exit_code)
     os.setsid()
     pid = os.fork()
     if pid:
@@ -31,13 +39,101 @@ def daemonize():
     os.chdir('/')
     os.close(0)
     os.close(1)
-    os.close(2)
     fd = os.open(os.devnull, os.O_RDWR)
     os.dup2(fd, 0)
     os.dup2(fd, 1)
-    os.dup2(fd, 2)
     new_id = get_process_id()
-    return old_id, new_id
+    try:
+        yield old_id, new_id
+    finally:
+        # Close / redirect stderr to /dev/null only now
+        # for the case that we want to log something before yield returns.
+        os.close(2)
+        os.dup2(fd, 2)
+
+
+def daemonize():
+    """Detach process from controlling terminal and run in background
+
+    Returns: old and new get_process_id tuples
+    """
+    with _daemonize() as (old_id, new_id):
+        return old_id, new_id
+
+
+@contextlib.contextmanager
+def daemonizing(*, timeout=5):
+    """Like daemonize(), but as context manager.
+
+    The with-body is executed in the background process,
+    while the foreground process survives until the body is left
+    or the given timeout is exceeded. In the latter case a warning is
+    reported by the foreground.
+    The context variable is an (old_id, new_id) pair of get_process_id tuples.
+    An exception raised in the body is reported by the foreground
+    as a warning as well as propagated outside the body in the background.
+    In case of a warning, the foreground exits with exit code EXIT_WARNING
+    instead of EXIT_SUCCESS.
+    """
+    with _daemonize() as (old_id, new_id):
+        if new_id is None:
+            # The original / parent process, waiting for a signal to die.
+            logger.debug('Daemonizing: Foreground process (%s, %s, %s) is waiting for background process...' % old_id)
+            exit_code = EXIT_SUCCESS
+            # Indeed, SIGHUP and SIGTERM handlers should have been set on archiver.run(). Just in case...
+            with signal_handler('SIGINT', raising_signal_handler(KeyboardInterrupt)), \
+                 signal_handler('SIGHUP', raising_signal_handler(SigHup)), \
+                 signal_handler('SIGTERM', raising_signal_handler(SigTerm)):
+                try:
+                    if timeout > 0:
+                        time.sleep(timeout)
+                except SigTerm:
+                    # Normal termination; expected from grandchild, see 'os.kill()' below
+                    pass
+                except SigHup:
+                    # Background wants to indicate a problem; see 'os.kill()' below,
+                    # log message will come from grandchild.
+                    exit_code = EXIT_WARNING
+                except KeyboardInterrupt:
+                    # Manual termination.
+                    logger.debug('Daemonizing: Foreground process (%s, %s, %s) received SIGINT.' % old_id)
+                    exit_code = EXIT_SIGNAL_BASE + 2
+                except BaseException as e:
+                    # Just in case...
+                    logger.warning('Daemonizing: Foreground process received an exception while waiting:\n' +
+                                   ''.join(traceback.format_exception(e.__class__, e, e.__traceback__)))
+                    exit_code = EXIT_WARNING
+                else:
+                    logger.warning('Daemonizing: Background process did not respond (timeout). Is it alive?')
+                    exit_code = EXIT_WARNING
+                finally:
+                    # Don't call with-body, but die immediately!
+                    # return would be sufficient, but we want to pass the exit code.
+                    raise _ExitCodeException(exit_code)
+
+        # The background / grandchild process.
+        sig_to_foreground = signal.SIGTERM
+        logger.debug('Daemonizing: Background process (%s, %s, %s) is starting...' % new_id)
+        try:
+            yield old_id, new_id
+        except BaseException as e:
+            sig_to_foreground = signal.SIGHUP
+            logger.warning('Daemonizing: Background process raised an exception while starting:\n' +
+                           ''.join(traceback.format_exception(e.__class__, e, e.__traceback__)))
+            raise e
+        else:
+            logger.debug('Daemonizing: Background process (%s, %s, %s) has started.' % new_id)
+        finally:
+            try:
+                os.kill(old_id[1], sig_to_foreground)
+            except BaseException as e:
+                logger.error('Daemonizing: Trying to kill the foreground process raised an exception:\n' +
+                             ''.join(traceback.format_exception(e.__class__, e, e.__traceback__)))
+
+
+class _ExitCodeException(BaseException):
+    def __init__(self, exit_code):
+        self.exit_code = exit_code
 
 
 class SignalException(BaseException):
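
The heart of the new daemonizing() context manager is a signal handshake between foreground and background: the foreground sleeps for at most `timeout` seconds until the background reports a clean start (SIGTERM) or a problem (SIGHUP). A condensed standalone sketch of that protocol, with simplified names and without the second fork or stdio redirection of the real code:

import os
import signal
import time

class ChildReady(BaseException): pass
class ChildFailed(BaseException): pass

def _raising(exc):
    # Build a signal handler that raises, so it interrupts time.sleep().
    def handler(signum, frame):
        raise exc
    return handler

def fork_with_handshake(timeout=5):
    foreground_pid = os.getpid()
    if os.fork() > 0:
        # Foreground: block until the background signals, or time out.
        signal.signal(signal.SIGTERM, _raising(ChildReady))
        signal.signal(signal.SIGHUP, _raising(ChildFailed))
        try:
            time.sleep(timeout)
        except ChildReady:
            os._exit(0)
        except ChildFailed:
            os._exit(1)         # maps to EXIT_WARNING in the real code
        os._exit(1)             # timeout: the background never reported back
    # Background: do the startup work, then wake the waiting foreground.
    sig_to_foreground = signal.SIGTERM
    try:
        pass                    # e.g. repository lock migration goes here
    except BaseException:
        sig_to_foreground = signal.SIGHUP
        raise
    finally:
        os.kill(foreground_pid, sig_to_foreground)

In the real implementation the background is a grandchild (the usual double fork with os.setsid() in between), the handlers come from borg's signal_handler()/raising_signal_handler() helpers, and the foreground's exit code travels out through _ExitCodeException because the foreground must die inside _daemonize()'s finally block rather than return normally.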

+ 45 - 20
src/borg/testsuite/__init__.py

@@ -238,30 +238,55 @@ class BaseTestCase(unittest.TestCase):
             self._assert_dirs_equal_cmp(sub_diff, ignore_flags=ignore_flags, ignore_xattrs=ignore_xattrs, ignore_ns=ignore_ns)
 
     @contextmanager
-    def fuse_mount(self, location, mountpoint=None, *options, **kwargs):
+    def fuse_mount(self, location, mountpoint=None, *options, fork=True, os_fork=False, **kwargs):
+        # For a successful mount, `fork = True` is required for
+        # the borg mount daemon to work properly or the tests
+        # will just freeze. Therefore, if argument `fork` is not
+        # specified, the default value is `True`, regardless of
+        # `FORK_DEFAULT`. However, leaving the possibility to run
+        # the command with `fork = False` is still necessary for
+        # testing for mount failures, for example attempting to
+        # mount a read-only repo.
+        #    `os_fork = True` is needed for testing (the absence of)
+        # a race condition of the Lock during lock migration when
+        # borg mount (local repo) is daemonizing (#4953). This is another
+        # example where we need `fork = False`, because the test case
+        # needs an OS fork, not a spawning of the fuse mount.
+        # `fork = False` is implied if `os_fork = True`.
         if mountpoint is None:
             mountpoint = tempfile.mkdtemp()
         else:
             os.mkdir(mountpoint)
-        if 'fork' not in kwargs:
-            # For a successful mount, `fork = True` is required for
-            # the borg mount daemon to work properly or the tests
-            # will just freeze. Therefore, if argument `fork` is not
-            # specified, the default value is `True`, regardless of
-        # `FORK_DEFAULT`. However, leaving the possibility to run
-            # the command with `fork = False` is still necessary for
-            # testing for mount failures, for example attempting to
-            # mount a read-only repo.
-            kwargs['fork'] = True
-        self.cmd('mount', location, mountpoint, *options, **kwargs)
-        if kwargs.get('exit_code', EXIT_SUCCESS) == EXIT_ERROR:
-            # If argument `exit_code = EXIT_ERROR`, then this call
-            # is testing the behavior of an unsuccessful mount and
-            # we must not continue, as there is no mount to work
-            # with. The test itself has already failed or succeeded
-            # with the call to `self.cmd`, above.
-            yield
-            return
+        args = ['mount', location, mountpoint] + list(options)
+        if os_fork:
+            # Do not spawn, but actually (OS) fork.
+            if os.fork() == 0:
+                # The child process.
+                # Decouple from parent and fork again.
+                # Otherwise, it becomes a zombie and pretends to be alive.
+                os.setsid()
+                if os.fork() > 0:
+                    os._exit(0)
+                # The grandchild process.
+                try:
+                    self.cmd(*args, fork=False, **kwargs)  # borg mount not spawning.
+                finally:
+                    # This should never be reached, since it daemonizes,
+                    # and the grandchild process exits before cmd() returns.
+                    # However, just in case...
+                    print('Fatal: borg mount did not daemonize properly. Force exiting.',
+                          file=sys.stderr, flush=True)
+                    os._exit(0)
+        else:
+            self.cmd(*args, fork=fork, **kwargs)
+            if kwargs.get('exit_code', EXIT_SUCCESS) == EXIT_ERROR:
+                # If argument `exit_code = EXIT_ERROR`, then this call
+                # is testing the behavior of an unsuccessful mount and
+                # we must not continue, as there is no mount to work
+                # with. The test itself has already failed or succeeded
+                # with the call to `self.cmd`, above.
+                yield
+                return
         self.wait_for_mountstate(mountpoint, mounted=True)
         yield
         umount(mountpoint)
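
Note on the os_fork=True branch above: it uses the classic double fork, where the first child calls os.setsid() to start a new session, forks again, and exits, so the long-lived grandchild is re-parented and cannot end up as a zombie child of the pytest process. The pattern in isolation, with an illustrative helper name:

import os

def detach():
    # Returns True in the fully detached grandchild, False in the caller.
    if os.fork() > 0:
        return False    # original process: carries on running the test
    os.setsid()         # new session, no controlling terminal
    if os.fork() > 0:
        os._exit(0)     # first child exits immediately...
    return True         # ...leaving the grandchild fully detached

Running `borg mount` with fork=False inside that grandchild is what keeps the test's monkey-patched Lock.migrate_lock in effect: a spawned subprocess would re-import borg and lose the patch.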

+ 92 - 0
src/borg/testsuite/archiver.py

@@ -2503,6 +2503,94 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         with self.fuse_mount(self.repository_location, mountpoint, '--prefix=nope'):
             assert sorted(os.listdir(os.path.join(mountpoint))) == []
 
+    @unittest.skipUnless(has_llfuse, 'llfuse not installed')
+    def test_migrate_lock_alive(self):
+        """Both old_id and new_id must not be stale during lock migration / daemonization."""
+        from functools import wraps
+        import pickle
+        import traceback
+
+        # Check results are communicated from the borg mount background process
+        # to the pytest process by means of a serialized dict object stored in this file.
+        assert_data_file = os.path.join(self.tmpdir, 'migrate_lock_assert_data.pickle')
+
+        # Decorates Lock.migrate_lock() with process_alive() checks before and after.
+        # (We don't want to mix testing code into runtime.)
+        def write_assert_data(migrate_lock):
+            @wraps(migrate_lock)
+            def wrapper(self, old_id, new_id):
+                wrapper.num_calls += 1
+                assert_data = {
+                    'num_calls': wrapper.num_calls,
+                    'old_id': old_id,
+                    'new_id': new_id,
+                    'before': {
+                        'old_id_alive': platform.process_alive(*old_id),
+                        'new_id_alive': platform.process_alive(*new_id)},
+                    'exception': None,
+                    'exception.extr_tb': None,
+                    'after': {
+                        'old_id_alive': None,
+                        'new_id_alive': None}}
+                try:
+                    with open(assert_data_file, 'wb') as _out:
+                        pickle.dump(assert_data, _out)
+                except:
+                    pass
+                try:
+                    return migrate_lock(self, old_id, new_id)
+                except BaseException as e:
+                    assert_data['exception'] = e
+                    assert_data['exception.extr_tb'] = traceback.extract_tb(e.__traceback__)
+                finally:
+                    assert_data['after'].update({
+                        'old_id_alive': platform.process_alive(*old_id),
+                        'new_id_alive': platform.process_alive(*new_id)})
+                    try:
+                        with open(assert_data_file, 'wb') as _out:
+                            pickle.dump(assert_data, _out)
+                    except:
+                        pass
+            wrapper.num_calls = 0
+            return wrapper
+
+        # Decorate
+        borg.locking.Lock.migrate_lock = write_assert_data(borg.locking.Lock.migrate_lock)
+        try:
+            self.cmd('init', '--encryption=none', self.repository_location)
+            self.create_src_archive('arch')
+            mountpoint = os.path.join(self.tmpdir, 'mountpoint')
+            # In order that the decoration is kept for the borg mount process, we must not spawn, but actually fork;
+            # not to be confused with the forking in borg.helpers.daemonize() which is done as well.
+            with self.fuse_mount(self.repository_location, mountpoint, os_fork=True):
+                pass
+            with open(assert_data_file, 'rb') as _in:
+                assert_data = pickle.load(_in)
+            print('\nLock.migrate_lock(): assert_data = %r.' % (assert_data, ), file=sys.stderr, flush=True)
+            exception = assert_data['exception']
+            if exception is not None:
+                extracted_tb = assert_data['exception.extr_tb']
+                print(
+                    'Lock.migrate_lock() raised an exception:\n',
+                    'Traceback (most recent call last):\n',
+                    *traceback.format_list(extracted_tb),
+                    *traceback.format_exception(exception.__class__, exception, None),
+                    sep='', end='', file=sys.stderr, flush=True)
+
+            assert assert_data['num_calls'] == 1, "Lock.migrate_lock() must be called exactly once."
+            assert exception is None, "Lock.migrate_lock() may not raise an exception."
+
+            assert_data_before = assert_data['before']
+            assert assert_data_before['old_id_alive'], "old_id must be alive (=must not be stale) when calling Lock.migrate_lock()."
+            assert assert_data_before['new_id_alive'], "new_id must be alive (=must not be stale) when calling Lock.migrate_lock()."
+
+            assert_data_after = assert_data['after']
+            assert assert_data_after['old_id_alive'], "old_id must be alive (=must not be stale) when Lock.migrate_lock() has returned."
+            assert assert_data_after['new_id_alive'], "new_id must be alive (=must not be stale) when Lock.migrate_lock() has returned."
+        finally:
+            # Undecorate
+            borg.locking.Lock.migrate_lock = borg.locking.Lock.migrate_lock.__wrapped__
+
     def verify_aes_counter_uniqueness(self, method):
         seen = set()  # Chunks already seen
         used = set()  # counter values already used
@@ -3570,6 +3658,10 @@ class RemoteArchiverTestCase(ArchiverTestCase):
     def test_config(self):
         pass
 
+    @unittest.skip('only works locally')
+    def test_migrate_lock_alive(self):
+        pass
+
     def test_strip_components_doesnt_leak(self):
         self.cmd('init', '--encryption=repokey', self.repository_location)
         self.create_regular_file('dir/file', contents=b"test file contents 1")
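
Two stdlib details make the instrument-then-restore dance in test_migrate_lock_alive work: functools.wraps() stores the original function on the wrapper as __wrapped__, which is what the "Undecorate" step in the finally block relies on, and attributes set on the wrapper (num_calls) are plain function attributes that survive the os.fork() into the mount process. A minimal sketch of the same pattern, with illustrative names:

import functools

def counting(func):
    @functools.wraps(func)      # also sets wrapper.__wrapped__ = func
    def wrapper(*args, **kwargs):
        wrapper.num_calls += 1
        return func(*args, **kwargs)
    wrapper.num_calls = 0
    return wrapper

class Repo:
    def migrate_lock(self, old_id, new_id):
        return old_id, new_id

Repo.migrate_lock = counting(Repo.migrate_lock)        # decorate
try:
    Repo().migrate_lock((0, 1, 0), (0, 2, 0))
    assert Repo.migrate_lock.num_calls == 1
finally:
    Repo.migrate_lock = Repo.migrate_lock.__wrapped__  # undecorate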