locking.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. import json
  2. import os
  3. import socket
  4. import time
  5. from borg.helpers import Error, ErrorWithTraceback
# operations accepted by LockRoster.modify(): add or remove our own id.
ADD, REMOVE = 'add', 'remove'
# roster keys under which the ids of shared / exclusive lockers are stored.
SHARED, EXCLUSIVE = 'shared', 'exclusive'
# for performance reasons, only determine the hostname once.
_hostname = socket.gethostname()
  10. def get_id():
  11. """
  12. Return identification tuple (hostname, pid, thread_id) for 'us'.
  13. This always returns the current pid, which might be different from before, e.g. if daemonize() was used.
  14. Note: Currently thread_id is *always* zero.
  15. """
  16. thread_id = 0
  17. pid = os.getpid()
  18. return _hostname, pid, thread_id
  19. class TimeoutTimer:
  20. """
  21. A timer for timeout checks (can also deal with no timeout, give timeout=None [default]).
  22. It can also compute and optionally execute a reasonable sleep time (e.g. to avoid
  23. polling too often or to support thread/process rescheduling).
  24. """
  25. def __init__(self, timeout=None, sleep=None):
  26. """
  27. Initialize a timer.
  28. :param timeout: time out interval [s] or None (no timeout)
  29. :param sleep: sleep interval [s] (>= 0: do sleep call, <0: don't call sleep)
  30. or None (autocompute: use 10% of timeout [but not more than 60s],
  31. or 1s for no timeout)
  32. """
  33. if timeout is not None and timeout < 0:
  34. raise ValueError("timeout must be >= 0")
  35. self.timeout_interval = timeout
  36. if sleep is None:
  37. if timeout is None:
  38. sleep = 1.0
  39. else:
  40. sleep = min(60.0, timeout / 10.0)
  41. self.sleep_interval = sleep
  42. self.start_time = None
  43. self.end_time = None
  44. def __repr__(self):
  45. return "<%s: start=%r end=%r timeout=%r sleep=%r>" % (
  46. self.__class__.__name__, self.start_time, self.end_time,
  47. self.timeout_interval, self.sleep_interval)
  48. def start(self):
  49. self.start_time = time.time()
  50. if self.timeout_interval is not None:
  51. self.end_time = self.start_time + self.timeout_interval
  52. return self
  53. def sleep(self):
  54. if self.sleep_interval >= 0:
  55. time.sleep(self.sleep_interval)
  56. def timed_out(self):
  57. return self.end_time is not None and time.time() >= self.end_time
  58. def timed_out_or_sleep(self):
  59. if self.timed_out():
  60. return True
  61. else:
  62. self.sleep()
  63. return False
# Base class of the lock errors built on borg.helpers.Error.
# NOTE(review): the "{}" placeholder suggests the base class formats this
# docstring with the constructor arguments - confirm in borg.helpers before
# rewording it.
class LockError(Error):
    """Failed to acquire the lock {}."""
# Base class of the lock errors built on borg.helpers.ErrorWithTraceback.
# NOTE(review): the "{}" placeholder suggests the base class formats this
# docstring with the constructor arguments - confirm in borg.helpers before
# rewording it.
class LockErrorT(ErrorWithTraceback):
    """Failed to acquire the lock {}."""
# Raised with the lock path when waiting for a lock exceeded the timeout
# (see ExclusiveLock.acquire and Lock._wait_for_readers_finishing).
# NOTE(review): docstring presumably doubles as the message template - confirm
# in borg.helpers before rewording it.
class LockTimeout(LockError):
    """Failed to create/acquire the lock {} (timeout)."""
# Raised with (lock path, error text) when creating the lock directory failed
# with an OSError other than FileExistsError (see ExclusiveLock.acquire).
# NOTE(review): docstring presumably doubles as the message template - confirm
# in borg.helpers before rewording it.
class LockFailed(LockErrorT):
    """Failed to create/acquire the lock {} ({})."""
# Raised with the lock path by ExclusiveLock.release() when the lock
# directory does not exist.
# NOTE(review): docstring presumably doubles as the message template - confirm
# in borg.helpers before rewording it.
class NotLocked(LockErrorT):
    """Failed to release the lock {} (was not locked)."""
# Raised with the lock path by ExclusiveLock.release() when the lock
# directory exists but does not contain our own marker file.
# NOTE(review): docstring presumably doubles as the message template - confirm
# in borg.helpers before rewording it.
class NotMyLock(LockErrorT):
    """Failed to release the lock {} (was/is locked, but not by me)."""
  76. class ExclusiveLock:
  77. """An exclusive Lock based on mkdir fs operation being atomic.
  78. If possible, try to use the contextmanager here like::
  79. with ExclusiveLock(...) as lock:
  80. ...
  81. This makes sure the lock is released again if the block is left, no
  82. matter how (e.g. if an exception occurred).
  83. """
  84. def __init__(self, path, timeout=None, sleep=None, id=None):
  85. self.timeout = timeout
  86. self.sleep = sleep
  87. self.path = os.path.abspath(path)
  88. self.id = id or get_id()
  89. self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id)
  90. def __enter__(self):
  91. return self.acquire()
  92. def __exit__(self, *exc):
  93. self.release()
  94. def __repr__(self):
  95. return "<%s: %r>" % (self.__class__.__name__, self.unique_name)
  96. def acquire(self, timeout=None, sleep=None):
  97. if timeout is None:
  98. timeout = self.timeout
  99. if sleep is None:
  100. sleep = self.sleep
  101. timer = TimeoutTimer(timeout, sleep).start()
  102. while True:
  103. try:
  104. os.mkdir(self.path)
  105. except FileExistsError: # already locked
  106. if self.by_me():
  107. return self
  108. if timer.timed_out_or_sleep():
  109. raise LockTimeout(self.path)
  110. except OSError as err:
  111. raise LockFailed(self.path, str(err)) from None
  112. else:
  113. with open(self.unique_name, "wb"):
  114. pass
  115. return self
  116. def release(self):
  117. if not self.is_locked():
  118. raise NotLocked(self.path)
  119. if not self.by_me():
  120. raise NotMyLock(self.path)
  121. os.unlink(self.unique_name)
  122. os.rmdir(self.path)
  123. def is_locked(self):
  124. return os.path.exists(self.path)
  125. def by_me(self):
  126. return os.path.exists(self.unique_name)
  127. def break_lock(self):
  128. if self.is_locked():
  129. for name in os.listdir(self.path):
  130. os.unlink(os.path.join(self.path, name))
  131. os.rmdir(self.path)
  132. def migrate_lock(self, old_id, new_id):
  133. """migrate the lock ownership from old_id to new_id"""
  134. assert self.id == old_id
  135. new_unique_name = os.path.join(self.path, "%s.%d-%x" % new_id)
  136. if self.is_locked() and self.by_me():
  137. with open(new_unique_name, "wb"):
  138. pass
  139. os.unlink(self.unique_name)
  140. self.id, self.unique_name = new_id, new_unique_name
  141. class LockRoster:
  142. """
  143. A Lock Roster to track shared/exclusive lockers.
  144. Note: you usually should call the methods with an exclusive lock held,
  145. to avoid conflicting access by multiple threads/processes/machines.
  146. """
  147. def __init__(self, path, id=None):
  148. self.path = path
  149. self.id = id or get_id()
  150. def load(self):
  151. try:
  152. with open(self.path) as f:
  153. data = json.load(f)
  154. except (FileNotFoundError, ValueError):
  155. # no or corrupt/empty roster file?
  156. data = {}
  157. return data
  158. def save(self, data):
  159. with open(self.path, "w") as f:
  160. json.dump(data, f)
  161. def remove(self):
  162. try:
  163. os.unlink(self.path)
  164. except FileNotFoundError:
  165. pass
  166. def get(self, key):
  167. roster = self.load()
  168. return set(tuple(e) for e in roster.get(key, []))
  169. def empty(self, *keys):
  170. return all(not self.get(key) for key in keys)
  171. def modify(self, key, op):
  172. roster = self.load()
  173. try:
  174. elements = set(tuple(e) for e in roster[key])
  175. except KeyError:
  176. elements = set()
  177. if op == ADD:
  178. elements.add(self.id)
  179. elif op == REMOVE:
  180. elements.remove(self.id)
  181. else:
  182. raise ValueError('Unknown LockRoster op %r' % op)
  183. roster[key] = list(list(e) for e in elements)
  184. self.save(roster)
  185. def migrate_lock(self, key, old_id, new_id):
  186. """migrate the lock ownership from old_id to new_id"""
  187. assert self.id == old_id
  188. try:
  189. self.modify(key, REMOVE)
  190. except KeyError:
  191. # entry was not there, so no need to add a new one, but still update our id
  192. self.id = new_id
  193. else:
  194. # old entry removed, update our id and add a updated entry
  195. self.id = new_id
  196. self.modify(key, ADD)
  197. class Lock:
  198. """
  199. A Lock for a resource that can be accessed in a shared or exclusive way.
  200. Typically, write access to a resource needs an exclusive lock (1 writer,
  201. noone is allowed reading) and read access to a resource needs a shared
  202. lock (multiple readers are allowed).
  203. If possible, try to use the contextmanager here like::
  204. with Lock(...) as lock:
  205. ...
  206. This makes sure the lock is released again if the block is left, no
  207. matter how (e.g. if an exception occurred).
  208. """
  209. def __init__(self, path, exclusive=False, sleep=None, timeout=None, id=None):
  210. self.path = path
  211. self.is_exclusive = exclusive
  212. self.sleep = sleep
  213. self.timeout = timeout
  214. self.id = id or get_id()
  215. # globally keeping track of shared and exclusive lockers:
  216. self._roster = LockRoster(path + '.roster', id=id)
  217. # an exclusive lock, used for:
  218. # - holding while doing roster queries / updates
  219. # - holding while the Lock instance itself is exclusive
  220. self._lock = ExclusiveLock(path + '.exclusive', id=id, timeout=timeout)
  221. def __enter__(self):
  222. return self.acquire()
  223. def __exit__(self, *exc):
  224. self.release()
  225. def __repr__(self):
  226. return "<%s: %r>" % (self.__class__.__name__, self.id)
  227. def acquire(self, exclusive=None, remove=None, sleep=None):
  228. if exclusive is None:
  229. exclusive = self.is_exclusive
  230. sleep = sleep or self.sleep or 0.2
  231. if exclusive:
  232. self._wait_for_readers_finishing(remove, sleep)
  233. self._roster.modify(EXCLUSIVE, ADD)
  234. else:
  235. with self._lock:
  236. if remove is not None:
  237. self._roster.modify(remove, REMOVE)
  238. self._roster.modify(SHARED, ADD)
  239. self.is_exclusive = exclusive
  240. return self
  241. def _wait_for_readers_finishing(self, remove, sleep):
  242. timer = TimeoutTimer(self.timeout, sleep).start()
  243. while True:
  244. self._lock.acquire()
  245. try:
  246. if remove is not None:
  247. self._roster.modify(remove, REMOVE)
  248. if len(self._roster.get(SHARED)) == 0:
  249. return # we are the only one and we keep the lock!
  250. # restore the roster state as before (undo the roster change):
  251. if remove is not None:
  252. self._roster.modify(remove, ADD)
  253. except:
  254. # avoid orphan lock when an exception happens here, e.g. Ctrl-C!
  255. self._lock.release()
  256. raise
  257. else:
  258. self._lock.release()
  259. if timer.timed_out_or_sleep():
  260. raise LockTimeout(self.path)
  261. def release(self):
  262. if self.is_exclusive:
  263. self._roster.modify(EXCLUSIVE, REMOVE)
  264. if self._roster.empty(EXCLUSIVE, SHARED):
  265. self._roster.remove()
  266. self._lock.release()
  267. else:
  268. with self._lock:
  269. self._roster.modify(SHARED, REMOVE)
  270. if self._roster.empty(EXCLUSIVE, SHARED):
  271. self._roster.remove()
  272. def upgrade(self):
  273. # WARNING: if multiple read-lockers want to upgrade, it will deadlock because they
  274. # all will wait until the other read locks go away - and that won't happen.
  275. if not self.is_exclusive:
  276. self.acquire(exclusive=True, remove=SHARED)
  277. def downgrade(self):
  278. if self.is_exclusive:
  279. self.acquire(exclusive=False, remove=EXCLUSIVE)
  280. def got_exclusive_lock(self):
  281. return self.is_exclusive and self._lock.is_locked() and self._lock.by_me()
  282. def break_lock(self):
  283. self._roster.remove()
  284. self._lock.break_lock()
  285. def migrate_lock(self, old_id, new_id):
  286. assert self.id == old_id
  287. self.id = new_id
  288. if self.is_exclusive:
  289. self._lock.migrate_lock(old_id, new_id)
  290. self._roster.migrate_lock(EXCLUSIVE, old_id, new_id)
  291. else:
  292. with self._lock:
  293. self._lock.migrate_lock(old_id, new_id)
  294. self._roster.migrate_lock(SHARED, old_id, new_id)