浏览代码

Detect and delete stale locks when it's safe

If BORG_UNIQUE_HOSTNAME shell variable is set, stale locks
in both cache and repository are deleted.

Stale lock is defined as a lock that's originating from the same
hostname as us, and correspond to a pid that no longer exists.

This fixes #562
Oleg Drokin 9 年之前
父节点
当前提交
d490292be3
共有 3 个文件被更改,包括 100 次插入8 次删除
  1. 2 1
      src/borg/cache.py
  2. 96 6
      src/borg/locking.py
  3. 2 1
      src/borg/repository.py

+ 2 - 1
src/borg/cache.py

@@ -75,6 +75,7 @@ class Cache:
         self.key = key
         self.key = key
         self.manifest = manifest
         self.manifest = manifest
         self.path = path or os.path.join(get_cache_dir(), repository.id_str)
         self.path = path or os.path.join(get_cache_dir(), repository.id_str)
+        self.unique_hostname = bool(os.environ.get('BORG_UNIQUE_HOSTNAME'))
         self.do_files = do_files
         self.do_files = do_files
         # Warn user before sending data to a never seen before unencrypted repository
         # Warn user before sending data to a never seen before unencrypted repository
         if not os.path.exists(self.path):
         if not os.path.exists(self.path):
@@ -202,7 +203,7 @@ Chunk index:    {0.total_unique_chunks:20d} {0.total_chunks:20d}"""
     def open(self, lock_wait=None):
     def open(self, lock_wait=None):
         if not os.path.isdir(self.path):
         if not os.path.isdir(self.path):
             raise Exception('%s Does not look like a Borg cache' % self.path)
             raise Exception('%s Does not look like a Borg cache' % self.path)
-        self.lock = Lock(os.path.join(self.path, 'lock'), exclusive=True, timeout=lock_wait).acquire()
+        self.lock = Lock(os.path.join(self.path, 'lock'), exclusive=True, timeout=lock_wait, kill_stale_locks=self.unique_hostname).acquire()
         self.rollback()
         self.rollback()
 
 
     def close(self):
     def close(self):

+ 96 - 6
src/borg/locking.py

@@ -1,6 +1,8 @@
+import errno
 import json
 import json
 import os
 import os
 import socket
 import socket
+import sys
 import time
 import time
 
 
 from .helpers import Error, ErrorWithTraceback
 from .helpers import Error, ErrorWithTraceback
@@ -17,10 +19,36 @@ _hostname = socket.gethostname()
 
 
 def get_id():
 def get_id():
     """Get identification tuple for 'us'"""
     """Get identification tuple for 'us'"""
+
+    # If changing the thread_id to ever be non-zero, also revisit the check_lock_stale() below.
     thread_id = 0
     thread_id = 0
     return _hostname, _pid, thread_id
     return _hostname, _pid, thread_id
 
 
 
 
+def check_lock_stale(host, pid, thread):
+    """Check if the host, pid, thread combination corresponds to a dead process on our local node or not."""
+    if host != _hostname:
+        return False
+
+    if thread != 0:
+        # Currently thread is always 0, if we ever decide to set this to a non-zero value, this code needs to be revisited too to do a sensible thing
+        return False
+
+    try:
+        # This may not work in Windows.
+        # This does not kill anything, 0 means "see if we can send a signal to this process or not".
+        # Possible errors: No such process (== stale lock) or permission denied (not a stale lock)
+        # If the exception is not raised that means such a pid is valid and we can send a signal to it (== not a stale lock too).
+        os.kill(pid, 0)
+        return False
+    except OSError as err:
+        if err.errno != errno.ESRCH:
+            return False
+        pass
+
+    return True
+
+
 class TimeoutTimer:
 class TimeoutTimer:
     """
     """
     A timer for timeout checks (can also deal with no timeout, give timeout=None [default]).
     A timer for timeout checks (can also deal with no timeout, give timeout=None [default]).
@@ -109,12 +137,14 @@ class ExclusiveLock:
     This makes sure the lock is released again if the block is left, no
     This makes sure the lock is released again if the block is left, no
     matter how (e.g. if an exception occurred).
     matter how (e.g. if an exception occurred).
     """
     """
-    def __init__(self, path, timeout=None, sleep=None, id=None):
+    def __init__(self, path, timeout=None, sleep=None, id=None, kill_stale_locks=False):
         self.timeout = timeout
         self.timeout = timeout
         self.sleep = sleep
         self.sleep = sleep
         self.path = os.path.abspath(path)
         self.path = os.path.abspath(path)
         self.id = id or get_id()
         self.id = id or get_id()
         self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id)
         self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id)
+        self.ok_to_kill_stale_locks = kill_stale_locks
+        self.stale_warning_printed = False
 
 
     def __enter__(self):
     def __enter__(self):
         return self.acquire()
         return self.acquire()
@@ -137,6 +167,8 @@ class ExclusiveLock:
             except FileExistsError:  # already locked
             except FileExistsError:  # already locked
                 if self.by_me():
                 if self.by_me():
                     return self
                     return self
+                if self.kill_stale_lock():
+                    pass
                 if timer.timed_out_or_sleep():
                 if timer.timed_out_or_sleep():
                     raise LockTimeout(self.path)
                     raise LockTimeout(self.path)
             except OSError as err:
             except OSError as err:
@@ -160,6 +192,47 @@ class ExclusiveLock:
     def by_me(self):
     def by_me(self):
         return os.path.exists(self.unique_name)
         return os.path.exists(self.unique_name)
 
 
+    def kill_stale_lock(self):
+        for name in os.listdir(self.path):
+
+            try:
+                host_pid, thread_str = name.rsplit('-', 1)
+                host, pid_str = host_pid.rsplit('.', 1)
+                pid = int(pid_str)
+                thread = int(thread_str)
+            except ValueError:
+                # Malformed lock name? Or just some new format we don't understand?
+                # It's safer to just exit
+                return False
+
+            if not check_lock_stale(host, pid, thread):
+                return False
+
+            if not self.ok_to_kill_stale_locks:
+                if not self.stale_warning_printed:
+                    print(("Found stale lock %s, but not deleting because BORG_UNIQUE_HOSTNAME is not set." % name), file=sys.stderr)
+                    self.stale_warning_printed = True
+                return False
+
+            try:
+                os.unlink(os.path.join(self.path, name))
+                print(("Killed stale lock %s." % name), file=sys.stderr)
+            except OSError as err:
+                if not self.stale_warning_printed:
+                    print(("Found stale lock %s, but cannot delete due to %s" % (name, str(err))), file=sys.stderr)
+                    self.stale_warning_printed = True
+                return False
+
+        try:
+            os.rmdir(self.path)
+        except OSError:
+            # Directory is not empty = we lost the race to somebody else
+            # Permission denied = we cannot operate anyway
+            # other error like EIO = we cannot operate and it's unsafe too.
+            return False
+
+        return True
+
     def break_lock(self):
     def break_lock(self):
         if self.is_locked():
         if self.is_locked():
             for name in os.listdir(self.path):
             for name in os.listdir(self.path):
@@ -174,17 +247,34 @@ class LockRoster:
     Note: you usually should call the methods with an exclusive lock held,
     Note: you usually should call the methods with an exclusive lock held,
     to avoid conflicting access by multiple threads/processes/machines.
     to avoid conflicting access by multiple threads/processes/machines.
     """
     """
-    def __init__(self, path, id=None):
+    def __init__(self, path, id=None, kill_stale_locks=False):
         self.path = path
         self.path = path
         self.id = id or get_id()
         self.id = id or get_id()
+        self.ok_to_kill_zombie_locks = kill_stale_locks
 
 
     def load(self):
     def load(self):
         try:
         try:
             with open(self.path) as f:
             with open(self.path) as f:
                 data = json.load(f)
                 data = json.load(f)
+
+            # Just nuke the stale locks early on load
+            if self.ok_to_kill_zombie_locks:
+                for key in (SHARED, EXCLUSIVE):
+                    elements = set()
+                    try:
+                        for e in data[key]:
+                            (host, pid, thread) = e
+                            if not check_lock_stale(host, pid, thread):
+                                elements.add(tuple(e))
+                            else:
+                                print(("Removed stale %s roster lock for pid %d." % (key, pid)), file=sys.stderr)
+                        data[key] = list(list(e) for e in elements)
+                    except KeyError:
+                        pass
         except (FileNotFoundError, ValueError):
         except (FileNotFoundError, ValueError):
             # no or corrupt/empty roster file?
             # no or corrupt/empty roster file?
             data = {}
             data = {}
+
         return data
         return data
 
 
     def save(self, data):
     def save(self, data):
@@ -235,18 +325,18 @@ class Lock:
     This makes sure the lock is released again if the block is left, no
     This makes sure the lock is released again if the block is left, no
     matter how (e.g. if an exception occurred).
     matter how (e.g. if an exception occurred).
     """
     """
-    def __init__(self, path, exclusive=False, sleep=None, timeout=None, id=None):
+    def __init__(self, path, exclusive=False, sleep=None, timeout=None, id=None, kill_stale_locks=False):
         self.path = path
         self.path = path
         self.is_exclusive = exclusive
         self.is_exclusive = exclusive
         self.sleep = sleep
         self.sleep = sleep
         self.timeout = timeout
         self.timeout = timeout
         self.id = id or get_id()
         self.id = id or get_id()
         # globally keeping track of shared and exclusive lockers:
         # globally keeping track of shared and exclusive lockers:
-        self._roster = LockRoster(path + '.roster', id=id)
+        self._roster = LockRoster(path + '.roster', id=id, kill_stale_locks=kill_stale_locks)
         # an exclusive lock, used for:
         # an exclusive lock, used for:
         # - holding while doing roster queries / updates
         # - holding while doing roster queries / updates
-        # - holding while the Lock instance itself is exclusive
-        self._lock = ExclusiveLock(path + '.exclusive', id=id, timeout=timeout)
+        # - holding while the Lock itself is exclusive
+        self._lock = ExclusiveLock(path + '.exclusive', id=id, timeout=timeout, kill_stale_locks=kill_stale_locks)
 
 
     def __enter__(self):
     def __enter__(self):
         return self.acquire()
         return self.acquire()

+ 2 - 1
src/borg/repository.py

@@ -121,6 +121,7 @@ class Repository:
         self.do_create = create
         self.do_create = create
         self.exclusive = exclusive
         self.exclusive = exclusive
         self.append_only = append_only
         self.append_only = append_only
+        self.unique_hostname = bool(os.environ.get('BORG_UNIQUE_HOSTNAME'))
 
 
     def __del__(self):
     def __del__(self):
         if self.lock:
         if self.lock:
@@ -254,7 +255,7 @@ class Repository:
         if not os.path.isdir(path):
         if not os.path.isdir(path):
             raise self.DoesNotExist(path)
             raise self.DoesNotExist(path)
         if lock:
         if lock:
-            self.lock = Lock(os.path.join(path, 'lock'), exclusive, timeout=lock_wait).acquire()
+            self.lock = Lock(os.path.join(path, 'lock'), exclusive, timeout=lock_wait, kill_stale_locks=self.unique_hostname).acquire()
         else:
         else:
             self.lock = None
             self.lock = None
         self.config = ConfigParser(interpolation=None)
         self.config = ConfigParser(interpolation=None)