locking.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460
  1. import errno
  2. import json
  3. import os
  4. import tempfile
  5. import time
  6. from . import platform
  7. from .helpers import Error, ErrorWithTraceback
  8. from .logger import create_logger
  # Operations understood by LockRoster.modify().
  ADD = "add"
  REMOVE = "remove"
  # Roster keys: the two kinds of lockers recorded in a roster file.
  SHARED = "shared"
  EXCLUSIVE = "exclusive"
  # Module logger used for stale / malformed lock diagnostics.
  logger = create_logger(__name__)
  class TimeoutTimer:
      """
      Deadline tracker that also knows how long to pause between polls.

      A timeout of None means "never time out". The sleep interval exists so callers
      polling for a resource don't spin too fast and give other threads/processes a
      chance to run.
      """

      def __init__(self, timeout=None, sleep=None):
          """
          Configure the timer; call start() before checking for expiry.

          :param timeout: timeout interval [s], or None to wait forever [default]
          :param sleep: pause interval [s]; >= 0 makes sleep() call time.sleep,
                        < 0 turns sleep() into a no-op. None picks a default:
                        10% of timeout capped at 60s, or 1s when timeout is None.
          :raises ValueError: if timeout is negative
          """
          if timeout is not None and timeout < 0:
              raise ValueError("timeout must be >= 0")
          if sleep is None:
              sleep = 1.0 if timeout is None else min(60.0, timeout / 10.0)
          self.timeout_interval = timeout
          self.sleep_interval = sleep
          self.start_time = self.end_time = None

      def __repr__(self):
          cls_name = type(self).__name__
          return "<{}: start={!r} end={!r} timeout={!r} sleep={!r}>".format(
              cls_name, self.start_time, self.end_time, self.timeout_interval, self.sleep_interval
          )

      def start(self):
          """Record the start time (and the deadline, if any); return self for chaining."""
          now = time.time()
          self.start_time = now
          if self.timeout_interval is not None:
              self.end_time = now + self.timeout_interval
          return self

      def sleep(self):
          """Pause for sleep_interval seconds, unless the interval is negative."""
          should_pause = self.sleep_interval >= 0
          if should_pause:
              time.sleep(self.sleep_interval)

      def timed_out(self):
          """Return True once the deadline has passed; always False without a deadline."""
          if self.end_time is None:
              return False
          return time.time() >= self.end_time

      def timed_out_or_sleep(self):
          """Return True if the deadline has passed; otherwise pause once and return False."""
          expired = self.timed_out()
          if not expired:
              self.sleep()
          return expired
  # Exceptions raised by the locking classes in this module.
  # NOTE(review): the "{}" placeholders in the docstrings suggest that Error /
  # ErrorWithTraceback (from .helpers) use the class docstring as the message
  # template, filled from the constructor arguments -- confirm before editing them.
  class LockError(Error):
      """Failed to acquire the lock {}."""


  # Same message as LockError, but derived from ErrorWithTraceback.
  class LockErrorT(ErrorWithTraceback):
      """Failed to acquire the lock {}."""


  # Raised when a lock cannot be obtained in time
  # (ExclusiveLock.acquire, Lock._wait_for_readers_finishing).
  class LockTimeout(LockError):
      """Failed to create/acquire the lock {} (timeout)."""


  # Raised by ExclusiveLock.acquire when the temporary lock directory or the
  # marker file inside it cannot be created; the second argument is the OS error text.
  class LockFailed(LockErrorT):
      """Failed to create/acquire the lock {} ({})."""


  # Raised by ExclusiveLock.release when the lock directory does not exist.
  class NotLocked(LockErrorT):
      """Failed to release the lock {} (was not locked)."""


  # Raised by ExclusiveLock.release when the lock exists but our marker file is not in it.
  class NotMyLock(LockErrorT):
      """Failed to release the lock {} (was/is locked, but not by me)."""
  class ExclusiveLock:
      """An exclusive lock based on atomically renaming a directory into place.

      acquire() first prepares a temporary directory next to the lock path that
      already contains our unique marker file, then renames it onto the lock path.
      If the rename fails, somebody (possibly we ourselves) already holds the lock.

      If possible, try to use the contextmanager here like::

          with ExclusiveLock(...) as lock:
              ...

      This makes sure the lock is released again if the block is left, no
      matter how (e.g. if an exception occurred).
      """

      def __init__(self, path, timeout=None, sleep=None, id=None):
          """
          :param path: lock directory path (stored as an absolute path)
          :param timeout: default acquire() timeout [s], None = wait forever
          :param sleep: default acquire() poll sleep [s], None = autocompute (see TimeoutTimer)
          :param id: (host, pid, thread) owner identification; defaults to
                     platform.get_process_id()
          """
          self.timeout = timeout
          self.sleep = sleep
          self.path = os.path.abspath(path)
          self.id = id or platform.get_process_id()
          # Marker file inside the lock dir naming the owner: "<host>.<pid>-<thread in hex>".
          self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id)
          self.kill_stale_locks = True
          self.stale_warning_printed = False

      def __enter__(self):
          return self.acquire()

      def __exit__(self, *exc):
          self.release()

      def __repr__(self):
          return f"<{self.__class__.__name__}: {self.unique_name!r}>"

      def acquire(self, timeout=None, sleep=None):
          """
          Acquire the lock, waiting up to *timeout* seconds.

          While waiting, locks held by dead processes are removed via kill_stale_lock().

          :param timeout: overrides the instance default when not None
          :param sleep: overrides the instance default when not None
          :returns: self
          :raises LockFailed: if the temporary directory / marker file cannot be created
          :raises LockTimeout: if the lock could not be acquired in time
          """
          if timeout is None:
              timeout = self.timeout
          if sleep is None:
              sleep = self.sleep
          parent_path, base_name = os.path.split(self.path)
          unique_base_name = os.path.basename(self.unique_name)
          temp_path = None
          temp_unique_name = None
          try:
              temp_path = tempfile.mkdtemp(".tmp", base_name + ".", parent_path)
              temp_unique_name = os.path.join(temp_path, unique_base_name)
              with open(temp_unique_name, "wb"):
                  pass
          except OSError as err:
              raise LockFailed(self.path, str(err)) from None
          else:
              timer = TimeoutTimer(timeout, sleep).start()
              while True:
                  try:
                      os.rename(temp_path, self.path)
                  except OSError:  # already locked
                      if self.by_me():
                          return self
                      self.kill_stale_lock()
                      if timer.timed_out_or_sleep():
                          raise LockTimeout(self.path) from None
                  else:
                      temp_path = None  # renamed into place: nothing left to clean up
                      return self
          finally:
              if temp_path is not None:
                  # The rename did not happen, so the temp dir still exists. Clean it up
                  # best-effort; only filesystem errors are ignored so that e.g. Ctrl-C
                  # is not swallowed here.
                  if temp_unique_name is not None:
                      try:
                          os.unlink(temp_unique_name)
                      except OSError:
                          pass
                  try:
                      os.rmdir(temp_path)
                  except OSError:
                      pass

      def release(self):
          """
          Release the lock: remove our marker file, then try to remove the lock dir.

          :raises NotLocked: if the lock directory does not exist
          :raises NotMyLock: if the lock exists but our marker file is not in it
          """
          if not self.is_locked():
              raise NotLocked(self.path)
          if not self.by_me():
              raise NotMyLock(self.path)
          os.unlink(self.unique_name)
          try:
              os.rmdir(self.path)
          except OSError as err:
              if err.errno not in (errno.ENOTEMPTY, errno.EEXIST, errno.ENOENT):
                  # EACCES or EIO or ... = we cannot operate anyway, so re-raise
                  raise
              # else: directory is not empty or doesn't exist any more,
              # this means we lost the race to somebody else -- which is ok.

      def is_locked(self):
          """Return True if the lock directory exists (held by anybody)."""
          return os.path.exists(self.path)

      def by_me(self):
          """Return True if our marker file exists inside the lock directory."""
          return os.path.exists(self.unique_name)

      def kill_stale_lock(self):
          """
          Remove the lock if all of its marker files belong to processes that are gone.

          Marker names are parsed as "<host>.<pid>-<thread>", with the thread id in hex
          (matching how unique_name is built).

          :returns: True if the lock directory was removed by this call, False otherwise.
          """
          try:
              names = os.listdir(self.path)
          except FileNotFoundError:  # another process did our job in the meantime.
              return False
          for name in names:
              try:
                  host_pid, thread_str = name.rsplit("-", 1)
                  host, pid_str = host_pid.rsplit(".", 1)
                  pid = int(pid_str)
                  # unique_name writes the thread id with "%x", so parse it as hex.
                  thread = int(thread_str, 16)
              except ValueError:
                  # Malformed lock name? Or just some new format we don't understand?
                  logger.error("Found malformed lock %s in %s. Please check/fix manually.", name, self.path)
                  return False
              if platform.process_alive(host, pid, thread):
                  return False
              if not self.kill_stale_locks:
                  if not self.stale_warning_printed:
                      # Log this at warning level so the user notices that stale lock
                      # removal is switched off.
                      logger.warning(
                          "Found stale lock %s, but not deleting because self.kill_stale_locks = False.", name
                      )
                      self.stale_warning_printed = True
                  return False
              try:
                  os.unlink(os.path.join(self.path, name))
                  logger.warning("Killed stale lock %s.", name)
              except OSError as err:
                  if not self.stale_warning_printed:
                      # This error will bubble up and likely result in locking failure
                      logger.error("Found stale lock %s, but cannot delete due to %s", name, str(err))
                      self.stale_warning_printed = True
                  return False
          try:
              os.rmdir(self.path)
          except OSError as err:
              if err.errno in (errno.ENOTEMPTY, errno.EEXIST, errno.ENOENT):
                  # Directory is not empty or doesn't exist any more = we lost the race to somebody else--which is ok.
                  return False
              # EACCES or EIO or ... = we cannot operate anyway
              logger.error("Failed to remove lock dir: %s", str(err))
              return False
          return True

      def break_lock(self):
          """Forcibly remove the lock directory and all files in it, regardless of owner."""
          if self.is_locked():
              for name in os.listdir(self.path):
                  os.unlink(os.path.join(self.path, name))
              os.rmdir(self.path)

      def migrate_lock(self, old_id, new_id):
          """Migrate the lock ownership from old_id to new_id.

          If we currently hold the lock, the new marker file is created before the
          old one is removed. The instance id / unique_name are updated in any case.
          """
          assert self.id == old_id
          new_unique_name = os.path.join(self.path, "%s.%d-%x" % new_id)
          if self.is_locked() and self.by_me():
              with open(new_unique_name, "wb"):
                  pass
              os.unlink(self.unique_name)
          self.id, self.unique_name = new_id, new_unique_name
  class LockRoster:
      """
      A Lock Roster to track shared/exclusive lockers.

      The roster is a JSON file mapping a key (SHARED / EXCLUSIVE) to a list of
      [host, pid, thread] entries.

      Note: you usually should call the methods with an exclusive lock held,
      to avoid conflicting access by multiple threads/processes/machines.
      """

      def __init__(self, path, id=None):
          """
          :param path: roster file path
          :param id: (host, pid, thread) identification of this locker; defaults to
                     platform.get_process_id()
          """
          self.path = path
          self.id = id or platform.get_process_id()
          # When True, load() drops entries whose process is no longer alive.
          self.kill_stale_locks = True

      def load(self):
          """
          Read the roster file.

          :returns: the roster dict. An empty dict is returned if the file is missing,
                    is not valid JSON, or does not contain a JSON object.
          """
          try:
              with open(self.path, encoding="utf-8") as f:
                  data = json.load(f)
              if not isinstance(data, dict):
                  # Valid JSON, but not a roster object (e.g. "[]" or "null"):
                  # treat it like a corrupt roster file instead of failing later.
                  data = {}
              elif self.kill_stale_locks:
                  # Just nuke the stale locks early on load
                  for key in (SHARED, EXCLUSIVE):
                      try:
                          entries = data[key]
                      except KeyError:
                          continue
                      elements = set()
                      for host, pid, thread in entries:
                          if platform.process_alive(host, pid, thread):
                              elements.add((host, pid, thread))
                          else:
                              logger.warning(
                                  "Removed stale %s roster lock for host %s pid %d thread %d.", key, host, pid, thread
                              )
                      data[key] = list(elements)
          except (FileNotFoundError, ValueError):
              # no or corrupt/empty roster file?
              data = {}
          return data

      def save(self, data):
          """Write *data* to the roster file as JSON, replacing its previous content."""
          with open(self.path, "w", encoding="utf-8") as f:
              json.dump(data, f)

      def remove(self):
          """Delete the roster file; a missing file is not an error."""
          try:
              os.unlink(self.path)
          except FileNotFoundError:
              pass

      def get(self, key):
          """Return the set of (host, pid, thread) tuples listed under *key*."""
          roster = self.load()
          return {tuple(e) for e in roster.get(key, [])}

      def empty(self, *keys):
          """Return True if no entries are listed under any of *keys*."""
          return all(not self.get(key) for key in keys)

      def modify(self, key, op):
          """
          Add or remove our own id (self.id) under *key* and save the roster.

          :param key: roster key, e.g. SHARED or EXCLUSIVE
          :param op: ADD or REMOVE
          :raises KeyError: on REMOVE if our id is not listed under *key*
                            (migrate_lock relies on this)
          :raises ValueError: for an unknown *op*
          """
          roster = self.load()
          elements = {tuple(e) for e in roster.get(key, [])}
          if op == ADD:
              elements.add(self.id)
          elif op == REMOVE:
              elements.remove(self.id)
          else:
              raise ValueError("Unknown LockRoster op %r" % op)
          roster[key] = [list(e) for e in elements]
          self.save(roster)

      def migrate_lock(self, key, old_id, new_id):
          """migrate the lock ownership from old_id to new_id"""
          assert self.id == old_id
          # need to switch off stale lock killing temporarily as we want to
          # migrate rather than kill them (at least the one made by old_id).
          killing, self.kill_stale_locks = self.kill_stale_locks, False
          try:
              try:
                  self.modify(key, REMOVE)
              except KeyError:
                  # entry was not there, so no need to add a new one, but still update our id
                  self.id = new_id
              else:
                  # old entry removed, update our id and add an updated entry
                  self.id = new_id
                  self.modify(key, ADD)
          finally:
              self.kill_stale_locks = killing
  class Lock:
      """
      A Lock for a resource that can be accessed in a shared or exclusive way.

      Typically, write access to a resource needs an exclusive lock (1 writer,
      no one is allowed reading) and read access to a resource needs a shared
      lock (multiple readers are allowed).

      If possible, try to use the contextmanager here like::

          with Lock(...) as lock:
              ...

      This makes sure the lock is released again if the block is left, no
      matter how (e.g. if an exception occurred).
      """

      def __init__(self, path, exclusive=False, sleep=None, timeout=None, id=None):
          """
          :param path: base path; "<path>.roster" and "<path>.exclusive" are used
          :param exclusive: default mode for acquire()
          :param sleep: poll interval [s] used while waiting (see acquire())
          :param timeout: timeout [s] for obtaining the lock, None = wait forever
          :param id: (host, pid, thread) identification; defaults to platform.get_process_id()
          """
          self.path = path
          self.is_exclusive = exclusive
          self.sleep = sleep
          self.timeout = timeout
          self.id = id or platform.get_process_id()
          # Pass the resolved id on so the roster and the exclusive lock use the
          # very same identity as this Lock.
          # globally keeping track of shared and exclusive lockers:
          self._roster = LockRoster(path + ".roster", id=self.id)
          # an exclusive lock, used for:
          # - holding while doing roster queries / updates
          # - holding while the Lock itself is exclusive
          self._lock = ExclusiveLock(path + ".exclusive", id=self.id, timeout=timeout)

      def __enter__(self):
          return self.acquire()

      def __exit__(self, *exc):
          self.release()

      def __repr__(self):
          return f"<{self.__class__.__name__}: {self.id!r}>"

      def acquire(self, exclusive=None, remove=None, sleep=None):
          """
          Acquire the lock in shared or exclusive mode.

          :param exclusive: True/False selects the mode; None = use self.is_exclusive
          :param remove: optional roster key from which our id is removed in the same
                         step (used by upgrade() / downgrade())
          :param sleep: poll interval while waiting; falls back to self.sleep, then 0.2s
          :returns: self
          :raises LockTimeout: if shared lockers do not go away within self.timeout
          """
          if exclusive is None:
              exclusive = self.is_exclusive
          sleep = sleep or self.sleep or 0.2
          if exclusive:
              # returns with self._lock held
              self._wait_for_readers_finishing(remove, sleep)
              try:
                  self._roster.modify(EXCLUSIVE, ADD)
              except BaseException:
                  # avoid an orphaned exclusive lock if the roster update fails (or Ctrl-C)
                  self._lock.release()
                  raise
          else:
              with self._lock:
                  if remove is not None:
                      self._roster.modify(remove, REMOVE)
                  self._roster.modify(SHARED, ADD)
          self.is_exclusive = exclusive
          return self

      def _wait_for_readers_finishing(self, remove, sleep):
          """
          Poll until no shared lockers remain; return with self._lock still held.

          :param remove: optional roster key to remove our id from (undone while waiting)
          :param sleep: poll interval [s]
          :raises LockTimeout: if shared lockers remain after self.timeout
          """
          timer = TimeoutTimer(self.timeout, sleep).start()
          while True:
              self._lock.acquire()
              try:
                  if remove is not None:
                      self._roster.modify(remove, REMOVE)
                  if not self._roster.get(SHARED):
                      return  # we are the only one and we keep the lock!
                  # restore the roster state as before (undo the roster change):
                  if remove is not None:
                      self._roster.modify(remove, ADD)
              except BaseException:
                  # avoid orphan lock when an exception happens here, e.g. Ctrl-C!
                  self._lock.release()
                  raise
              else:
                  self._lock.release()
              if timer.timed_out_or_sleep():
                  raise LockTimeout(self.path)

      def release(self):
          """Release the lock; the roster file is deleted once no lockers remain."""
          if self.is_exclusive:
              try:
                  self._roster.modify(EXCLUSIVE, REMOVE)
                  if self._roster.empty(EXCLUSIVE, SHARED):
                      self._roster.remove()
              finally:
                  # release the exclusive lock even if the roster update failed,
                  # otherwise it would stay orphaned while this process is alive
                  self._lock.release()
          else:
              with self._lock:
                  self._roster.modify(SHARED, REMOVE)
                  if self._roster.empty(EXCLUSIVE, SHARED):
                      self._roster.remove()

      def upgrade(self):
          """Switch from shared to exclusive mode (no-op if already exclusive)."""
          # WARNING: if multiple read-lockers want to upgrade, it will deadlock because they
          # all will wait until the other read locks go away - and that won't happen.
          if not self.is_exclusive:
              self.acquire(exclusive=True, remove=SHARED)

      def downgrade(self):
          """Switch from exclusive to shared mode (no-op if already shared)."""
          if self.is_exclusive:
              self.acquire(exclusive=False, remove=EXCLUSIVE)

      def got_exclusive_lock(self):
          """Return True if we are in exclusive mode and hold the underlying exclusive lock."""
          return self.is_exclusive and self._lock.is_locked() and self._lock.by_me()

      def break_lock(self):
          """Forcibly remove the roster file and the exclusive lock, regardless of owner."""
          self._roster.remove()
          self._lock.break_lock()

      def migrate_lock(self, old_id, new_id):
          """Migrate the lock ownership (lock files and roster entry) from old_id to new_id."""
          assert self.id == old_id
          self.id = new_id
          if self.is_exclusive:
              self._lock.migrate_lock(old_id, new_id)
              self._roster.migrate_lock(EXCLUSIVE, old_id, new_id)
          else:
              with self._lock:
                  self._lock.migrate_lock(old_id, new_id)
                  self._roster.migrate_lock(SHARED, old_id, new_id)