123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460 |
- import errno
- import json
- import os
- import tempfile
- import time
- from . import platform
- from .helpers import Error, ErrorWithTraceback
- from .logger import create_logger
# Operations accepted by LockRoster.modify().
ADD = "add"
REMOVE = "remove"

# Roster keys for the two kinds of lock holders.
SHARED = "shared"
EXCLUSIVE = "exclusive"
# Module-level logger; the lock classes in this module use it to report
# stale, malformed or undeletable locks.
logger = create_logger(__name__)
class TimeoutTimer:
    """
    Deadline tracker for polling loops (also supports "never time out").

    Besides answering "is the time up?", it can pick and optionally perform a
    reasonable pause between polls (so callers do not poll too often and other
    threads/processes get a chance to run).
    """

    def __init__(self, timeout=None, sleep=None):
        """
        Set up the timer; it does not run until start() is called.

        :param timeout: timeout interval [s], or None to never time out [default]
        :param sleep: pause between polls [s] (>= 0: really sleep, < 0: never sleep),
                      or None to derive it: 10% of timeout (at most 60s),
                      or 1s when there is no timeout
        :raises ValueError: if timeout is negative
        """
        if timeout is not None and timeout < 0:
            raise ValueError("timeout must be >= 0")
        if sleep is None:
            sleep = 1.0 if timeout is None else min(60.0, timeout / 10.0)
        self.timeout_interval = timeout
        self.sleep_interval = sleep
        # Both stay None until start() is called; end_time also stays None
        # when there is no timeout.
        self.start_time = None
        self.end_time = None

    def __repr__(self):
        template = "<{}: start={!r} end={!r} timeout={!r} sleep={!r}>"
        fields = (self.start_time, self.end_time, self.timeout_interval, self.sleep_interval)
        return template.format(self.__class__.__name__, *fields)

    def start(self):
        """Record the start time (and the deadline, if there is a timeout); return self."""
        now = time.time()
        self.start_time = now
        if self.timeout_interval is not None:
            self.end_time = now + self.timeout_interval
        return self

    def sleep(self):
        """Sleep for the configured interval; a negative interval means "do not sleep"."""
        pause = self.sleep_interval
        if pause >= 0:
            time.sleep(pause)

    def timed_out(self):
        """Return True if a deadline is set and has been reached, else False."""
        deadline = self.end_time
        if deadline is None:
            return False
        return time.time() >= deadline

    def timed_out_or_sleep(self):
        """Return True if timed out; otherwise sleep once and return False."""
        if self.timed_out():
            return True
        self.sleep()
        return False
# NOTE(review): the "{}" placeholders suggest that the Error base classes use
# each class docstring as the message template, filled from the constructor
# arguments (e.g. LockFailed(path, reason)) -- confirm against .helpers before
# rewording any of these docstrings.


class LockError(Error):
    """Failed to acquire the lock {}."""


class LockErrorT(ErrorWithTraceback):
    """Failed to acquire the lock {}."""


class LockTimeout(LockError):
    """Failed to create/acquire the lock {} (timeout)."""


class LockFailed(LockErrorT):
    """Failed to create/acquire the lock {} ({})."""


class NotLocked(LockErrorT):
    """Failed to release the lock {} (was not locked)."""


class NotMyLock(LockErrorT):
    """Failed to release the lock {} (was/is locked, but not by me)."""
class ExclusiveLock:
    """An exclusive Lock based on mkdir fs operation being atomic.

    The lock is a directory at ``path``. It contains one (empty) file per
    holder, named "<host>.<pid>-<thread>" after the holder's id, with the pid
    in decimal and the thread id in hexadecimal.

    If possible, try to use the contextmanager here like::

        with ExclusiveLock(...) as lock:
            ...

    This makes sure the lock is released again if the block is left, no
    matter how (e.g. if an exception occurred).
    """

    def __init__(self, path, timeout=None, sleep=None, id=None):
        """
        :param path: path of the lock directory (stored as an absolute path)
        :param timeout: default timeout [s] for acquire(), None = wait forever
        :param sleep: default sleep interval [s] for acquire(), see TimeoutTimer
        :param id: (host, pid, thread) tuple identifying the holder;
                   defaults to platform.get_process_id()
        """
        self.timeout = timeout
        self.sleep = sleep
        self.path = os.path.abspath(path)
        self.id = id or platform.get_process_id()
        self.unique_name = os.path.join(self.path, "%s.%d-%x" % self.id)
        self.kill_stale_locks = True
        self.stale_warning_printed = False

    def __enter__(self):
        return self.acquire()

    def __exit__(self, *exc):
        self.release()

    def __repr__(self):
        return f"<{self.__class__.__name__}: {self.unique_name!r}>"

    @staticmethod
    def _parse_lock_name(name):
        """Split a lock file name "<host>.<pid>-<thread>" into (host, pid, thread).

        The pid is parsed as decimal and the thread id as hexadecimal, which
        mirrors the "%s.%d-%x" format used to build unique_name.

        :raises ValueError: if name does not have that shape
        """
        host_pid, thread_str = name.rsplit("-", 1)
        host, pid_str = host_pid.rsplit(".", 1)
        return host, int(pid_str), int(thread_str, 16)

    def acquire(self, timeout=None, sleep=None):
        """
        Acquire the lock, waiting for it if necessary.

        A temporary directory already containing our id file is created next
        to the lock path and then renamed onto the lock path, so the lock
        appears together with its owner file in one step.

        :param timeout: timeout [s]; None = use the value given to __init__
        :param sleep: sleep interval [s]; None = use the value given to __init__
        :return: self
        :raises LockFailed: if the temporary directory/file cannot be created
        :raises LockTimeout: if the lock could not be acquired in time
        """
        if timeout is None:
            timeout = self.timeout
        if sleep is None:
            sleep = self.sleep
        parent_path, base_name = os.path.split(self.path)
        unique_base_name = os.path.basename(self.unique_name)
        temp_path = None
        temp_unique_name = None
        try:
            temp_path = tempfile.mkdtemp(".tmp", base_name + ".", parent_path)
            temp_unique_name = os.path.join(temp_path, unique_base_name)
            with open(temp_unique_name, "wb"):
                pass
        except OSError as err:
            raise LockFailed(self.path, str(err)) from None
        else:
            timer = TimeoutTimer(timeout, sleep).start()
            while True:
                try:
                    os.rename(temp_path, self.path)
                except OSError:  # already locked
                    if self.by_me():
                        return self
                    self.kill_stale_lock()
                    if timer.timed_out_or_sleep():
                        raise LockTimeout(self.path) from None
                else:
                    temp_path = None  # see finally:-block below
                    return self
        finally:
            if temp_path is not None:
                # Renaming failed for some reason, so temp_dir still exists and
                # should be cleaned up anyway. Try to clean up, but don't crash.
                # Only file system errors are ignored, so that e.g. Ctrl-C is
                # not swallowed here.
                if temp_unique_name is not None:
                    try:
                        os.unlink(temp_unique_name)
                    except OSError:
                        pass
                try:
                    os.rmdir(temp_path)
                except OSError:
                    pass

    def release(self):
        """
        Release the lock: remove our id file, then the lock directory.

        :raises NotLocked: if the lock directory does not exist
        :raises NotMyLock: if the lock directory exists, but lacks our id file
        :raises OSError: if removing the lock directory fails for a reason
                         other than it being non-empty or already gone
        """
        if not self.is_locked():
            raise NotLocked(self.path)
        if not self.by_me():
            raise NotMyLock(self.path)
        os.unlink(self.unique_name)
        try:
            os.rmdir(self.path)
        except OSError as err:
            if err.errno not in (errno.ENOTEMPTY, errno.EEXIST, errno.ENOENT):
                # EACCES or EIO or ... = we cannot operate anyway, so re-throw
                raise
            # else:
            # Directory is not empty or doesn't exist any more.
            # this means we lost the race to somebody else -- which is ok.

    def is_locked(self):
        """Return True if the lock directory exists (no matter who holds it)."""
        return os.path.exists(self.path)

    def by_me(self):
        """Return True if our own id file exists inside the lock directory."""
        return os.path.exists(self.unique_name)

    def kill_stale_lock(self):
        """
        Try to remove the lock if all of its holders are dead.

        :return: True if the lock directory was removed by this call.
                 False otherwise: a holder is still alive, a file name could
                 not be parsed, killing is disabled via kill_stale_locks,
                 a file or the directory could not be deleted, or the
                 directory was already gone.
        """
        try:
            names = os.listdir(self.path)
        except FileNotFoundError:  # another process did our job in the meantime.
            pass
        else:
            for name in names:
                try:
                    host, pid, thread = self._parse_lock_name(name)
                except ValueError:
                    # Malformed lock name? Or just some new format we don't understand?
                    logger.error("Found malformed lock %s in %s. Please check/fix manually.", name, self.path)
                    return False

                if platform.process_alive(host, pid, thread):
                    return False

                if not self.kill_stale_locks:
                    if not self.stale_warning_printed:
                        # Log this at warning level to hint the user at the ability
                        logger.warning(
                            "Found stale lock %s, but not deleting because self.kill_stale_locks = False.", name
                        )
                        self.stale_warning_printed = True
                    return False

                try:
                    os.unlink(os.path.join(self.path, name))
                    logger.warning("Killed stale lock %s.", name)
                except OSError as err:
                    if not self.stale_warning_printed:
                        # This error will bubble up and likely result in locking failure
                        logger.error("Found stale lock %s, but cannot delete due to %s", name, str(err))
                        self.stale_warning_printed = True
                    return False

        try:
            os.rmdir(self.path)
        except OSError as err:
            if err.errno in (errno.ENOTEMPTY, errno.EEXIST, errno.ENOENT):
                # Directory is not empty or doesn't exist any more = we lost the race to somebody else--which is ok.
                return False
            # EACCES or EIO or ... = we cannot operate anyway
            logger.error("Failed to remove lock dir: %s", str(err))
            return False

        return True

    def break_lock(self):
        """Forcibly remove the lock (all id files and the directory), whoever holds it."""
        if self.is_locked():
            for name in os.listdir(self.path):
                os.unlink(os.path.join(self.path, name))
            os.rmdir(self.path)

    def migrate_lock(self, old_id, new_id):
        """migrate the lock ownership from old_id to new_id"""
        assert self.id == old_id
        new_unique_name = os.path.join(self.path, "%s.%d-%x" % new_id)
        if self.is_locked() and self.by_me():
            # Create the new id file before removing the old one, so the lock
            # directory is never without an owner file.
            with open(new_unique_name, "wb"):
                pass
            os.unlink(self.unique_name)
        self.id, self.unique_name = new_id, new_unique_name
class LockRoster:
    """
    A Lock Roster to track shared/exclusive lockers.

    The roster is a JSON file at ``path`` that maps a lock kind (SHARED or
    EXCLUSIVE) to a list of locker ids, each a (host, pid, thread) triple.

    Note: you usually should call the methods with an exclusive lock held,
    to avoid conflicting access by multiple threads/processes/machines.
    """

    def __init__(self, path, id=None):
        """
        :param path: path of the roster file
        :param id: (host, pid, thread) tuple identifying us;
                   defaults to platform.get_process_id()
        """
        self.path = path
        self.id = id or platform.get_process_id()
        self.kill_stale_locks = True

    def load(self):
        """
        Read the roster file and return its contents as a dict.

        If kill_stale_locks is set, entries whose process is not alive
        (according to platform.process_alive) are dropped from the returned
        data and a warning is logged for each; the file itself is not
        rewritten here.

        A missing file, or one whose content raises ValueError while being
        decoded or processed, yields an empty dict.
        """
        try:
            with open(self.path) as f:
                data = json.load(f)

            # Just nuke the stale locks early on load
            if self.kill_stale_locks:
                for key in (SHARED, EXCLUSIVE):
                    try:
                        entries = data[key]
                    except KeyError:
                        continue
                    elements = set()
                    for host, pid, thread in entries:
                        if platform.process_alive(host, pid, thread):
                            elements.add((host, pid, thread))
                        else:
                            logger.warning(
                                "Removed stale %s roster lock for host %s pid %d thread %d.", key, host, pid, thread
                            )
                    data[key] = list(elements)
        except (FileNotFoundError, ValueError):
            # no or corrupt/empty roster file?
            data = {}

        return data

    def save(self, data):
        """Write data to the roster file as JSON, replacing previous content."""
        with open(self.path, "w") as f:
            json.dump(data, f)

    def remove(self):
        """Delete the roster file; it is not an error if it does not exist."""
        try:
            os.unlink(self.path)
        except FileNotFoundError:
            pass

    def get(self, key):
        """Return the set of locker id tuples registered under key (maybe empty)."""
        roster = self.load()
        return {tuple(e) for e in roster.get(key, [])}

    def empty(self, *keys):
        """Return True if none of the given keys has any locker registered."""
        # Load once, so all keys are judged on the same snapshot and stale
        # entries are not re-checked (and re-reported) once per key.
        roster = self.load()
        return all(not roster.get(key) for key in keys)

    def modify(self, key, op):
        """
        Add our id to / remove our id from the lockers registered under key
        and save the roster.

        :param key: SHARED or EXCLUSIVE
        :param op: ADD or REMOVE
        :raises KeyError: for REMOVE, if our id is not registered under key
        :raises ValueError: if op is neither ADD nor REMOVE
        """
        roster = self.load()
        try:
            elements = {tuple(e) for e in roster[key]}
        except KeyError:
            elements = set()
        if op == ADD:
            elements.add(self.id)
        elif op == REMOVE:
            elements.remove(self.id)
        else:
            raise ValueError("Unknown LockRoster op %r" % op)
        roster[key] = [list(e) for e in elements]
        self.save(roster)

    def migrate_lock(self, key, old_id, new_id):
        """migrate the lock ownership from old_id to new_id"""
        assert self.id == old_id
        # need to switch off stale lock killing temporarily as we want to
        # migrate rather than kill them (at least the one made by old_id).
        killing, self.kill_stale_locks = self.kill_stale_locks, False
        try:
            try:
                self.modify(key, REMOVE)
            except KeyError:
                # entry was not there, so no need to add a new one, but still update our id
                self.id = new_id
            else:
                # old entry removed, update our id and add a updated entry
                self.id = new_id
                self.modify(key, ADD)
        finally:
            self.kill_stale_locks = killing
class Lock:
    """
    A Lock for a resource that can be accessed in a shared or exclusive way.

    Typically, write access to a resource needs an exclusive lock (1 writer,
    no one is allowed reading) and read access to a resource needs a shared
    lock (multiple readers are allowed).

    If possible, try to use the contextmanager here like::

        with Lock(...) as lock:
            ...

    This makes sure the lock is released again if the block is left, no
    matter how (e.g. if an exception occurred).
    """

    def __init__(self, path, exclusive=False, sleep=None, timeout=None, id=None):
        self.path = path
        self.is_exclusive = exclusive
        self.sleep = sleep
        self.timeout = timeout
        self.id = id or platform.get_process_id()
        # The roster file records who currently holds shared / exclusive locks.
        self._roster = LockRoster(path + ".roster", id=id)
        # The exclusive lock is held:
        # - briefly, around every roster query / update
        # - permanently, for as long as this Lock itself is exclusive
        self._lock = ExclusiveLock(path + ".exclusive", id=id, timeout=timeout)

    def __enter__(self):
        return self.acquire()

    def __exit__(self, *exc):
        self.release()

    def __repr__(self):
        return f"<{self.__class__.__name__}: {self.id!r}>"

    def acquire(self, exclusive=None, remove=None, sleep=None):
        """
        Acquire the lock in shared or exclusive mode and return self.

        :param exclusive: mode to acquire; None = use the current is_exclusive
        :param remove: roster key to drop our entry from first (used when
                       switching modes), or None
        :param sleep: polling interval [s]; falls back to self.sleep, then 0.2
        """
        want_exclusive = self.is_exclusive if exclusive is None else exclusive
        pause = sleep or self.sleep or 0.2
        if not want_exclusive:
            with self._lock:
                if remove is not None:
                    self._roster.modify(remove, REMOVE)
                self._roster.modify(SHARED, ADD)
        else:
            self._wait_for_readers_finishing(remove, pause)
            self._roster.modify(EXCLUSIVE, ADD)
        self.is_exclusive = want_exclusive
        return self

    def _wait_for_readers_finishing(self, remove, sleep):
        """
        Poll until no shared lockers are registered, then return while still
        holding the exclusive lock.

        :raises LockTimeout: if shared lockers remain when self.timeout expires
        """
        swap_entry = remove is not None
        timer = TimeoutTimer(self.timeout, sleep).start()
        while True:
            self._lock.acquire()
            try:
                if swap_entry:
                    self._roster.modify(remove, REMOVE)
                if not self._roster.get(SHARED):
                    # No readers left: leave with the exclusive lock still held.
                    return
                # Readers remain: put our roster entry back before retrying.
                if swap_entry:
                    self._roster.modify(remove, ADD)
            except BaseException:
                # Never leave an orphan lock behind, not even on Ctrl-C.
                self._lock.release()
                raise
            else:
                self._lock.release()
            if timer.timed_out_or_sleep():
                raise LockTimeout(self.path)

    def _unregister(self, kind):
        """Drop our roster entry of the given kind; delete the roster file if nobody is left."""
        self._roster.modify(kind, REMOVE)
        if self._roster.empty(EXCLUSIVE, SHARED):
            self._roster.remove()

    def release(self):
        """Release the lock in whichever mode it is currently held."""
        if not self.is_exclusive:
            with self._lock:
                self._unregister(SHARED)
            return
        self._unregister(EXCLUSIVE)
        self._lock.release()

    def upgrade(self):
        """Switch a shared lock to an exclusive one; no-op if already exclusive."""
        # WARNING: if multiple read-lockers want to upgrade, it will deadlock because they
        # all will wait until the other read locks go away - and that won't happen.
        if self.is_exclusive:
            return
        self.acquire(exclusive=True, remove=SHARED)

    def downgrade(self):
        """Switch an exclusive lock to a shared one; no-op if already shared."""
        if not self.is_exclusive:
            return
        self.acquire(exclusive=False, remove=EXCLUSIVE)

    def got_exclusive_lock(self):
        """Tell whether we are in exclusive mode and the exclusive lock exists and is ours."""
        return self.is_exclusive and self._lock.is_locked() and self._lock.by_me()

    def break_lock(self):
        """Forcibly remove the roster file and the exclusive lock."""
        self._roster.remove()
        self._lock.break_lock()

    def migrate_lock(self, old_id, new_id):
        """Move the ownership of the held lock from old_id to new_id."""
        assert self.id == old_id
        self.id = new_id
        if self.is_exclusive:
            self._lock.migrate_lock(old_id, new_id)
            self._roster.migrate_lock(EXCLUSIVE, old_id, new_id)
            return
        with self._lock:
            self._lock.migrate_lock(old_id, new_id)
            self._roster.migrate_lock(SHARED, old_id, new_id)
|