"""
Run parallel jobs but only let a fixed number run at a time and pause main
thread execution (i.e. block) if the buffer gets too full. This can help if the
main thread has the potential to overload the waiting buffer. This is used in
several geowatch predict scripts.
Uses concurrent.futures thread or process pool executors.
"""
import ubelt as ub
# from collections import deque
[docs]
class BlockingJobQueue:
    """
    Run background jobs while capping how many are tracked at once.

    A call to :func:`submit` blocks when there are no available background
    workers, and returns once at least one tracked job has finished.

    The :func:`wait_until_finished` method should always be called at the end.

    Args:
        mode (str): backend name forwarded to ``ub.Executor``.
            Defaults to 'thread'.

        max_workers (int): forwarded to ``ub.Executor``. It is also the cap
            on the number of tracked jobs; a value of 0 is treated as a cap
            of 1. With the default of 0 the example below expects jobs to
            complete in submission order.

    Attributes:
        jobs (list): futures that were submitted and not yet reaped.

    TODO:
        - [ ] Compare with https://gist.github.com/noxdafox/4150eff0059ea43f6adbdd66e5d5e87e

        - [ ] Add parameter that lets the user answer the question: "when does this block?"
              We could block:
              1. When there are too many live jobs (I think it is, but how is this different from max-workers?)
              2. Whenever there are more than some number of unhandled results. (Good if results take a lot of memory)

    Example:
        >>> from kwutil.util_parallel import *  # NOQA
        >>> import time
        >>> import random
        >>> # Test with zero workers
        >>> N = 100
        >>> global_list = []
        >>> def background_job(i):
        >>>     time.sleep(random.random() * 0.001)
        >>>     global_list.append(f'Executed job {i:03d}')
        >>> self = BlockingJobQueue()
        >>> for i in range(100):
        >>>     self.submit(background_job, i)
        >>> self.wait_until_finished()
        >>> assert len(global_list) == N
        >>> assert sorted(global_list) == global_list
        >>> #
        >>> # xdoctest: +REQUIRES(env:TEST_BLOCKING_JOB_QUEUE_THREADS)
        >>> #
        >>> # Test the threaded case
        >>> global_list = []
        >>> def background_job(i):
        >>>     time.sleep(random.random() * 0.1 + 0.1)
        >>>     global_list.append(f'Executed job {i:03d}')
        >>> self = BlockingJobQueue(max_workers=10)
        >>> for i in range(100):
        >>>     if i == self.max_workers:
        >>>         assert len(self.jobs) == self.max_workers
        >>>     self.submit(background_job, i)
        >>> self.wait_until_finished()
        >>> assert len(global_list) == N
        >>> assert sorted(global_list) != global_list
    """

    def __init__(self, mode='thread', max_workers=0):
        self.max_workers = max_workers
        self.executor = ub.Executor(mode=mode, max_workers=max_workers)
        self.jobs = []

    def has_room(self):
        """
        Check if another job can be submitted without blocking.

        Returns:
            bool: True when fewer than ``max(1, max_workers)`` jobs are
            currently tracked in ``self.jobs``.
        """
        return len(self.jobs) < max(1, self.max_workers)

    def _wait_for_room(self):
        """
        Block until the number of tracked jobs drops below the cap.

        Jobs that are not running are reaped by calling ``result()`` on them,
        which re-raises any exception from the job. If that happens
        ``self.jobs`` is left unchanged.
        """
        from concurrent.futures import FIRST_COMPLETED, wait
        while not self.has_room():
            still_running = []
            for job in self.jobs:
                if job.running():
                    still_running.append(job)
                else:
                    # Check that the result is ok
                    job.result()
            if len(still_running) == len(self.jobs):
                # Nothing could be reaped on this pass. Sleep until a job
                # completes instead of spinning; the next pass reaps it.
                wait(still_running, return_when=FIRST_COMPLETED)
            self.jobs = still_running

    def wait_until_finished(self, desc=None):
        """
        Block until every tracked job has finished.

        Args:
            desc (str | None): if given, progress over the tracked jobs is
                reported through ``ub.ProgIter`` using this description.

        Note:
            ``self.jobs`` is not cleared by this call. An exception raised
            by a job propagates from the ``result()`` call.
        """
        if desc is None:
            jobiter = self.jobs
        else:
            jobiter = ub.ProgIter(self.jobs, desc=desc)
        for job in jobiter:
            job.result()

    def submit(self, func, *args, **kwargs):
        """
        Submit a job, blocking first if the queue is full.

        Args:
            func (Callable): function to execute
            *args: positional arguments for ``func``
            **kwargs: keyword arguments for ``func``

        Returns:
            the future object returned by the underlying executor
        """
        self._wait_for_room()
        job = self.executor.submit(func, *args, **kwargs)
        self.jobs.append(job)
        return job
[docs]
class _DelayedFuture:
    """
    Wraps a future object so we can execute logic when its result has been
    accessed.

    The wrapper is created before the task is handed to an executor. The
    owner assigns the real future to ``self.future`` once the task has
    actually been submitted.

    Args:
        func (Callable): the function to run
        args (tuple): positional arguments for ``func``
        kwargs (dict): keyword arguments for ``func``
        parent: object providing ``_job_result_accessed_callback(delayed)``,
            which is invoked the first time the outcome is accessed.
    """

    def __init__(self, func, args, kwargs, parent):
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.task = (func, args, kwargs)
        self.parent = parent
        self.future = None
        # Guards against notifying the parent more than once
        self._result_accessed = False

    def result(self, timeout=None):
        """
        Return the result of the underlying future and notify the parent.

        The parent is notified at most once, and only after the underlying
        future is done. A job that raised therefore still counts as handled,
        while a timeout on an unfinished job does not.

        Args:
            timeout (float | None): forwarded to the underlying future.

        Returns:
            the value returned by the underlying future

        Raises:
            RuntimeError: if the task has not been submitted yet.
        """
        if self.future is None:
            raise RuntimeError('The task has not been submitted yet')
        try:
            return self.future.result(timeout)
        finally:
            if self.future.done() and not self._result_accessed:
                self._result_accessed = True
                self.parent._job_result_accessed_callback(self)
[docs]
class _DelayedBlockingJobQueue:
    """
    Queue that holds jobs back while too many results are still unhandled.

    Submitted tasks wait in a FIFO backlog and are only passed to the
    executor while fewer than ``max_unhandled_jobs`` jobs have been submitted
    without their result being accessed.

    Args:
        max_unhandled_jobs (int): cap on jobs that were submitted to the
            pool but whose result has not been accessed yet.
        mode (str): forwarded to ``ub.Executor``
        max_workers (int | None): forwarded to ``ub.Executor``

    References:
        .. [GISTnoxdafoxMaxQueuePool] https://gist.github.com/noxdafox/4150eff0059ea43f6adbdd66e5d5e87e

    Ignore:
        >>> from kwutil.util_parallel import _DelayedBlockingJobQueue
        >>> self = _DelayedBlockingJobQueue(max_unhandled_jobs=5)
        >>> futures = [
        >>>     self.submit(print, i)
        >>>     for i in range(10)
        >>> ][::-1]
        >>> import time
        >>> time.sleep(0.5)
        >>> print(self._num_submitted_jobs)
        >>> print(self._num_handled_results)
        >>> print('--- First 5 should have printed ---')
        >>> for _ in range(3):
        >>>     f = futures.pop()
        >>>     f.result()
        >>> time.sleep(0.5)
        >>> print(self._num_submitted_jobs)
        >>> print(self._num_handled_results)
        >>> print('--- 3 Results were handled, so 3 more can join the queue')
        >>> for _ in range(3):
        >>>     f = futures.pop()
        >>>     f.result()
        >>> time.sleep(0.5)
        >>> print(self._num_submitted_jobs)
        >>> print(self._num_handled_results)
        >>> print('--- Handling the rest, but everything should have already been submitted')
        >>> for _ in range(4):
        >>>     f = futures.pop()
        >>>     f.result()
    """

    def __init__(self, max_unhandled_jobs, mode='thread', max_workers=None):
        from collections import deque
        self.max_unhandled_jobs = max_unhandled_jobs
        self.pool = ub.Executor(mode=mode, max_workers=max_workers)
        # FIFO backlog of tasks that were requested but not yet sent to the pool
        self._unsubmitted = deque()
        # Bookkeeping counters
        self._num_submitted_jobs = 0
        self._num_handled_results = 0
        self._num_unhandled = 0

    def submit(self, func, *args, **kwargs):
        """
        Register a new task; it only starts once there is room for it.

        Returns:
            _DelayedFuture: a handle whose ``future`` attribute is filled in
            when the task is actually sent to the pool.
        """
        pending = _DelayedFuture(func, args, kwargs, parent=self)
        self._unsubmitted.append(pending)
        self._submit_if_room()
        return pending

    def _submit_if_room(self):
        """
        Move backlog tasks into the pool until the unhandled cap is reached.
        """
        backlog = self._unsubmitted
        while True:
            below_cap = self._num_unhandled < self.max_unhandled_jobs
            if not (below_cap and backlog):
                break
            task = backlog.popleft()
            self._num_unhandled += 1
            self._num_submitted_jobs += 1
            task.future = self.pool.submit(
                task.func, *task.args, **task.kwargs)

    def _job_result_accessed_callback(self, _):
        """
        Hook invoked when the user handles a result; frees one slot.
        """
        self._num_unhandled -= 1
        self._num_handled_results += 1
        self._submit_if_room()
[docs]
class _MaxQueuePool:
    """
    This Class wraps a concurrent.futures.Executor
    limiting the size of its task queue.

    If `max_queue_size` tasks are submitted, the next call to submit will block
    until a previously submitted one is completed.

    Args:
        max_queue_size (int | None): number of tasks that may be in flight
            at once. Defaults to ``max_workers`` when None.
        mode (str): forwarded to ``ub.Executor``
        max_workers (int): forwarded to ``ub.Executor``

    Raises:
        ValueError: if the executor backend is not serial and the resolved
            ``max_queue_size`` is less than 1 (no task could ever be
            accepted).

    References:
        .. [GISTnoxdafoxMaxQueuePool] https://gist.github.com/noxdafox/4150eff0059ea43f6adbdd66e5d5e87e

    Ignore:
        import sys, ubelt
        sys.path.append(ubelt.expandpath('~/code/geowatch'))
        from geowatch.tasks.fusion.evaluate import *  # NOQA
        from geowatch.tasks.fusion.evaluate import _memo_legend, _redraw_measures
        self = _MaxQueuePool(max_queue_size=0)

        dpath = ub.Path.appdir('kwutil/doctests/maxpoolqueue')
        dpath.delete().ensuredir()
        signal_fpath = dpath / 'signal'

        def waiting_worker():
            counter = 0
            while not signal_fpath.exists():
                counter += 1
            return counter

        future = self.submit(waiting_worker)

        try:
            future.result(timeout=0.001)
        except TimeoutError:
            ...
        signal_fpath.touch()
        result = future.result()
    """

    def __init__(self, max_queue_size=None, mode='thread', max_workers=0):
        from threading import BoundedSemaphore
        if max_queue_size is None:
            max_queue_size = max_workers
        self.pool = ub.Executor(mode=mode, max_workers=max_workers)
        backend_name = self.pool.backend.__class__.__name__.lower()
        if 'serial' in backend_name:
            # No throttling is applied for a serial backend
            self.pool_queue = None
        else:
            if max_queue_size is not None and max_queue_size < 1:
                # A semaphore that starts at zero can never be acquired, so
                # the very first submit would block forever.
                raise ValueError(
                    'max_queue_size must be at least 1 for a non-serial '
                    'backend, got {!r}'.format(max_queue_size))
            self.pool_queue = BoundedSemaphore(max_queue_size)

    def submit(self, function, *args, **kwargs):
        """
        Submits a new task to the pool, blocks if Pool queue is full.

        Args:
            function (Callable): function to execute
            *args: positional arguments for ``function``
            **kwargs: keyword arguments for ``function``

        Returns:
            the future returned by the underlying pool
        """
        if self.pool_queue is not None:
            self.pool_queue.acquire()
        try:
            future = self.pool.submit(function, *args, **kwargs)
        except BaseException:
            # The task never entered the pool, so hand the slot back;
            # otherwise each failed submit permanently shrinks the queue.
            self.pool_queue_callback(None)
            raise
        future.add_done_callback(self.pool_queue_callback)
        return future

    def pool_queue_callback(self, _):
        """Called once task is done, releases one queue slot."""
        if self.pool_queue is not None:
            self.pool_queue.release()

    def shutdown(self):
        """
        Calls the shutdown function of the underlying backend.
        """
        return self.pool.shutdown()
[docs]
def coerce_num_workers(num_workers='auto', minimum=0):
    """
    Return some number of CPUs based on a chosen heuristic

    Args:
        num_workers (int | str):
            A special string code, or an exact number of cpus.
            String codes are case-insensitive expressions that may use the
            names ``all`` and ``avail`` (or ``available``), e.g. ``'all-2'``
            or ``'min(all,2)'``. The code ``'auto'`` is shorthand for
            ``'avail-2'`` and ``'none'`` results in None.

        minimum (int): minimum workers we are allowed to return

    Returns:
        int | None : number of available cpus based on request parameters,
        or None if the request was None or ``'none'``.

    Note:
        psutil and ``kwutil.util_eval`` are only imported when a string
        expression has to be evaluated, so integer-like input does not
        require them.

    CommandLine:
        xdoctest -m kwutil.util_parallel coerce_num_workers

    Example:
        >>> # xdoctest: +REQUIRES(module:psutil)
        >>> from kwutil.util_parallel import *  # NOQA
        >>> print(coerce_num_workers('all'))
        >>> print(coerce_num_workers('avail'))
        >>> print(coerce_num_workers('auto'))
        >>> print(coerce_num_workers('all-2'))
        >>> print(coerce_num_workers('avail-2'))
        >>> print(coerce_num_workers('all/2'))
        >>> print(coerce_num_workers('min(all,2)'))
        >>> #print(coerce_num_workers('[max(all,2)][0]'))
        >>> import pytest
        >>> with pytest.raises(Exception):
        >>>     print(coerce_num_workers('all + 1' + (' + 1' * 100)))
        >>> total_cpus = coerce_num_workers('all')
        >>> assert coerce_num_workers('all-2') == max(total_cpus - 2, 0)
        >>> assert coerce_num_workers('all-100') == max(total_cpus - 100, 0)
        >>> assert coerce_num_workers('avail') <= coerce_num_workers('all')
        >>> assert coerce_num_workers(3) == max(3, 0)
    """
    try:
        num_workers = int(num_workers)
    except (TypeError, ValueError, OverflowError):
        # Not directly convertible (e.g. None or an expression string);
        # handled by the branches below.
        pass

    if isinstance(num_workers, str):
        num_workers = num_workers.lower()
        if num_workers == 'none':
            # Nothing to evaluate, so avoid probing the CPUs at all
            return None

        # Deferred so that plain integer input does not need these modules
        import psutil
        from kwutil.util_eval import restricted_eval

        if num_workers == 'auto':
            num_workers = 'avail-2'

        # input normalization
        num_workers = num_workers.replace('available', 'avail')

        local_dict = {}
        if 'avail' in num_workers:
            # A CPU counts as available when its load is below 50%.
            # NOTE: psutil documents that the first non-blocking
            # cpu_percent call in a process returns meaningless 0.0 values,
            # so every CPU can look idle on that first call.
            current_load = [p / 100 for p in psutil.cpu_percent(percpu=True)]
            local_dict['avail'] = sum(f < 0.5 for f in current_load)
        local_dict['all_'] = psutil.cpu_count()

        # ``all`` is a Python builtin, so expose the count as ``all_``
        expr = num_workers.replace('all', 'all_')
        # limit chars even further if eval is used
        # Mitigate attack surface by restricting builtin usage
        max_chars = 32
        builtins_passlist = ['min', 'max', 'round', 'sum']
        num_workers = restricted_eval(expr, max_chars, local_dict,
                                      builtins_passlist)

    if num_workers is not None:
        num_workers = max(int(num_workers), minimum)

    return num_workers