kwutil.util_locks module
Experimental.
For handling race conditions within and between processes (ideally).
- class kwutil.util_locks.Superlock(lock_fpath=NoParam, thread_key=NoParam)
Bases: object
A thread and/or process lock
The lockiest lock that ever did lock… or at least an attempt at it.
This is experimental and not well tested.
If lock_fpath is NoParam, uses a global shared process lock. If None, then no process lock is used.
If thread_key is NoParam, uses a global shared thread lock. If None, then no thread lock is used.
Otherwise, locks with the same lock_fpath OR thread_key will not execute concurrently, up to system limitations of the locking mechanisms.
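The key-sharing behavior described above can be illustrated with a minimal stdlib-only sketch. This is not the kwutil implementation: `KeyedLock`, `get_thread_lock`, and the module-level registry are hypothetical names, and the use of a `WeakValueDictionary` is only suggested by the `THREAD_LOCKS` attribute of the class.

```python
import threading
import weakref

# Hypothetical registry mapping a key to a shared lock. Entries vanish
# automatically once no caller holds a reference to the lock.
_REGISTRY = weakref.WeakValueDictionary()
_REGISTRY_GUARD = threading.Lock()


class KeyedLock:
    """Hypothetical wrapper around a plain (non-reentrant) thread lock."""

    def __init__(self):
        self._lock = threading.Lock()

    def acquire(self, timeout=-1):
        # Returns True on success, False if the timeout expired.
        return self._lock.acquire(timeout=timeout)

    def release(self):
        self._lock.release()


def get_thread_lock(key):
    """Return the shared lock for ``key``, creating it on first use."""
    with _REGISTRY_GUARD:
        lock = _REGISTRY.get(key)
        if lock is None:
            lock = KeyedLock()
            _REGISTRY[key] = lock
        return lock


a = get_thread_lock('shared')
b = get_thread_lock('shared')
c = get_thread_lock('other')

same_object = a is b                      # same key resolves to one lock
a_acquired = a.acquire(timeout=1)
b_blocked = not b.acquire(timeout=0.01)   # same key: already held
c_acquired = c.acquire(timeout=0.01)      # different key: independent
a.release()
c.release()
```

In this sketch two handles with the same key contend for one underlying lock, while a different key is unaffected.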
Uses [Fasteners] for the process-based file-locks, which do have fundamental issues [OnFileLocks].
TODO: Evaluate [FileLock] as an alternative.
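For readers unfamiliar with file locks, the following is a rough POSIX-only sketch of an advisory lock using the standard library `fcntl.flock`. It is an assumption-laden illustration, not a description of what Fasteners does internally, and the temporary lock path is made up for the demo. Such locks are advisory: they only constrain code that also asks for the lock.

```python
import fcntl
import os
import tempfile

# Hypothetical lock file in a fresh temporary directory.
lock_fpath = os.path.join(tempfile.mkdtemp(), 'demo.lock')

# First handle takes an exclusive advisory lock on the file.
holder = open(lock_fpath, 'w')
fcntl.flock(holder, fcntl.LOCK_EX)

# A second, independent handle tries a non-blocking exclusive lock.
contender = open(lock_fpath, 'w')
try:
    fcntl.flock(contender, fcntl.LOCK_EX | fcntl.LOCK_NB)
    contended = False
except BlockingIOError:
    contended = True  # the lock is already held through `holder`

# After the holder releases, the contender can take the lock.
fcntl.flock(holder, fcntl.LOCK_UN)
try:
    fcntl.flock(contender, fcntl.LOCK_EX | fcntl.LOCK_NB)
    acquired_after_release = True
except BlockingIOError:
    acquired_after_release = False

fcntl.flock(contender, fcntl.LOCK_UN)
holder.close()
contender.close()
```

Behavior on network filesystems and on non-POSIX platforms may differ from what this sketch shows.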
References
SeeAlso:
Example
>>> # xdoctest: +REQUIRES(module:fasteners)
>>> from kwutil.util_locks import Superlock
>>> self = Superlock()
>>> with self:
>>>     print('non-concurrent code')
Example
>>> # xdoctest: +REQUIRES(module:fasteners)
>>> from kwutil.util_locks import *  # NOQA
>>> import ubelt as ub
>>> lock1 = Superlock()
>>> lock2 = Superlock()
>>> assert lock1.acquire(timeout=10)
>>> # lock2 shares the global locks with lock1, so it cannot acquire yet
>>> assert not lock2.acquire(timeout=0.01)
>>> lock1.release()
>>> assert lock2.acquire()
>>> lock2.release()
Example
>>> # Demonstrate a real world case with thread locks
>>> # xdoctest: +REQUIRES(module:fasteners)
>>> import time
>>> from kwutil.util_locks import Superlock
>>> import ubelt as ub
>>> #
>>> shared_counter = []
>>> #
>>> def task(i):
...     with Superlock():
...         # simulate work inside critical section
...         current_len = len(shared_counter)
...         print(f'current_len={current_len}')
...         time.sleep(0.05)
...         shared_counter.append(i)
...         # ensure no concurrent execution by checking counter length did not change during sleep
...         assert len(shared_counter) == current_len + 1
...     return i
>>> #
>>> with ub.Executor(mode='thread', max_workers=4) as executor:
...     results = list(executor.map(task, range(8)))
>>> #
>>> sorted(results) == list(range(8))
True
>>> len(shared_counter) == 8
True
Example
>>> # Demonstrate a real world case with process locks
>>> # xdoctest: +REQUIRES(module:fasteners)
>>> # xdoctest: +SKIP('xdoctest does not support pickled functions yet')
>>> import time
>>> import ubelt as ub
>>> from pathlib import Path
>>> #
>>> dpath = ub.Path.appdir('kwutil/tests/superlock').ensuredir()
>>> counter_fpath = dpath / 'shared_counter.txt'
>>> counter_fpath.write_text('0')
>>> #
>>> def task(i):
...     # import everything locally so the function is self-contained in a worker process
...     import time
...     import ubelt as ub
...     from pathlib import Path
...     from kwutil.util_locks import Superlock
...     lock = Superlock()
...     counter_fpath = Path(ub.Path.appdir('kwutil/tests/superlock') / 'shared_counter.txt')
...     with lock:
...         current = int(counter_fpath.read_text())
...         time.sleep(0.05)  # simulate some work
...         counter_fpath.write_text(str(current + 1))
...     return i
>>> #
>>> with ub.Executor(mode='process', max_workers=4) as executor:
...     results = list(executor.map(task, range(8)))
>>> #
>>> sorted(results) == list(range(8))
True
>>> final_value = int(counter_fpath.read_text())
>>> final_value == 8
True
- THREAD_LOCKS = <WeakValueDictionary>
- GLOBAL_THREAD_KEY = '__GLOBAL_THREAD_LOCK__'
- GLOBAL_APPNAME = 'fasteners_ext/file_locks'
- GLOBAL_LOCK_FNAME = 'superlock.lock'
- property global_lock_fpath