locking.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. import json
  2. import os
  3. import socket
  4. import time
  5. from borg.helpers import Error, ErrorWithTraceback
# operations understood by LockRoster.modify()
ADD, REMOVE = 'add', 'remove'
# roster keys under which the two kinds of lockers are tracked
SHARED, EXCLUSIVE = 'shared', 'exclusive'

# only determine the PID and hostname once.
# for FUSE mounts, we fork a child process that needs to release
# the lock made by the parent, so it needs to use the same PID for that.
_pid = os.getpid()
_hostname = socket.gethostname()
  13. def get_id():
  14. """Get identification tuple for 'us'"""
  15. thread_id = 0
  16. return _hostname, _pid, thread_id
  17. class TimeoutTimer:
  18. """
  19. A timer for timeout checks (can also deal with no timeout, give timeout=None [default]).
  20. It can also compute and optionally execute a reasonable sleep time (e.g. to avoid
  21. polling too often or to support thread/process rescheduling).
  22. """
  23. def __init__(self, timeout=None, sleep=None):
  24. """
  25. Initialize a timer.
  26. :param timeout: time out interval [s] or None (no timeout)
  27. :param sleep: sleep interval [s] (>= 0: do sleep call, <0: don't call sleep)
  28. or None (autocompute: use 10% of timeout, or 1s for no timeout)
  29. """
  30. if timeout is not None and timeout < 0:
  31. raise ValueError("timeout must be >= 0")
  32. self.timeout_interval = timeout
  33. if sleep is None:
  34. if timeout is None:
  35. sleep = 1.0
  36. else:
  37. sleep = timeout / 10.0
  38. self.sleep_interval = sleep
  39. self.start_time = None
  40. self.end_time = None
  41. def __repr__(self):
  42. return "<%s: start=%r end=%r timeout=%r sleep=%r>" % (
  43. self.__class__.__name__, self.start_time, self.end_time,
  44. self.timeout_interval, self.sleep_interval)
  45. def start(self):
  46. self.start_time = time.time()
  47. if self.timeout_interval is not None:
  48. self.end_time = self.start_time + self.timeout_interval
  49. return self
  50. def sleep(self):
  51. if self.sleep_interval >= 0:
  52. time.sleep(self.sleep_interval)
  53. def timed_out(self):
  54. return self.end_time is not None and time.time() >= self.end_time
  55. def timed_out_or_sleep(self):
  56. if self.timed_out():
  57. return True
  58. else:
  59. self.sleep()
  60. return False
# NOTE(review): the docstrings of the classes below contain "{}" placeholders
# and look like message templates - presumably borg.helpers.Error formats them
# with the exception arguments. Confirm there before rewording any of them.

# Base class of the lock errors deriving from Error.
class LockError(Error):
    """Failed to acquire the lock {}."""


# Base class of the lock errors deriving from ErrorWithTraceback.
class LockErrorT(ErrorWithTraceback):
    """Failed to acquire the lock {}."""


# Raised (with the lock path) when waiting for a lock exceeded the timeout.
class LockTimeout(LockError):
    """Failed to create/acquire the lock {} (timeout)."""


# Raised (with the lock path and the OS error text) when creating the lock
# directory failed for a reason other than "already exists".
class LockFailed(LockErrorT):
    """Failed to create/acquire the lock {} ({})."""


# Raised (with the lock path) on release of a lock that does not exist.
class NotLocked(LockErrorT):
    """Failed to release the lock {} (was not locked)."""


# Raised (with the lock path) on release of a lock that is held by someone else.
class NotMyLock(LockErrorT):
    """Failed to release the lock {} (was/is locked, but not by me)."""
  73. class ExclusiveLock:
  74. """An exclusive Lock based on mkdir fs operation being atomic.
  75. If possible, try to use the contextmanager here like:
  76. with ExclusiveLock(...) as lock:
  77. ...
  78. This makes sure the lock is released again if the block is left, no
  79. matter how (e.g. if an exception occurred).
  80. """
  81. def __init__(self, path, timeout=None, sleep=None, id=None):
  82. self.timeout = timeout
  83. self.sleep = sleep
  84. self.path = os.path.abspath(path)
  85. self.id = id or get_id()
  86. self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id)
  87. def __enter__(self):
  88. return self.acquire()
  89. def __exit__(self, *exc):
  90. self.release()
  91. def __repr__(self):
  92. return "<%s: %r>" % (self.__class__.__name__, self.unique_name)
  93. def acquire(self, timeout=None, sleep=None):
  94. if timeout is None:
  95. timeout = self.timeout
  96. if sleep is None:
  97. sleep = self.sleep
  98. timer = TimeoutTimer(timeout, sleep).start()
  99. while True:
  100. try:
  101. os.mkdir(self.path)
  102. except FileExistsError: # already locked
  103. if self.by_me():
  104. return self
  105. if timer.timed_out_or_sleep():
  106. raise LockTimeout(self.path)
  107. except OSError as err:
  108. raise LockFailed(self.path, str(err)) from None
  109. else:
  110. with open(self.unique_name, "wb"):
  111. pass
  112. return self
  113. def release(self):
  114. if not self.is_locked():
  115. raise NotLocked(self.path)
  116. if not self.by_me():
  117. raise NotMyLock(self.path)
  118. os.unlink(self.unique_name)
  119. os.rmdir(self.path)
  120. def is_locked(self):
  121. return os.path.exists(self.path)
  122. def by_me(self):
  123. return os.path.exists(self.unique_name)
  124. def break_lock(self):
  125. if self.is_locked():
  126. for name in os.listdir(self.path):
  127. os.unlink(os.path.join(self.path, name))
  128. os.rmdir(self.path)
  129. class LockRoster:
  130. """
  131. A Lock Roster to track shared/exclusive lockers.
  132. Note: you usually should call the methods with an exclusive lock held,
  133. to avoid conflicting access by multiple threads/processes/machines.
  134. """
  135. def __init__(self, path, id=None):
  136. self.path = path
  137. self.id = id or get_id()
  138. def load(self):
  139. try:
  140. with open(self.path) as f:
  141. data = json.load(f)
  142. except (FileNotFoundError, ValueError):
  143. # no or corrupt/empty roster file?
  144. data = {}
  145. return data
  146. def save(self, data):
  147. with open(self.path, "w") as f:
  148. json.dump(data, f)
  149. def remove(self):
  150. try:
  151. os.unlink(self.path)
  152. except FileNotFoundError:
  153. pass
  154. def get(self, key):
  155. roster = self.load()
  156. return set(tuple(e) for e in roster.get(key, []))
  157. def modify(self, key, op):
  158. roster = self.load()
  159. try:
  160. elements = set(tuple(e) for e in roster[key])
  161. except KeyError:
  162. elements = set()
  163. if op == ADD:
  164. elements.add(self.id)
  165. elif op == REMOVE:
  166. elements.remove(self.id)
  167. else:
  168. raise ValueError('Unknown LockRoster op %r' % op)
  169. roster[key] = list(list(e) for e in elements)
  170. self.save(roster)
class UpgradableLock:
    """
    A Lock for a resource that can be accessed in a shared or exclusive way.

    Typically, write access to a resource needs an exclusive lock (1 writer,
    no one is allowed reading) and read access to a resource needs a shared
    lock (multiple readers are allowed).

    If possible, try to use the contextmanager here like:

        with UpgradableLock(...) as lock:
            ...

    This makes sure the lock is released again if the block is left, no
    matter how (e.g. if an exception occurred).
    """

    def __init__(self, path, exclusive=False, sleep=None, timeout=None, id=None):
        """
        :param path: base path; the roster is kept at path + '.roster' and the
                     internal exclusive lock at path + '.exclusive'
        :param exclusive: default lock mode used by acquire()
        :param sleep: default polling interval [s] used by acquire()
        :param timeout: timeout [s] (None: no timeout) for the internal
                        exclusive lock and for waiting until readers are gone
        :param id: identity of the locker, defaults to get_id()
        """
        self.path = path
        self.is_exclusive = exclusive
        self.sleep = sleep
        self.timeout = timeout
        self.id = id or get_id()
        # globally keeping track of shared and exclusive lockers:
        self._roster = LockRoster(path + '.roster', id=id)
        # an exclusive lock, used for:
        # - holding while doing roster queries / updates
        # - holding while the UpgradableLock itself is exclusive
        self._lock = ExclusiveLock(path + '.exclusive', id=id, timeout=timeout)

    def __enter__(self):
        return self.acquire()

    def __exit__(self, *exc):
        self.release()

    def __repr__(self):
        return "<%s: %r>" % (self.__class__.__name__, self.id)

    def acquire(self, exclusive=None, remove=None, sleep=None):
        """
        Acquire the lock in shared or exclusive mode and return self.

        :param exclusive: True / False, or None to use the current mode
        :param remove: roster key (SHARED / EXCLUSIVE) our id shall be removed
                       from while switching modes, see upgrade() / downgrade()
        :param sleep: polling interval [s]; a falsy value (None, 0) falls back
                      to self.sleep and then to 0.2
        """
        if exclusive is None:
            exclusive = self.is_exclusive
        sleep = sleep or self.sleep or 0.2
        if exclusive:
            # on return, the internal exclusive lock is (still) held by us.
            self._wait_for_readers_finishing(remove, sleep)
            self._roster.modify(EXCLUSIVE, ADD)
        else:
            # the internal lock is only held while updating the roster. if we
            # already hold it (downgrade), leaving the block releases it.
            with self._lock:
                if remove is not None:
                    self._roster.modify(remove, REMOVE)
                self._roster.modify(SHARED, ADD)
        self.is_exclusive = exclusive
        return self

    def _wait_for_readers_finishing(self, remove, sleep):
        """
        Poll until the roster has no shared lockers left, then return with
        the internal exclusive lock held.

        :param remove: roster key to remove our own id from before checking
                       (or None); restored if other readers are still there
        :param sleep: polling interval [s]
        :raises LockTimeout: if readers are still present when self.timeout expired
        """
        timer = TimeoutTimer(self.timeout, sleep).start()
        while True:
            self._lock.acquire()
            try:
                if remove is not None:
                    self._roster.modify(remove, REMOVE)
                if len(self._roster.get(SHARED)) == 0:
                    # returning from inside the try skips the else branch below,
                    # so the internal lock intentionally stays acquired.
                    return  # we are the only one and we keep the lock!
                # restore the roster state as before (undo the roster change):
                if remove is not None:
                    self._roster.modify(remove, ADD)
            except:
                # avoid orphan lock when an exception happens here, e.g. Ctrl-C!
                # (bare except on purpose: it always re-raises.)
                self._lock.release()
                raise
            else:
                # still readers around: let others use the lock while we wait.
                self._lock.release()
            if timer.timed_out_or_sleep():
                raise LockTimeout(self.path)

    def release(self):
        """Release the lock according to the mode it is currently held in."""
        if self.is_exclusive:
            # the internal lock is held since acquire(), just drop everything.
            self._roster.modify(EXCLUSIVE, REMOVE)
            self._lock.release()
        else:
            with self._lock:
                self._roster.modify(SHARED, REMOVE)

    def upgrade(self):
        """Switch from shared to exclusive mode (no-op if already exclusive)."""
        if not self.is_exclusive:
            self.acquire(exclusive=True, remove=SHARED)

    def downgrade(self):
        """Switch from exclusive to shared mode (no-op if already shared)."""
        if self.is_exclusive:
            self.acquire(exclusive=False, remove=EXCLUSIVE)

    def break_lock(self):
        """Forcibly remove the roster file and the internal exclusive lock."""
        self._roster.remove()
        self._lock.break_lock()