Source code for kwutil.util_locks

"""
Experimental.

For handling race conditions within and between processes (ideally).
"""
import ubelt as ub
import weakref


DEBUG = 0


[docs] class Superlock: """ A thread and/or process lock The lockiest lock that ever did lock... or at least an attempt at it. This is experimental and not well tested. If lock_fpath is NoParam, uses a global shared process lock. If None, then no process lock is used. If thread_key is NoParam, uses a global shared thread lock. If None, then no thread lock is used. Otherwise locks with the same process_fpath OR thread_key will not execute concurrently, up to system limitations of the locking mechanisms. Uses [Fasteners]_ for the process-based file-locks, which do have fundamental issues [OnFileLocks]_. TODO: Evaluate [FileLock]_ as an alternative. References: .. [FileLock] https://github.com/tox-dev/filelock .. [Fasteners] https://pypi.org/project/fasteners/ .. [OnFileLocks] https://0pointer.de/blog/projects/locking.html SeeAlso: https://pypi.org/project/readerwriterlock/#description https://github.com/python/cpython/issues/53046 https://gist.github.com/tylerneylon/a7ff6017b7a1f9a506cf75aa23eacfd6 https://discuss.python.org/t/add-rw-locks-to-python/76638 https://py-filelock.readthedocs.io/en/latest/index.html Ignore: Fasterners has `fasteners.ReaderWriterLock()` thread RW lock impelmentation that favors readers. It would be nice if we let the user specify priority. Example: >>> # xdoctest: +REQUIRES(module:fasteners) >>> self = Superlock() >>> with self: >>> print('non-concurent code') Example: >>> # xdoctest: +REQUIRES(module:fasteners) >>> from kwutil.util_locks import * # NOQA >>> import ubelt as ub >>> lock1 = Superlock() >>> lock2 = Superlock() >>> assert lock1.acquire(timeout=10) >>> assert not lock2.acquire(timeout=0.01) >>> lock1.release() >>> assert lock2.acquire() >>> lock2.release() Example: >>> # Demonstrate a real world case with thread locks >>> # xdoctest: +REQUIRES(module:fasteners) >>> import time >>> from kwutil.util_locks import Superlock >>> import ubelt as ub >>> # >>> shared_counter = [] >>> # >>> def task(i): ... with Superlock(): ... # simulate work inside critical section ... current_len = len(shared_counter) >>> print(f'current_len={current_len}') ... time.sleep(0.05) ... shared_counter.append(i) ... # ensure no concurrent execution by checking counter length did not change during sleep ... assert len(shared_counter) == current_len + 1 ... return i >>> # >>> with ub.Executor(mode='thread', max_workers=4) as executor: ... results = list(executor.map(task, range(8))) >>> # >>> sorted(results) == list(range(8)) True >>> len(shared_counter) == 8 True Example: >>> # Demonstrate a real world case with process locks >>> # xdoctest: +REQUIRES(module:fasteners) >>> # xdoctest: +SKIP('xdoctest does not support pickled functions yet') >>> import time >>> import ubelt as ub >>> from pathlib import Path >>> # >>> dpath = ub.Path.appdir('kwutil/tests/superlock').ensuredir() >>> counter_fpath = dpath / 'shared_counter.txt' >>> counter_fpath.write_text('0') >>> # >>> def task(i): ... import time ... from pathlib import Path ... from kwutil.util_locks import Superlock ... lock = Superlock() ... counter_fpath = Path(ub.Path.appdir('kwutil/tests/superlock') / 'shared_counter.txt') ... with lock: ... current = int(counter_fpath.read_text()) ... time.sleep(0.05) # simulate some work ... counter_fpath.write_text(str(current + 1)) ... return i >>> # >>> with ub.Executor(mode='process', max_workers=4) as executor: ... results = list(executor.map(task, range(8))) >>> # >>> sorted(results) == list(range(8)) True >>> final_value = int(counter_fpath.read_text()) >>> final_value == 8 True """ THREAD_LOCKS = weakref.WeakValueDictionary() GLOBAL_THREAD_KEY = '__GLOBAL_THREAD_LOCK__' GLOBAL_APPNAME = 'fasteners_ext/file_locks' GLOBAL_LOCK_FNAME = 'superlock.lock'
[docs] def _debug(self, msg): import os import threading print(f'[pid={os.getpid()} tid={threading.get_ident()}] {msg}')
def __init__(self, lock_fpath=ub.NoParam, thread_key=ub.NoParam): if DEBUG: self._debug('Init SuperLock') import fasteners import threading if lock_fpath is ub.NoParam: lock_fpath = self.global_lock_fpath if thread_key is ub.NoParam: thread_key = self.GLOBAL_THREAD_KEY self.lock_fpath = lock_fpath self.thread_key = thread_key self.process_lock = None self.thread_lock = None self._thread_counter = 0 self._process_counter = 0 # Only let one super lock within a thread aquire / release something at # the same time. But you can release while another is trying to aquire. self._aquire_lock = threading.Lock() self._release_lock = threading.Lock() if thread_key is not None: self.thread_lock = self.THREAD_LOCKS.get(thread_key, None) if self.thread_lock is None: self.thread_lock = threading.Lock() self.THREAD_LOCKS[thread_key] = self.thread_lock if lock_fpath is not None: self.process_lock = fasteners.InterProcessLock(lock_fpath) # @property def global_lock_fpath(self): global_dpath = ub.Path.appdir(self.GLOBAL_APPNAME, type='cache').ensuredir() global_lock_fpath = global_dpath / self.GLOBAL_LOCK_FNAME return global_lock_fpath
[docs] def acquire(self, blocking=True, timeout=None, delay=0.01, max_delay=0.1): # got = [] # # FIXME: corner case when one acquires and the other doesn't # if self.thread_lock is not None: # thread_timeout = -1 if timeout is None else timeout # gotten1 = self.thread_lock.acquire(blocking=blocking, timeout=thread_timeout) # got.append(gotten1) # if self.process_lock is not None: # gotten2 = self.process_lock.acquire( # blocking=blocking, timeout=timeout, delay=delay, # max_delay=max_delay) # got.append(gotten2) # gotten = all(got) # return gotten if DEBUG: self._debug('About to aquire SuperLock') gotten1 = False gotten2 = False with self._aquire_lock: if self.thread_lock is not None: thread_timeout = -1 if timeout is None else timeout if DEBUG: self._debug('About to aquire thread lock') gotten1 = self.thread_lock.acquire(blocking=blocking, timeout=thread_timeout) if not gotten1: if DEBUG: self._debug('Failed to aquire thread lock') return False self._thread_counter += 1 if DEBUG: self._debug('Aquired thread lock') if self.process_lock is not None: if DEBUG: self._debug('About to aquire process lock') gotten2 = self.process_lock.acquire( blocking=blocking, timeout=timeout, delay=delay, max_delay=max_delay) if not gotten2: # Undo thread_lock if process lock failed to aquire if DEBUG: self._debug('Failed to aquire process lock') if gotten1: if DEBUG: self._debug('Undo thread lock') self.thread_lock.release() self._thread_counter -= 1 return False self._process_counter += 1 if DEBUG: self._debug('Aquired process lock') # Both locks (if any) acquired successfully or no locks requested return True
[docs] def release(self): if DEBUG: self._debug('Attempting to release superlock') with self._release_lock: if self.process_lock is not None: self.process_lock.release() self._process_counter -= 1 if DEBUG: self._debug('Released process lock') if self.thread_lock is not None: self.thread_lock.release() self._thread_counter -= 1 if DEBUG: self._debug('Released thread lock') if DEBUG: self._debug('Releasing superlock')
def __enter__(self): gotten = self.acquire() assert gotten, 'should always be true' return self def __exit__(self, ex_type, ex_value, ex_traceback): """ Args: ex_type (Type[BaseException] | None): ex_value (BaseException | None): ex_traceback (TracebackType | None): Returns: bool | None """ self.release()