locking.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. import errno
  2. import json
  3. import os
  4. import socket
  5. import threading
  6. import time
  7. from borg.helpers import Error
# roster operations, passed as `op` to LockRoster.modify()
ADD, REMOVE = 'add', 'remove'
# roster keys under which the ids of shared / exclusive lockers are stored
SHARED, EXCLUSIVE = 'shared', 'exclusive'
  10. def get_id():
  11. """Get identification tuple for 'us'"""
  12. hostname = socket.gethostname()
  13. pid = os.getpid()
  14. tid = threading.current_thread().ident & 0xffffffff
  15. return hostname, pid, tid
  16. class TimeoutTimer:
  17. """
  18. A timer for timeout checks (can also deal with no timeout, give timeout=None [default]).
  19. It can also compute and optionally execute a reasonable sleep time (e.g. to avoid
  20. polling too often or to support thread/process rescheduling).
  21. """
  22. def __init__(self, timeout=None, sleep=None):
  23. """
  24. Initialize a timer.
  25. :param timeout: time out interval [s] or None (no timeout)
  26. :param sleep: sleep interval [s] (>= 0: do sleep call, <0: don't call sleep)
  27. or None (autocompute: use 10% of timeout, or 1s for no timeout)
  28. """
  29. if timeout is not None and timeout < 0:
  30. raise ValueError("timeout must be >= 0")
  31. self.timeout_interval = timeout
  32. if sleep is None:
  33. if timeout is None:
  34. sleep = 1.0
  35. else:
  36. sleep = timeout / 10.0
  37. self.sleep_interval = sleep
  38. self.start_time = None
  39. self.end_time = None
  40. def __repr__(self):
  41. return "<%s: start=%r end=%r timeout=%r sleep=%r>" % (
  42. self.__class__.__name__, self.start_time, self.end_time,
  43. self.timeout_interval, self.sleep_interval)
  44. def start(self):
  45. self.start_time = time.time()
  46. if self.timeout_interval is not None:
  47. self.end_time = self.start_time + self.timeout_interval
  48. return self
  49. def sleep(self):
  50. if self.sleep_interval >= 0:
  51. time.sleep(self.sleep_interval)
  52. def timed_out(self):
  53. return self.end_time is not None and time.time() >= self.end_time
  54. def timed_out_or_sleep(self):
  55. if self.timed_out():
  56. return True
  57. else:
  58. self.sleep()
  59. return False
  60. class ExclusiveLock:
  61. """An exclusive Lock based on mkdir fs operation being atomic"""
  62. class LockError(Error):
  63. """Failed to acquire the lock {}."""
  64. class LockTimeout(LockError):
  65. """Failed to create/acquire the lock {} (timeout)."""
  66. class LockFailed(LockError):
  67. """Failed to create/acquire the lock {} ({})."""
  68. class UnlockError(Error):
  69. """Failed to release the lock {}."""
  70. class NotLocked(UnlockError):
  71. """Failed to release the lock {} (was not locked)."""
  72. class NotMyLock(UnlockError):
  73. """Failed to release the lock {} (was/is locked, but not by me)."""
  74. def __init__(self, path, timeout=None, sleep=None, id=None):
  75. self.timeout = timeout
  76. self.sleep = sleep
  77. self.path = os.path.abspath(path)
  78. self.id = id or get_id()
  79. self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id)
  80. def __enter__(self):
  81. return self.acquire()
  82. def __exit__(self, *exc):
  83. self.release()
  84. def __repr__(self):
  85. return "<%s: %r>" % (self.__class__.__name__, self.unique_name)
  86. def acquire(self, timeout=None, sleep=None):
  87. if timeout is None:
  88. timeout = self.timeout
  89. if sleep is None:
  90. sleep = self.sleep
  91. timer = TimeoutTimer(timeout, sleep).start()
  92. while True:
  93. try:
  94. os.mkdir(self.path)
  95. except OSError as err:
  96. if err.errno == errno.EEXIST: # already locked
  97. if self.by_me():
  98. return self
  99. if timer.timed_out_or_sleep():
  100. raise self.LockTimeout(self.path)
  101. else:
  102. raise self.LockFailed(self.path, str(err))
  103. else:
  104. with open(self.unique_name, "wb"):
  105. pass
  106. return self
  107. def release(self):
  108. if not self.is_locked():
  109. raise self.NotLocked(self.path)
  110. if not self.by_me():
  111. raise self.NotMyLock(self.path)
  112. os.unlink(self.unique_name)
  113. os.rmdir(self.path)
  114. def is_locked(self):
  115. return os.path.exists(self.path)
  116. def by_me(self):
  117. return os.path.exists(self.unique_name)
  118. def break_lock(self):
  119. if self.is_locked():
  120. for name in os.listdir(self.path):
  121. os.unlink(os.path.join(self.path, name))
  122. os.rmdir(self.path)
  123. class LockRoster:
  124. """
  125. A Lock Roster to track shared/exclusive lockers.
  126. Note: you usually should call the methods with an exclusive lock held,
  127. to avoid conflicting access by multiple threads/processes/machines.
  128. """
  129. def __init__(self, path, id=None):
  130. self.path = path
  131. self.id = id or get_id()
  132. def load(self):
  133. try:
  134. with open(self.path) as f:
  135. data = json.load(f)
  136. except IOError as err:
  137. if err.errno != errno.ENOENT:
  138. raise
  139. data = {}
  140. return data
  141. def save(self, data):
  142. with open(self.path, "w") as f:
  143. json.dump(data, f)
  144. def remove(self):
  145. os.unlink(self.path)
  146. def get(self, key):
  147. roster = self.load()
  148. return set(tuple(e) for e in roster.get(key, []))
  149. def modify(self, key, op):
  150. roster = self.load()
  151. try:
  152. elements = set(tuple(e) for e in roster[key])
  153. except KeyError:
  154. elements = set()
  155. if op == ADD:
  156. elements.add(self.id)
  157. elif op == REMOVE:
  158. elements.remove(self.id)
  159. else:
  160. raise ValueError('Unknown LockRoster op %r' % op)
  161. roster[key] = list(list(e) for e in elements)
  162. self.save(roster)
  163. class UpgradableLock:
  164. """
  165. A Lock for a resource that can be accessed in a shared or exclusive way.
  166. Typically, write access to a resource needs an exclusive lock (1 writer,
  167. noone is allowed reading) and read access to a resource needs a shared
  168. lock (multiple readers are allowed).
  169. """
  170. class SharedLockFailed(Error):
  171. """Failed to acquire shared lock [{}]"""
  172. class ExclusiveLockFailed(Error):
  173. """Failed to acquire write lock [{}]"""
  174. def __init__(self, path, exclusive=False, sleep=None, id=None):
  175. self.path = path
  176. self.is_exclusive = exclusive
  177. self.sleep = sleep
  178. self.id = id or get_id()
  179. # globally keeping track of shared and exclusive lockers:
  180. self._roster = LockRoster(path + '.roster', id=id)
  181. # an exclusive lock, used for:
  182. # - holding while doing roster queries / updates
  183. # - holding while the UpgradableLock itself is exclusive
  184. self._lock = ExclusiveLock(path + '.exclusive', id=id)
  185. def __enter__(self):
  186. return self.acquire()
  187. def __exit__(self, *exc):
  188. self.release()
  189. def __repr__(self):
  190. return "<%s: %r>" % (self.__class__.__name__, self.id)
  191. def acquire(self, exclusive=None, remove=None, sleep=None):
  192. if exclusive is None:
  193. exclusive = self.is_exclusive
  194. sleep = sleep or self.sleep or 0.2
  195. try:
  196. if exclusive:
  197. self._wait_for_readers_finishing(remove, sleep)
  198. self._roster.modify(EXCLUSIVE, ADD)
  199. else:
  200. with self._lock:
  201. if remove is not None:
  202. self._roster.modify(remove, REMOVE)
  203. self._roster.modify(SHARED, ADD)
  204. self.is_exclusive = exclusive
  205. return self
  206. except ExclusiveLock.LockError as err:
  207. msg = str(err)
  208. if exclusive:
  209. raise self.ExclusiveLockFailed(msg)
  210. else:
  211. raise self.SharedLockFailed(msg)
  212. def _wait_for_readers_finishing(self, remove, sleep):
  213. while True:
  214. self._lock.acquire()
  215. if remove is not None:
  216. self._roster.modify(remove, REMOVE)
  217. remove = None
  218. if len(self._roster.get(SHARED)) == 0:
  219. return # we are the only one and we keep the lock!
  220. self._lock.release()
  221. time.sleep(sleep)
  222. def release(self):
  223. if self.is_exclusive:
  224. self._roster.modify(EXCLUSIVE, REMOVE)
  225. self._lock.release()
  226. else:
  227. with self._lock:
  228. self._roster.modify(SHARED, REMOVE)
  229. def upgrade(self):
  230. if not self.is_exclusive:
  231. self.acquire(exclusive=True, remove=SHARED)
  232. def downgrade(self):
  233. if self.is_exclusive:
  234. self.acquire(exclusive=False, remove=EXCLUSIVE)
  235. def break_lock(self):
  236. self._roster.remove()
  237. self._lock.break_lock()