Pārlūkot izejas kodu

Merge pull request #1674 from enkore/f/stale-lock-murderer

Automatically remove stale locks
enkore 8 gadi atpakaļ
vecāks
revīzija
cf44954c34

+ 4 - 1
src/borg/cache.py

@@ -75,6 +75,9 @@ class Cache:
         self.key = key
         self.manifest = manifest
         self.path = path or os.path.join(get_cache_dir(), repository.id_str)
+        self.hostname_is_unique = yes(env_var_override='BORG_HOSTNAME_IS_UNIQUE', prompt=False, env_msg=None)
+        if self.hostname_is_unique:
+            logger.info('Enabled removal of stale cache locks')
         self.do_files = do_files
         # Warn user before sending data to a never seen before unencrypted repository
         if not os.path.exists(self.path):
@@ -202,7 +205,7 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
     def open(self, lock_wait=None):
         if not os.path.isdir(self.path):
             raise Exception('%s Does not look like a Borg cache' % self.path)
-        self.lock = Lock(os.path.join(self.path, 'lock'), exclusive=True, timeout=lock_wait).acquire()
+        self.lock = Lock(os.path.join(self.path, 'lock'), exclusive=True, timeout=lock_wait, kill_stale_locks=self.hostname_is_unique).acquire()
         self.rollback()
 
     def close(self):

+ 4 - 2
src/borg/helpers.py

@@ -94,7 +94,7 @@ def check_extension_modules():
         raise ExtensionModuleError
     if crypto.API_VERSION != 3:
         raise ExtensionModuleError
-    if platform.API_VERSION != 3:
+    if platform.API_VERSION != platform.OS_API_VERSION != 4:
         raise ExtensionModuleError
 
 
@@ -1116,7 +1116,7 @@ DEFAULTISH = ('Default', 'DEFAULT', 'default', 'D', 'd', '', )
 def yes(msg=None, false_msg=None, true_msg=None, default_msg=None,
         retry_msg=None, invalid_msg=None, env_msg='{} (from {})',
         falsish=FALSISH, truish=TRUISH, defaultish=DEFAULTISH,
-        default=False, retry=True, env_var_override=None, ofile=None, input=input):
+        default=False, retry=True, env_var_override=None, ofile=None, input=input, prompt=True):
     """Output <msg> (usually a question) and let user input an answer.
     Qualifies the answer according to falsish, truish and defaultish as True, False or <default>.
     If it didn't qualify and retry is False (no retries wanted), return the default [which
@@ -1161,6 +1161,8 @@ def yes(msg=None, false_msg=None, true_msg=None, default_msg=None,
             if answer is not None and env_msg:
                 print(env_msg.format(answer, env_var_override), file=ofile)
         if answer is None:
+            if not prompt:
+                return default
             try:
                 answer = input()
             except EOFError:

+ 73 - 21
src/borg/locking.py

@@ -1,24 +1,15 @@
 import json
 import os
-import socket
 import time
 
+from . import platform
 from .helpers import Error, ErrorWithTraceback
+from .logger import create_logger
 
 ADD, REMOVE = 'add', 'remove'
 SHARED, EXCLUSIVE = 'shared', 'exclusive'
 
-# only determine the PID and hostname once.
-# for FUSE mounts, we fork a child process that needs to release
-# the lock made by the parent, so it needs to use the same PID for that.
-_pid = os.getpid()
-_hostname = socket.gethostname()
-
-
-def get_id():
-    """Get identification tuple for 'us'"""
-    thread_id = 0
-    return _hostname, _pid, thread_id
+logger = create_logger(__name__)
 
 
 class TimeoutTimer:
@@ -109,12 +100,14 @@ class ExclusiveLock:
     This makes sure the lock is released again if the block is left, no
     matter how (e.g. if an exception occurred).
     """
-    def __init__(self, path, timeout=None, sleep=None, id=None):
+    def __init__(self, path, timeout=None, sleep=None, id=None, kill_stale_locks=False):
         self.timeout = timeout
         self.sleep = sleep
         self.path = os.path.abspath(path)
-        self.id = id or get_id()
+        self.id = id or platform.get_process_id()
         self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id)
+        self.kill_stale_locks = kill_stale_locks
+        self.stale_warning_printed = False
 
     def __enter__(self):
         return self.acquire()
@@ -137,6 +130,7 @@ class ExclusiveLock:
             except FileExistsError:  # already locked
                 if self.by_me():
                     return self
+                self.kill_stale_lock()
                 if timer.timed_out_or_sleep():
                     raise LockTimeout(self.path)
             except OSError as err:
@@ -160,6 +154,48 @@ class ExclusiveLock:
     def by_me(self):
         return os.path.exists(self.unique_name)
 
+    def kill_stale_lock(self):
+        for name in os.listdir(self.path):
+            try:
+                host_pid, thread_str = name.rsplit('-', 1)
+                host, pid_str = host_pid.rsplit('.', 1)
+                pid = int(pid_str)
+                thread = int(thread_str)
+            except ValueError:
+                # Malformed lock name? Or just some new format we don't understand?
+                # It's safer to just exit.
+                return False
+
+            if platform.process_alive(host, pid, thread):
+                return False
+
+            if not self.kill_stale_locks:
+                if not self.stale_warning_printed:
+                    # Log this at warning level to hint the user at the ability
+                    logger.warning("Found stale lock %s, but not deleting because BORG_HOSTNAME_IS_UNIQUE is not set.", name)
+                    self.stale_warning_printed = True
+                return False
+
+            try:
+                os.unlink(os.path.join(self.path, name))
+                logger.warning('Killed stale lock %s.', name)
+            except OSError as err:
+                if not self.stale_warning_printed:
+                    # This error will bubble up and likely result in locking failure
+                    logger.error('Found stale lock %s, but cannot delete due to %s', name, str(err))
+                    self.stale_warning_printed = True
+                return False
+
+        try:
+            os.rmdir(self.path)
+        except OSError:
+            # Directory is not empty = we lost the race to somebody else
+            # Permission denied = we cannot operate anyway
+            # other error like EIO = we cannot operate and it's unsafe too.
+            return False
+
+        return True
+
     def break_lock(self):
         if self.is_locked():
             for name in os.listdir(self.path):
@@ -174,14 +210,30 @@ class LockRoster:
     Note: you usually should call the methods with an exclusive lock held,
     to avoid conflicting access by multiple threads/processes/machines.
     """
-    def __init__(self, path, id=None):
+    def __init__(self, path, id=None, kill_stale_locks=False):
         self.path = path
-        self.id = id or get_id()
+        self.id = id or platform.get_process_id()
+        self.kill_stale_locks = kill_stale_locks
 
     def load(self):
         try:
             with open(self.path) as f:
                 data = json.load(f)
+
+            # Just nuke the stale locks early on load
+            if self.kill_stale_locks:
+                for key in (SHARED, EXCLUSIVE):
+                    try:
+                        entries = data[key]
+                    except KeyError:
+                        continue
+                    elements = set()
+                    for host, pid, thread in entries:
+                        if platform.process_alive(host, pid, thread):
+                            elements.add((host, pid, thread))
+                        else:
+                            logger.warning('Removed stale %s roster lock for pid %d.', key, pid)
+                    data[key] = list(elements)
         except (FileNotFoundError, ValueError):
             # no or corrupt/empty roster file?
             data = {}
@@ -235,18 +287,18 @@ class Lock:
     This makes sure the lock is released again if the block is left, no
     matter how (e.g. if an exception occurred).
     """
-    def __init__(self, path, exclusive=False, sleep=None, timeout=None, id=None):
+    def __init__(self, path, exclusive=False, sleep=None, timeout=None, id=None, kill_stale_locks=False):
         self.path = path
         self.is_exclusive = exclusive
         self.sleep = sleep
         self.timeout = timeout
-        self.id = id or get_id()
+        self.id = id or platform.get_process_id()
         # globally keeping track of shared and exclusive lockers:
-        self._roster = LockRoster(path + '.roster', id=id)
+        self._roster = LockRoster(path + '.roster', id=id, kill_stale_locks=kill_stale_locks)
         # an exclusive lock, used for:
         # - holding while doing roster queries / updates
-        # - holding while the Lock instance itself is exclusive
-        self._lock = ExclusiveLock(path + '.exclusive', id=id, timeout=timeout)
+        # - holding while the Lock itself is exclusive
+        self._lock = ExclusiveLock(path + '.exclusive', id=id, timeout=timeout, kill_stale_locks=kill_stale_locks)
 
     def __enter__(self):
         return self.acquire()

+ 9 - 3
src/borg/platform/__init__.py

@@ -10,15 +10,21 @@ from .base import acl_get, acl_set
 from .base import set_flags, get_flags
 from .base import SaveFile, SyncFile, sync_dir, fdatasync
 from .base import swidth, API_VERSION
+from .posix import process_alive, get_process_id, local_pid_alive
 
+
+OS_API_VERSION = API_VERSION
 if sys.platform.startswith('linux'):  # pragma: linux only
+    from .linux import API_VERSION as OS_API_VERSION
     from .linux import acl_get, acl_set
     from .linux import set_flags, get_flags
     from .linux import SyncFile
-    from .linux import swidth, API_VERSION
+    from .linux import swidth
 elif sys.platform.startswith('freebsd'):  # pragma: freebsd only
+    from .freebsd import API_VERSION as OS_API_VERSION
     from .freebsd import acl_get, acl_set
-    from .freebsd import swidth, API_VERSION
+    from .freebsd import swidth
 elif sys.platform == 'darwin':  # pragma: darwin only
+    from .darwin import API_VERSION as OS_API_VERSION
     from .darwin import acl_get, acl_set
-    from .darwin import swidth, API_VERSION
+    from .darwin import swidth

+ 1 - 1
src/borg/platform/base.py

@@ -13,7 +13,7 @@ platform API: that way platform APIs provided by the platform-specific support m
 are correctly composed into the base functionality.
 """
 
-API_VERSION = 3
+API_VERSION = 4
 
 fdatasync = getattr(os, 'fdatasync', os.fsync)
 

+ 1 - 1
src/borg/platform/darwin.pyx

@@ -4,7 +4,7 @@ from ..helpers import user2uid, group2gid
 from ..helpers import safe_decode, safe_encode
 from .posix import swidth
 
-API_VERSION = 3
+API_VERSION = 4
 
 cdef extern from "sys/acl.h":
     ctypedef struct _acl_t:

+ 1 - 1
src/borg/platform/freebsd.pyx

@@ -4,7 +4,7 @@ from ..helpers import posix_acl_use_stored_uid_gid
 from ..helpers import safe_encode, safe_decode
 from .posix import swidth
 
-API_VERSION = 3
+API_VERSION = 4
 
 cdef extern from "errno.h":
     int errno

+ 1 - 1
src/borg/platform/linux.pyx

@@ -12,7 +12,7 @@ from .posix import swidth
 from libc cimport errno
 from libc.stdint cimport int64_t
 
-API_VERSION = 3
+API_VERSION = 4
 
 cdef extern from "sys/types.h":
     int ACL_TYPE_ACCESS

+ 61 - 0
src/borg/platform/posix.pyx

@@ -1,3 +1,8 @@
+
+import errno
+import os
+import socket
+
 cdef extern from "wchar.h":
     cdef int wcswidth(const Py_UNICODE *str, size_t n)
 
@@ -8,3 +13,59 @@ def swidth(s):
         return terminal_width
     else:
         return str_len
+
+
+# only determine the PID and hostname once.
+# for FUSE mounts, we fork a child process that needs to release
+# the lock made by the parent, so it needs to use the same PID for that.
+_pid = os.getpid()
+# XXX this sometimes requires live internet access for issuing a DNS query in the background.
+_hostname = socket.gethostname()
+
+
+def get_process_id():
+    """
+    Return identification tuple (hostname, pid, thread_id) for 'us'. If this is a FUSE process, then the PID will be
+    that of the parent, not the forked FUSE child.
+
+    Note: Currently thread_id is *always* zero.
+    """
+    thread_id = 0
+    return _hostname, _pid, thread_id
+
+
+def process_alive(host, pid, thread):
+    """
+    Check if the (host, pid, thread_id) combination corresponds to a potentially alive process.
+
+    If the process is local, then this will be accurate. If the process is not local, then this
+    returns always True, since there is no real way to check.
+    """
+    from . import local_pid_alive
+
+    if host != _hostname:
+        return True
+
+    if thread != 0:
+        # Currently thread is always 0, if we ever decide to set this to a non-zero value,
+        # this code needs to be revisited, too, to do a sensible thing
+        return True
+
+    return local_pid_alive(pid)
+
+
+def local_pid_alive(pid):
+    """Return whether *pid* is alive."""
+    try:
+        # This doesn't work on Windows.
+        # This does not kill anything, 0 means "see if we can send a signal to this process or not".
+        # Possible errors: No such process (== stale lock) or permission denied (not a stale lock).
+        # If the exception is not raised that means such a pid is valid and we can send a signal to it.
+        os.kill(pid, 0)
+        return True
+    except OSError as err:
+        if err.errno == errno.ESRCH:
+            # ESRCH = no such process
+            return False
+        # Any other error (eg. permissions) means that the process ID refers to a live process.
+        return True

+ 6 - 2
src/borg/remote.py

@@ -18,6 +18,7 @@ from .helpers import get_home_dir
 from .helpers import sysinfo
 from .helpers import bin_to_hex
 from .helpers import replace_placeholders
+from .helpers import yes
 from .repository import Repository
 
 RPC_PROTOCOL_VERSION = 2
@@ -326,12 +327,15 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
                 opts.append('--critical')
             else:
                 raise ValueError('log level missing, fix this code')
+        env_vars = []
+        if yes(env_var_override='BORG_HOSTNAME_IS_UNIQUE', env_msg=None, prompt=False):
+            env_vars.append('BORG_HOSTNAME_IS_UNIQUE=yes')
         if testing:
-            return [sys.executable, '-m', 'borg.archiver', 'serve'] + opts + self.extra_test_args
+            return env_vars + [sys.executable, '-m', 'borg.archiver', 'serve'] + opts + self.extra_test_args
         else:  # pragma: no cover
             remote_path = args.remote_path or os.environ.get('BORG_REMOTE_PATH', 'borg')
             remote_path = replace_placeholders(remote_path)
-            return [remote_path, 'serve'] + opts
+            return env_vars + [remote_path, 'serve'] + opts
 
     def ssh_cmd(self, location):
         """return a ssh command line that can be prefixed to a borg command line"""

+ 5 - 1
src/borg/repository.py

@@ -21,6 +21,7 @@ from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size
 from .helpers import Location
 from .helpers import ProgressIndicatorPercent
 from .helpers import bin_to_hex
+from .helpers import yes
 from .locking import Lock, LockError, LockErrorT
 from .logger import create_logger
 from .lrucache import LRUCache
@@ -121,6 +122,9 @@ class Repository:
         self.do_create = create
         self.exclusive = exclusive
         self.append_only = append_only
+        self.hostname_is_unique = yes(env_var_override='BORG_HOSTNAME_IS_UNIQUE', env_msg=None, prompt=False)
+        if self.hostname_is_unique:
+            logger.info('Enabled removal of stale repository locks')
 
     def __del__(self):
         if self.lock:
@@ -254,7 +258,7 @@ class Repository:
         if not os.path.isdir(path):
             raise self.DoesNotExist(path)
         if lock:
-            self.lock = Lock(os.path.join(path, 'lock'), exclusive, timeout=lock_wait).acquire()
+            self.lock = Lock(os.path.join(path, 'lock'), exclusive, timeout=lock_wait, kill_stale_locks=self.hostname_is_unique).acquire()
         else:
             self.lock = None
         self.config = ConfigParser(interpolation=None)

+ 73 - 10
src/borg/testsuite/locking.py

@@ -1,22 +1,25 @@
+import random
 import time
 
 import pytest
 
-from ..locking import get_id, TimeoutTimer, ExclusiveLock, Lock, LockRoster, \
-                      ADD, REMOVE, SHARED, EXCLUSIVE, LockTimeout
-
+from ..platform import get_process_id, process_alive
+from ..locking import TimeoutTimer, ExclusiveLock, Lock, LockRoster, \
+                      ADD, REMOVE, SHARED, EXCLUSIVE, LockTimeout, NotLocked, NotMyLock
 
 ID1 = "foo", 1, 1
 ID2 = "bar", 2, 2
 
 
-def test_id():
-    hostname, pid, tid = get_id()
-    assert isinstance(hostname, str)
-    assert isinstance(pid, int)
-    assert isinstance(tid, int)
-    assert len(hostname) > 0
-    assert pid > 0
+@pytest.fixture()
+def free_pid():
+    """Return a free PID not used by any process (naturally this is racy)"""
+    host, pid, tid = get_process_id()
+    while True:
+        # PIDs are often restricted to a small range. On Linux the range >32k is by default not used.
+        pid = random.randint(33000, 65000)
+        if not process_alive(host, pid, tid):
+            return pid
 
 
 class TestTimeoutTimer:
@@ -57,6 +60,22 @@ class TestExclusiveLock:
             with pytest.raises(LockTimeout):
                 ExclusiveLock(lockpath, id=ID2, timeout=0.1).acquire()
 
+    def test_kill_stale(self, lockpath, free_pid):
+        host, pid, tid = our_id = get_process_id()
+        dead_id = host, free_pid, tid
+        cant_know_if_dead_id = 'foo.bar.example.net', 1, 2
+
+        dead_lock = ExclusiveLock(lockpath, id=dead_id).acquire()
+        with ExclusiveLock(lockpath, id=our_id, kill_stale_locks=True):
+            with pytest.raises(NotMyLock):
+                dead_lock.release()
+        with pytest.raises(NotLocked):
+            dead_lock.release()
+
+        with ExclusiveLock(lockpath, id=cant_know_if_dead_id):
+            with pytest.raises(LockTimeout):
+                ExclusiveLock(lockpath, id=our_id, kill_stale_locks=True, timeout=0.1).acquire()
+
 
 class TestLock:
     def test_shared(self, lockpath):
@@ -117,6 +136,25 @@ class TestLock:
             with pytest.raises(LockTimeout):
                 Lock(lockpath, exclusive=True, id=ID2, timeout=0.1).acquire()
 
+    def test_kill_stale(self, lockpath, free_pid):
+        host, pid, tid = our_id = get_process_id()
+        dead_id = host, free_pid, tid
+        cant_know_if_dead_id = 'foo.bar.example.net', 1, 2
+
+        dead_lock = Lock(lockpath, id=dead_id, exclusive=True).acquire()
+        roster = dead_lock._roster
+        with Lock(lockpath, id=our_id, kill_stale_locks=True):
+            assert roster.get(EXCLUSIVE) == set()
+            assert roster.get(SHARED) == {our_id}
+        assert roster.get(EXCLUSIVE) == set()
+        assert roster.get(SHARED) == set()
+        with pytest.raises(KeyError):
+            dead_lock.release()
+
+        with Lock(lockpath, id=cant_know_if_dead_id, exclusive=True):
+            with pytest.raises(LockTimeout):
+                Lock(lockpath, id=our_id, kill_stale_locks=True, timeout=0.1).acquire()
+
 
 @pytest.fixture()
 def rosterpath(tmpdir):
@@ -144,3 +182,28 @@ class TestLockRoster:
         roster2 = LockRoster(rosterpath, id=ID2)
         roster2.modify(SHARED, REMOVE)
         assert roster2.get(SHARED) == set()
+
+    def test_kill_stale(self, rosterpath, free_pid):
+        host, pid, tid = our_id = get_process_id()
+        dead_id = host, free_pid, tid
+
+        roster1 = LockRoster(rosterpath, id=dead_id)
+        assert roster1.get(SHARED) == set()
+        roster1.modify(SHARED, ADD)
+        assert roster1.get(SHARED) == {dead_id}
+
+        cant_know_if_dead_id = 'foo.bar.example.net', 1, 2
+        roster1 = LockRoster(rosterpath, id=cant_know_if_dead_id)
+        assert roster1.get(SHARED) == {dead_id}
+        roster1.modify(SHARED, ADD)
+        assert roster1.get(SHARED) == {dead_id, cant_know_if_dead_id}
+
+        killer_roster = LockRoster(rosterpath, kill_stale_locks=True)
+        # Did kill the dead processes lock (which was alive ... I guess?!)
+        assert killer_roster.get(SHARED) == {cant_know_if_dead_id}
+        killer_roster.modify(SHARED, ADD)
+        assert killer_roster.get(SHARED) == {our_id, cant_know_if_dead_id}
+
+        other_killer_roster = LockRoster(rosterpath, kill_stale_locks=True)
+        # Did not kill us, since we're alive
+        assert other_killer_roster.get(SHARED) == {our_id, cant_know_if_dead_id}

+ 22 - 0
src/borg/testsuite/platform.py

@@ -1,5 +1,6 @@
 import functools
 import os
+import random
 import shutil
 import sys
 import tempfile
@@ -7,7 +8,9 @@ import pwd
 import unittest
 
 from ..platform import acl_get, acl_set, swidth
+from ..platform import get_process_id, process_alive
 from . import BaseTestCase, unopened_tempfile
+from .locking import free_pid
 
 
 ACCESS_ACL = """
@@ -186,3 +189,22 @@ class PlatformPosixTestCase(BaseTestCase):
 
     def test_swidth_mixed(self):
         self.assert_equal(swidth("borgバックアップ"), 4 + 6 * 2)
+
+
+def test_process_alive(free_pid):
+    id = get_process_id()
+    assert process_alive(*id)
+    host, pid, tid = id
+    assert process_alive(host + 'abc', pid, tid)
+    assert process_alive(host, pid, tid + 1)
+    assert not process_alive(host, free_pid, tid)
+
+
+def test_process_id():
+    hostname, pid, tid = get_process_id()
+    assert isinstance(hostname, str)
+    assert isinstance(pid, int)
+    assert isinstance(tid, int)
+    assert len(hostname) > 0
+    assert pid > 0
+    assert get_process_id() == (hostname, pid, tid)