locking.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. import errno
  2. import json
  3. import os
  4. import socket
  5. import time
  6. from borg.helpers import Error, ErrorWithTraceback
  7. ADD, REMOVE = 'add', 'remove'
  8. SHARED, EXCLUSIVE = 'shared', 'exclusive'
  9. # only determine the PID and hostname once.
  10. # for FUSE mounts, we fork a child process that needs to release
  11. # the lock made by the parent, so it needs to use the same PID for that.
  12. _pid = os.getpid()
  13. _hostname = socket.gethostname()
  14. def get_id():
  15. """Get identification tuple for 'us'"""
  16. thread_id = 0
  17. return _hostname, _pid, thread_id
  18. class TimeoutTimer:
  19. """
  20. A timer for timeout checks (can also deal with no timeout, give timeout=None [default]).
  21. It can also compute and optionally execute a reasonable sleep time (e.g. to avoid
  22. polling too often or to support thread/process rescheduling).
  23. """
  24. def __init__(self, timeout=None, sleep=None):
  25. """
  26. Initialize a timer.
  27. :param timeout: time out interval [s] or None (no timeout)
  28. :param sleep: sleep interval [s] (>= 0: do sleep call, <0: don't call sleep)
  29. or None (autocompute: use 10% of timeout, or 1s for no timeout)
  30. """
  31. if timeout is not None and timeout < 0:
  32. raise ValueError("timeout must be >= 0")
  33. self.timeout_interval = timeout
  34. if sleep is None:
  35. if timeout is None:
  36. sleep = 1.0
  37. else:
  38. sleep = timeout / 10.0
  39. self.sleep_interval = sleep
  40. self.start_time = None
  41. self.end_time = None
  42. def __repr__(self):
  43. return "<%s: start=%r end=%r timeout=%r sleep=%r>" % (
  44. self.__class__.__name__, self.start_time, self.end_time,
  45. self.timeout_interval, self.sleep_interval)
  46. def start(self):
  47. self.start_time = time.time()
  48. if self.timeout_interval is not None:
  49. self.end_time = self.start_time + self.timeout_interval
  50. return self
  51. def sleep(self):
  52. if self.sleep_interval >= 0:
  53. time.sleep(self.sleep_interval)
  54. def timed_out(self):
  55. return self.end_time is not None and time.time() >= self.end_time
  56. def timed_out_or_sleep(self):
  57. if self.timed_out():
  58. return True
  59. else:
  60. self.sleep()
  61. return False
# Lock related exceptions.
# NOTE(review): the docstrings below contain "{}" placeholders and presumably
# serve as message templates that the borg.helpers base classes fill from the
# constructor arguments -- confirm there before rewording any of them.


# base class of the lock errors that derive from Error
class LockError(Error):
    """Failed to acquire the lock {}."""


# base class of the lock errors that derive from ErrorWithTraceback
class LockErrorT(ErrorWithTraceback):
    """Failed to acquire the lock {}."""


# raised with the lock path when waiting for a lock timed out
class LockTimeout(LockError):
    """Failed to create/acquire the lock {} (timeout)."""


# raised with the lock path and the OS error text when the lock can not be created
class LockFailed(LockErrorT):
    """Failed to create/acquire the lock {} ({})."""


# raised with the lock path when releasing a lock that does not exist
class NotLocked(LockErrorT):
    """Failed to release the lock {} (was not locked)."""


# raised with the lock path when releasing a lock that exists, but is not ours
class NotMyLock(LockErrorT):
    """Failed to release the lock {} (was/is locked, but not by me)."""
  74. class ExclusiveLock:
  75. """An exclusive Lock based on mkdir fs operation being atomic.
  76. If possible, try to use the contextmanager here like:
  77. with ExclusiveLock(...) as lock:
  78. ...
  79. This makes sure the lock is released again if the block is left, no
  80. matter how (e.g. if an exception occurred).
  81. """
  82. def __init__(self, path, timeout=None, sleep=None, id=None):
  83. self.timeout = timeout
  84. self.sleep = sleep
  85. self.path = os.path.abspath(path)
  86. self.id = id or get_id()
  87. self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id)
  88. def __enter__(self):
  89. return self.acquire()
  90. def __exit__(self, *exc):
  91. self.release()
  92. def __repr__(self):
  93. return "<%s: %r>" % (self.__class__.__name__, self.unique_name)
  94. def acquire(self, timeout=None, sleep=None):
  95. if timeout is None:
  96. timeout = self.timeout
  97. if sleep is None:
  98. sleep = self.sleep
  99. timer = TimeoutTimer(timeout, sleep).start()
  100. while True:
  101. try:
  102. os.mkdir(self.path)
  103. except OSError as err:
  104. if err.errno == errno.EEXIST: # already locked
  105. if self.by_me():
  106. return self
  107. if timer.timed_out_or_sleep():
  108. raise LockTimeout(self.path)
  109. else:
  110. raise LockFailed(self.path, str(err))
  111. else:
  112. with open(self.unique_name, "wb"):
  113. pass
  114. return self
  115. def release(self):
  116. if not self.is_locked():
  117. raise NotLocked(self.path)
  118. if not self.by_me():
  119. raise NotMyLock(self.path)
  120. os.unlink(self.unique_name)
  121. os.rmdir(self.path)
  122. def is_locked(self):
  123. return os.path.exists(self.path)
  124. def by_me(self):
  125. return os.path.exists(self.unique_name)
  126. def break_lock(self):
  127. if self.is_locked():
  128. for name in os.listdir(self.path):
  129. os.unlink(os.path.join(self.path, name))
  130. os.rmdir(self.path)
  131. class LockRoster:
  132. """
  133. A Lock Roster to track shared/exclusive lockers.
  134. Note: you usually should call the methods with an exclusive lock held,
  135. to avoid conflicting access by multiple threads/processes/machines.
  136. """
  137. def __init__(self, path, id=None):
  138. self.path = path
  139. self.id = id or get_id()
  140. def load(self):
  141. try:
  142. with open(self.path) as f:
  143. data = json.load(f)
  144. except IOError as err:
  145. if err.errno != errno.ENOENT:
  146. raise
  147. data = {}
  148. except ValueError:
  149. # corrupt/empty roster file?
  150. data = {}
  151. return data
  152. def save(self, data):
  153. with open(self.path, "w") as f:
  154. json.dump(data, f)
  155. def remove(self):
  156. try:
  157. os.unlink(self.path)
  158. except OSError as e:
  159. if e.errno != errno.ENOENT:
  160. raise
  161. def get(self, key):
  162. roster = self.load()
  163. return set(tuple(e) for e in roster.get(key, []))
  164. def modify(self, key, op):
  165. roster = self.load()
  166. try:
  167. elements = set(tuple(e) for e in roster[key])
  168. except KeyError:
  169. elements = set()
  170. if op == ADD:
  171. elements.add(self.id)
  172. elif op == REMOVE:
  173. elements.remove(self.id)
  174. else:
  175. raise ValueError('Unknown LockRoster op %r' % op)
  176. roster[key] = list(list(e) for e in elements)
  177. self.save(roster)
  178. class UpgradableLock:
  179. """
  180. A Lock for a resource that can be accessed in a shared or exclusive way.
  181. Typically, write access to a resource needs an exclusive lock (1 writer,
  182. noone is allowed reading) and read access to a resource needs a shared
  183. lock (multiple readers are allowed).
  184. If possible, try to use the contextmanager here like:
  185. with UpgradableLock(...) as lock:
  186. ...
  187. This makes sure the lock is released again if the block is left, no
  188. matter how (e.g. if an exception occurred).
  189. """
  190. def __init__(self, path, exclusive=False, sleep=None, timeout=None, id=None):
  191. self.path = path
  192. self.is_exclusive = exclusive
  193. self.sleep = sleep
  194. self.timeout = timeout
  195. self.id = id or get_id()
  196. # globally keeping track of shared and exclusive lockers:
  197. self._roster = LockRoster(path + '.roster', id=id)
  198. # an exclusive lock, used for:
  199. # - holding while doing roster queries / updates
  200. # - holding while the UpgradableLock itself is exclusive
  201. self._lock = ExclusiveLock(path + '.exclusive', id=id, timeout=timeout)
  202. def __enter__(self):
  203. return self.acquire()
  204. def __exit__(self, *exc):
  205. self.release()
  206. def __repr__(self):
  207. return "<%s: %r>" % (self.__class__.__name__, self.id)
  208. def acquire(self, exclusive=None, remove=None, sleep=None):
  209. if exclusive is None:
  210. exclusive = self.is_exclusive
  211. sleep = sleep or self.sleep or 0.2
  212. if exclusive:
  213. self._wait_for_readers_finishing(remove, sleep)
  214. self._roster.modify(EXCLUSIVE, ADD)
  215. else:
  216. with self._lock:
  217. if remove is not None:
  218. self._roster.modify(remove, REMOVE)
  219. self._roster.modify(SHARED, ADD)
  220. self.is_exclusive = exclusive
  221. return self
  222. def _wait_for_readers_finishing(self, remove, sleep):
  223. timer = TimeoutTimer(self.timeout, sleep).start()
  224. while True:
  225. self._lock.acquire()
  226. try:
  227. if remove is not None:
  228. self._roster.modify(remove, REMOVE)
  229. remove = None
  230. if len(self._roster.get(SHARED)) == 0:
  231. return # we are the only one and we keep the lock!
  232. except:
  233. # avoid orphan lock when an exception happens here, e.g. Ctrl-C!
  234. self._lock.release()
  235. raise
  236. else:
  237. self._lock.release()
  238. if timer.timed_out_or_sleep():
  239. raise LockTimeout(self.path)
  240. def release(self):
  241. if self.is_exclusive:
  242. self._roster.modify(EXCLUSIVE, REMOVE)
  243. self._lock.release()
  244. else:
  245. with self._lock:
  246. self._roster.modify(SHARED, REMOVE)
  247. def upgrade(self):
  248. if not self.is_exclusive:
  249. self.acquire(exclusive=True, remove=SHARED)
  250. def downgrade(self):
  251. if self.is_exclusive:
  252. self.acquire(exclusive=False, remove=EXCLUSIVE)
  253. def break_lock(self):
  254. self._roster.remove()
  255. self._lock.break_lock()