storelocking.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. import datetime
  2. import json
  3. import random
  4. import time
  5. from borgstore.store import ObjectNotFound
  6. from . import platform
  7. from .checksums import xxh64
  8. from .helpers import Error, ErrorWithTraceback, bin_to_hex
  9. from .logger import create_logger
# Module-level logger, named after this module.
logger = create_logger(__name__)
class LockError(Error):
    """Failed to acquire the lock {}."""

    # NOTE(review): the docstring carries a "{}" placeholder, so it is presumably used by the
    # Error base class as the message template - confirm before rewording it.
    exit_mcode = 70  # presumably the exit code reported for this error - verify against Error
class LockErrorT(ErrorWithTraceback):
    """Failed to acquire the lock {}."""

    # Same message text as LockError, but derived from ErrorWithTraceback instead of Error.
    # NOTE(review): the docstring carries a "{}" placeholder, so it is presumably used by the
    # base class as the message template - confirm before rewording it.
    exit_mcode = 71  # presumably the exit code reported for this error - verify against Error
class LockFailed(LockErrorT):
    """Failed to create/acquire the lock {} ({})."""

    # Raised by Lock.acquire() with two arguments: str(store) and a reason (currently "timeout").
    # NOTE(review): the two "{}" placeholders in the docstring presumably receive these arguments
    # when the message is rendered by the base class - confirm before rewording it.
    exit_mcode = 72  # presumably the exit code reported for this error - verify against Error
class LockTimeout(LockError):
    """Failed to create/acquire the lock {} (timeout)."""

    # NOTE(review): the docstring carries a "{}" placeholder, so it is presumably used by the
    # Error base class as the message template - confirm before rewording it.
    exit_mcode = 73  # presumably the exit code reported for this error - verify against Error
class NotLocked(LockErrorT):
    """Failed to release the lock {} (was not locked)."""

    # Raised by Lock.release() with str(store) when no lock owned by the caller's id is found.
    # NOTE(review): the docstring carries a "{}" placeholder, so it is presumably used by the
    # base class as the message template - confirm before rewording it.
    exit_mcode = 74  # presumably the exit code reported for this error - verify against Error
class NotMyLock(LockErrorT):
    """Failed to release the lock {} (was/is locked, but not by me)."""

    # NOTE(review): the docstring carries a "{}" placeholder, so it is presumably used by the
    # base class as the message template - confirm before rewording it.
    exit_mcode = 75  # presumably the exit code reported for this error - verify against Error
  29. class Lock:
  30. """
  31. A Lock for a resource that can be accessed in a shared or exclusive way.
  32. Typically, write access to a resource needs an exclusive lock (1 writer,
  33. no one is allowed reading) and read access to a resource needs a shared
  34. lock (multiple readers are allowed).
  35. If possible, try to use the contextmanager here like::
  36. with Lock(...) as lock:
  37. ...
  38. This makes sure the lock is released again if the block is left, no
  39. matter how (e.g. if an exception occurred).
  40. """
  41. def __init__(self, store, exclusive=False, sleep=None, timeout=1.0, stale=30 * 60, id=None):
  42. self.store = store
  43. self.is_exclusive = exclusive
  44. self.sleep = sleep
  45. self.timeout = timeout
  46. self.race_recheck_delay = 0.01 # local: 0.01, network/slow remote: >= 1.0
  47. self.other_locks_go_away_delay = 0.1 # local: 0.1, network/slow remote: >= 1.0
  48. self.retry_delay_min = 1.0
  49. self.retry_delay_max = 5.0
  50. self.stale_td = datetime.timedelta(seconds=stale) # ignore/delete it if older
  51. self.refresh_td = datetime.timedelta(seconds=stale // 2) # don't refresh it if younger
  52. self.last_refresh_dt = None
  53. self.id = id or platform.get_process_id()
  54. assert len(self.id) == 3
  55. def __enter__(self):
  56. return self.acquire()
  57. def __exit__(self, *exc):
  58. self.release()
  59. def __repr__(self):
  60. return f"<{self.__class__.__name__}: {self.id!r}>"
  61. def _create_lock(self, *, exclusive=None):
  62. assert exclusive is not None
  63. now = datetime.datetime.now(datetime.timezone.utc)
  64. timestamp = now.isoformat(timespec="milliseconds")
  65. lock = dict(exclusive=exclusive, hostid=self.id[0], processid=self.id[1], threadid=self.id[2], time=timestamp)
  66. value = json.dumps(lock).encode("utf-8")
  67. key = bin_to_hex(xxh64(value))
  68. self.store.store(f"locks/{key}", value)
  69. self.last_refresh_dt = now
  70. return key
  71. def _delete_lock(self, key, *, ignore_not_found=False):
  72. try:
  73. self.store.delete(f"locks/{key}")
  74. except ObjectNotFound:
  75. if not ignore_not_found:
  76. raise
  77. def _is_stale_lock(self, lock):
  78. now = datetime.datetime.now(datetime.timezone.utc)
  79. if lock["dt"] < now - self.stale_td:
  80. # lock is too old, it was not refreshed.
  81. return True
  82. if not platform.process_alive(lock["hostid"], lock["processid"], lock["threadid"]):
  83. # we KNOW that the lock owning process is dead.
  84. return True
  85. return False
  86. def _get_locks(self):
  87. locks = {}
  88. try:
  89. infos = list(self.store.list("locks"))
  90. except ObjectNotFound:
  91. return {}
  92. for info in infos:
  93. key = info.name
  94. content = self.store.load(f"locks/{key}")
  95. lock = json.loads(content.decode("utf-8"))
  96. lock["key"] = key
  97. lock["dt"] = datetime.datetime.fromisoformat(lock["time"])
  98. if self._is_stale_lock(lock):
  99. # ignore it and delete it (even if it is not from us)
  100. self._delete_lock(key, ignore_not_found=True)
  101. else:
  102. locks[key] = lock
  103. return locks
  104. def _find_locks(self, *, only_exclusive=False, only_mine=False):
  105. locks = self._get_locks()
  106. found_locks = []
  107. for key in locks:
  108. lock = locks[key]
  109. if (not only_exclusive or lock["exclusive"]) and (
  110. not only_mine or (lock["hostid"], lock["processid"], lock["threadid"]) == self.id
  111. ):
  112. found_locks.append(lock)
  113. return found_locks
  114. def acquire(self):
  115. # goal
  116. # for exclusive lock: there must be only 1 exclusive lock and no other (exclusive or non-exclusive) locks.
  117. # for non-exclusive lock: there can be multiple n-e locks, but there must not exist an exclusive lock.
  118. started = time.monotonic()
  119. while time.monotonic() - started < self.timeout:
  120. exclusive_locks = self._find_locks(only_exclusive=True)
  121. if len(exclusive_locks) == 0:
  122. # looks like there are no exclusive locks, create our lock.
  123. key = self._create_lock(exclusive=self.is_exclusive)
  124. # obviously we have a race condition here: other client(s) might have created exclusive
  125. # lock(s) at the same time in parallel. thus we have to check again.
  126. time.sleep(
  127. self.race_recheck_delay
  128. ) # give other clients time to notice our exclusive lock, stop creating theirs
  129. exclusive_locks = self._find_locks(only_exclusive=True)
  130. if self.is_exclusive:
  131. if len(exclusive_locks) == 1 and exclusive_locks[0]["key"] == key:
  132. # success, we are the only exclusive lock! wait until the non-exclusive locks go away:
  133. while time.monotonic() - started < self.timeout:
  134. locks = self._find_locks(only_exclusive=False)
  135. if len(locks) == 1 and locks[0]["key"] == key:
  136. # success, we are alone!
  137. return self
  138. time.sleep(self.other_locks_go_away_delay)
  139. break # timeout
  140. else:
  141. # take back our lock as some other client(s) also created exclusive lock(s).
  142. self._delete_lock(key, ignore_not_found=True)
  143. else: # not is_exclusive
  144. if len(exclusive_locks) == 0:
  145. # success, noone else created an exclusive lock meanwhile!
  146. # We don't care for other non-exclusive locks.
  147. return self
  148. else:
  149. # take back our lock as some other client(s) also created exclusive lock(s).
  150. self._delete_lock(key, ignore_not_found=True)
  151. # wait a random bit before retrying
  152. time.sleep(self.retry_delay_min + (self.retry_delay_max - self.retry_delay_min) * random.random())
  153. # timeout
  154. raise LockFailed(str(self.store), "timeout")
  155. def release(self):
  156. locks = self._find_locks(only_mine=True)
  157. if not locks:
  158. raise NotLocked(str(self.store))
  159. assert len(locks) == 1
  160. self._delete_lock(locks[0]["key"], ignore_not_found=True)
  161. self.last_refresh_dt = None
  162. def got_exclusive_lock(self):
  163. locks = self._find_locks(only_mine=True, only_exclusive=True)
  164. return len(locks) == 1
  165. def break_lock(self):
  166. """break ALL locks (not just ours)"""
  167. locks = self._get_locks()
  168. for key in locks:
  169. self._delete_lock(key, ignore_not_found=True)
  170. self.last_refresh_dt = None
  171. def migrate_lock(self, old_id, new_id):
  172. """migrate the lock ownership from old_id to new_id"""
  173. assert self.id == old_id
  174. assert len(new_id) == 3
  175. old_locks = self._find_locks(only_mine=True)
  176. assert len(old_locks) == 1
  177. self.id = new_id
  178. self._create_lock(exclusive=old_locks[0]["exclusive"])
  179. self._delete_lock(old_locks[0]["key"])
  180. now = datetime.datetime.now(datetime.timezone.utc)
  181. self.last_refresh_dt = now
  182. def refresh(self):
  183. """refresh the lock - call this frequently, but not later than every <stale> seconds"""
  184. now = datetime.datetime.now(datetime.timezone.utc)
  185. if self.last_refresh_dt is not None and now > self.last_refresh_dt + self.refresh_td:
  186. old_locks = self._find_locks(only_mine=True)
  187. assert len(old_locks) == 1
  188. old_lock = old_locks[0]
  189. if old_lock["dt"] < now - self.refresh_td:
  190. self._create_lock(exclusive=old_lock["exclusive"])
  191. self._delete_lock(old_lock["key"])
  192. self.last_refresh_dt = now