kwutil.util_locks module

Experimental.

For handling race conditions within and between processes (ideally).

class kwutil.util_locks.Superlock(lock_fpath=NoParam, thread_key=NoParam)[source]

Bases: object

A thread and/or process lock

The lockiest lock that ever did lock… or at least an attempt at it.

This is experimental and not well tested.

If lock_fpath is NoParam, uses a global shared process lock. If None, then no process lock is used.

If thread_key is NoParam, uses a global shared thread lock. If None, then no thread lock is used.

Otherwise, locks that share the same lock_fpath OR the same thread_key will not execute concurrently, up to the system limitations of the underlying locking mechanisms.
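The keyed behavior described above can be illustrated with a minimal standard-library sketch. This is not the Superlock implementation: `KeyedLock`, `lock_for_key`, and the registry names are hypothetical, and the only detail taken from this page is that thread locks are held in a `WeakValueDictionary`.

```python
import threading
import weakref


class KeyedLock:
    """Hypothetical wrapper around threading.Lock that can be weakly referenced."""

    def __init__(self):
        self._lock = threading.Lock()

    def acquire(self, blocking=True, timeout=-1):
        return self._lock.acquire(blocking, timeout)

    def release(self):
        self._lock.release()


# Hypothetical registry: entries vanish once no caller holds the lock object.
_REGISTRY = weakref.WeakValueDictionary()
_REGISTRY_GUARD = threading.Lock()


def lock_for_key(key):
    """Return the one lock object shared by every caller that uses ``key``."""
    with _REGISTRY_GUARD:
        lock = _REGISTRY.get(key)
        if lock is None:
            lock = KeyedLock()
            _REGISTRY[key] = lock
        return lock


a = lock_for_key('shared')
b = lock_for_key('shared')   # same key, so the same underlying lock
c = lock_for_key('other')    # different key, so an independent lock

held_a = a.acquire()
blocked_b = not b.acquire(timeout=0.01)   # cannot enter while ``a`` is held
free_c = c.acquire(timeout=0.01)          # unaffected by the 'shared' key
a.release()
c.release()
```

Two callers that pass the same key contend for one lock, while a different key yields an independent lock, which mirrors the "same thread_key" rule stated above.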

Uses [Fasteners] for the process-based file-locks, which do have fundamental issues [OnFileLocks].
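For readers unfamiliar with file-based process locks, the sketch below shows the general idea using the standard-library `fcntl.flock` call. It is a stand-in chosen for illustration, not what Fasteners or Superlock do internally; it works on POSIX systems only, and the helper names `try_lock` and `unlock` are hypothetical.

```python
import fcntl
import os
import tempfile


def try_lock(path):
    """Try to take an exclusive advisory lock; return the fd, or None if busy."""
    fd = os.open(path, os.O_RDWR | os.O_CREAT, 0o644)
    try:
        # LOCK_NB makes the call fail immediately instead of blocking.
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        os.close(fd)
        return None
    return fd


def unlock(fd):
    """Release the lock and close the descriptor."""
    fcntl.flock(fd, fcntl.LOCK_UN)
    os.close(fd)


lock_path = os.path.join(tempfile.mkdtemp(), 'demo.lock')
holder = try_lock(lock_path)      # first open of the file takes the lock
contender = try_lock(lock_path)   # a separate open is refused while it is held
unlock(holder)
retry = try_lock(lock_path)       # succeeds once the holder has released
unlock(retry)
```

Such locks are advisory: they exclude only cooperating code that also takes the lock on the same file, which is one reason file locks are usually treated as best-effort.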

TODO: Evaluate [FileLock] as an alternative.

References

SeeAlso:

Example

>>> # xdoctest: +REQUIRES(module:fasteners)
>>> self = Superlock()
>>> with self:
>>>     print('non-concurrent code')

Example

>>> # xdoctest: +REQUIRES(module:fasteners)
>>> from kwutil.util_locks import *  # NOQA
>>> import ubelt as ub
>>> lock1 = Superlock()
>>> lock2 = Superlock()
>>> assert lock1.acquire(timeout=10)
>>> assert not lock2.acquire(timeout=0.01)
>>> lock1.release()
>>> assert lock2.acquire()
>>> lock2.release()

Example

>>> # Demonstrate a real world case with thread locks
>>> # xdoctest: +REQUIRES(module:fasteners)
>>> import time
>>> from kwutil.util_locks import Superlock
>>> import ubelt as ub
>>> #
>>> shared_counter = []
>>> #
>>> def task(i):
...     with Superlock():
...         # simulate work inside critical section
...         current_len = len(shared_counter)
...         print(f'current_len={current_len}')
...         time.sleep(0.05)
...         shared_counter.append(i)
...         # ensure no concurrent execution by checking counter length did not change during sleep
...         assert len(shared_counter) == current_len + 1
...     return i
>>> #
>>> with ub.Executor(mode='thread', max_workers=4) as executor:
...     results = list(executor.map(task, range(8)))
>>> #
>>> sorted(results) == list(range(8))
True
>>> len(shared_counter) == 8
True

Example

>>> # Demonstrate a real world case with process locks
>>> # xdoctest: +REQUIRES(module:fasteners)
>>> # xdoctest: +SKIP('xdoctest does not support pickled functions yet')
>>> import time
>>> import ubelt as ub
>>> from pathlib import Path
>>> #
>>> dpath = ub.Path.appdir('kwutil/tests/superlock').ensuredir()
>>> counter_fpath = dpath / 'shared_counter.txt'
>>> counter_fpath.write_text('0')
>>> #
>>> def task(i):
...     import time
...     import ubelt as ub
...     from pathlib import Path
...     from kwutil.util_locks import Superlock
...     lock = Superlock()
...     counter_fpath = Path(ub.Path.appdir('kwutil/tests/superlock') / 'shared_counter.txt')
...     with lock:
...         current = int(counter_fpath.read_text())
...         time.sleep(0.05)  # simulate some work
...         counter_fpath.write_text(str(current + 1))
...     return i
>>> #
>>> with ub.Executor(mode='process', max_workers=4) as executor:
...     results = list(executor.map(task, range(8)))
>>> #
>>> sorted(results) == list(range(8))
True
>>> final_value = int(counter_fpath.read_text())
>>> final_value == 8
True

THREAD_LOCKS = <WeakValueDictionary>
GLOBAL_THREAD_KEY = '__GLOBAL_THREAD_LOCK__'
GLOBAL_APPNAME = 'fasteners_ext/file_locks'
GLOBAL_LOCK_FNAME = 'superlock.lock'
_debug(msg)[source]
property global_lock_fpath
acquire(blocking=True, timeout=None, delay=0.01, max_delay=0.1)[source]
release()[source]
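The `acquire` signature above lists `timeout`, `delay`, and `max_delay` without describing them. One plausible reading is a polling loop with a growing sleep interval. The sketch below illustrates that reading with a plain `threading.Lock`; the semantics are an assumption, not documented Superlock behavior, and `acquire_with_backoff` is a hypothetical helper.

```python
import threading
import time


def acquire_with_backoff(try_acquire, timeout=None, delay=0.01, max_delay=0.1):
    """Poll ``try_acquire`` until it succeeds or ``timeout`` seconds elapse.

    Assumed semantics: sleep ``delay`` seconds after the first failure, double
    the sleep after each further failure, and never sleep more than
    ``max_delay``. A ``timeout`` of None means wait indefinitely.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    wait = delay
    while True:
        if try_acquire():
            return True
        if deadline is not None:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            time.sleep(min(wait, remaining))
        else:
            time.sleep(wait)
        wait = min(wait * 2, max_delay)


lock = threading.Lock()


def attempt():
    # Non-blocking attempt on the shared lock.
    return lock.acquire(blocking=False)


first = acquire_with_backoff(attempt, timeout=1)      # lock is free
second = acquire_with_backoff(attempt, timeout=0.05)  # still held, times out
lock.release()
third = acquire_with_backoff(attempt, timeout=1)      # free again
lock.release()
```

Capping the sleep at `max_delay` bounds how long a waiter can lag behind a release, while the initial short `delay` keeps latency low when contention is brief.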