kwutil.util_parallel module
Run parallel jobs while letting only a fixed number run at a time, and pause main thread execution (i.e. block) if the buffer gets too full. This helps when the main thread could otherwise overload the waiting buffer. This is used in several geowatch predict scripts.
Uses concurrent.futures thread or process pool executors.
- class kwutil.util_parallel.BlockingJobQueue(mode='thread', max_workers=0)
Bases: object
Helper to execute some number of processes in a separate thread.
A call to submit will block if there are no available background workers, until at least one of them finishes.
The wait_until_finished method should always be called at the end.
Todo
- [ ] Compare with https://gist.github.com/noxdafox/4150eff0059ea43f6adbdd66e5d5e87e
- [ ] Add parameter that lets the user answer the question: “when does this block?”
We could block:
- When there are too many live jobs (I think it is, but how is this different from max-workers?)
- Whenever there are more than some number of unhandled results. (Good if results take a lot of memory)
Example
>>> from kwutil.util_parallel import *  # NOQA
>>> import time
>>> import random
>>> # Test with zero workers
>>> N = 100
>>> global_list = []
>>> def background_job(i):
>>>     time.sleep(random.random() * 0.001)
>>>     global_list.append(f'Executed job {i:03d}')
>>> self = BlockingJobQueue()
>>> for i in range(100):
>>>     self.submit(background_job, i)
>>> self.wait_until_finished()
>>> assert len(global_list) == N
>>> assert sorted(global_list) == global_list
>>> #
>>> # xdoctest: +REQUIRES(env:TEST_BLOCKING_JOB_QUEUE_THREADS)
>>> #
>>> # Test the threaded case
>>> global_list = []
>>> def background_job(i):
>>>     time.sleep(random.random() * 0.1 + 0.1)
>>>     global_list.append(f'Executed job {i:03d}')
>>> self = BlockingJobQueue(max_workers=10)
>>> for i in range(100):
>>>     if i == self.max_workers:
>>>         assert len(self.jobs) == self.max_workers
>>>     self.submit(background_job, i)
>>> self.wait_until_finished()
>>> assert len(global_list) == N
>>> assert sorted(global_list) != global_list
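The blocking behaviour described for submit can be illustrated with a minimal sketch built directly on concurrent.futures. This is not the kwutil implementation: the class name SketchBlockingQueue and its internals are hypothetical, it only models the threaded case, and the serial max_workers=0 mode is not covered.

```python
import concurrent.futures as cf
import threading
import time


class SketchBlockingQueue:
    """Hypothetical stand-in that blocks submit() when all workers are busy."""

    def __init__(self, max_workers=2):
        self.max_workers = max_workers
        self.executor = cf.ThreadPoolExecutor(max_workers=max_workers)
        self.jobs = set()

    def submit(self, func, *args, **kwargs):
        # Block while every worker slot is held by an unfinished job.
        while len(self.jobs) >= self.max_workers:
            done, self.jobs = cf.wait(self.jobs, return_when=cf.FIRST_COMPLETED)
            for job in done:
                job.result()  # re-raise any exception from the worker
        job = self.executor.submit(func, *args, **kwargs)
        self.jobs.add(job)
        return job

    def wait_until_finished(self):
        # Drain the remaining jobs, then release the worker threads.
        for job in cf.as_completed(self.jobs):
            job.result()
        self.jobs = set()
        self.executor.shutdown()


# Track how many jobs are alive at once to observe the limit.
stats = {'live': 0, 'peak': 0}
lock = threading.Lock()


def job(i):
    with lock:
        stats['live'] += 1
        stats['peak'] = max(stats['peak'], stats['live'])
    time.sleep(0.01)
    with lock:
        stats['live'] -= 1
    return i


queue = SketchBlockingQueue(max_workers=2)
futures = [queue.submit(job, i) for i in range(8)]
queue.wait_until_finished()
results = sorted(f.result() for f in futures)
```

In this sketch the main thread never has more than max_workers unfinished jobs outstanding, which is the property that keeps the waiting buffer from growing without bound.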
- class kwutil.util_parallel._DelayedFuture(func, args, kwargs, parent)
Bases: object
Wraps a future object so we can execute logic when its result has been accessed.
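The idea of running logic when a result is accessed can be sketched as a thin wrapper around a concurrent.futures.Future. The class SketchResultHook and its on_access callback are hypothetical names used only for illustration; the real _DelayedFuture takes different arguments and may behave differently.

```python
import concurrent.futures as cf


class SketchResultHook:
    """Hypothetical wrapper: calls on_access the first time result() is read."""

    def __init__(self, future, on_access):
        self._future = future
        self._on_access = on_access
        self._accessed = False

    def result(self, timeout=None):
        value = self._future.result(timeout)
        if not self._accessed:
            # Notify the owner exactly once that this result was consumed.
            self._accessed = True
            self._on_access(self)
        return value


handled = []
with cf.ThreadPoolExecutor(max_workers=1) as pool:
    wrapped = SketchResultHook(pool.submit(pow, 2, 10), handled.append)
    first = wrapped.result()
    second = wrapped.result()
```

A parent queue could use such a hook to count a job as handled only once its result has actually been read, rather than when the job finishes.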
- class kwutil.util_parallel._DelayedBlockingJobQueue(max_unhandled_jobs, mode='thread', max_workers=None)
Bases: object
References
[GISTnoxdafoxMaxQueuePool] https://gist.github.com/noxdafox/4150eff0059ea43f6adbdd66e5d5e87e
- class kwutil.util_parallel._MaxQueuePool(max_queue_size=None, mode='thread', max_workers=0)
Bases: object
This class wraps a concurrent.futures.Executor, limiting the size of its task queue. If max_queue_size tasks are submitted, the next call to submit will block until a previously submitted one is completed.
References
[GISTnoxdafoxMaxQueuePool] https://gist.github.com/noxdafox/4150eff0059ea43f6adbdd66e5d5e87e
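One common way to bound an executor's task queue is to guard submit with a semaphore that is released when each task completes. The sketch below assumes that approach; SketchMaxQueuePool is a hypothetical name, only the thread mode is modelled, and the actual class may differ in its details.

```python
import concurrent.futures as cf
import threading


class SketchMaxQueuePool:
    """Hypothetical bounded pool: at most max_queue_size tasks are pending."""

    def __init__(self, max_queue_size, max_workers=2):
        self.pool = cf.ThreadPoolExecutor(max_workers=max_workers)
        self.slots = threading.BoundedSemaphore(max_queue_size)

    def submit(self, func, *args, **kwargs):
        # Blocks here once max_queue_size tasks are still unfinished.
        self.slots.acquire()
        try:
            future = self.pool.submit(func, *args, **kwargs)
        except Exception:
            self.slots.release()
            raise
        # Free the slot as soon as the task completes (or fails).
        future.add_done_callback(lambda _: self.slots.release())
        return future

    def shutdown(self):
        self.pool.shutdown(wait=True)


bounded = SketchMaxQueuePool(max_queue_size=3)
futures = [bounded.submit(pow, i, 2) for i in range(10)]
bounded.shutdown()
squares = [f.result() for f in futures]
```

Releasing the slot in a done callback means a failing task frees its slot as well, so one bad job cannot permanently shrink the queue.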
- kwutil.util_parallel.coerce_num_workers(num_workers='auto', minimum=0)
Return some number of CPUs based on a chosen heuristic
- Parameters:
num_workers (int | str) – A special string code, or an exact number of cpus
minimum (int) – minimum workers we are allowed to return
- Returns:
number of available cpus based on request parameters
- Return type:
int
CommandLine
xdoctest -m kwutil.util_parallel coerce_num_workers
Example
>>> # xdoctest: +REQUIRES(module:psutil)
>>> from kwutil.util_parallel import *  # NOQA
>>> print(coerce_num_workers('all'))
>>> print(coerce_num_workers('avail'))
>>> print(coerce_num_workers('auto'))
>>> print(coerce_num_workers('all-2'))
>>> print(coerce_num_workers('avail-2'))
>>> print(coerce_num_workers('all/2'))
>>> print(coerce_num_workers('min(all,2)'))
>>> #print(coerce_num_workers('[max(all,2)][0]'))
>>> import pytest
>>> with pytest.raises(Exception):
>>>     print(coerce_num_workers('all + 1' + (' + 1' * 100)))
>>> total_cpus = coerce_num_workers('all')
>>> assert coerce_num_workers('all-2') == max(total_cpus - 2, 0)
>>> assert coerce_num_workers('all-100') == max(total_cpus - 100, 0)
>>> assert coerce_num_workers('avail') <= coerce_num_workers('all')
>>> assert coerce_num_workers(3) == max(3, 0)
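To illustrate how string codes such as 'all-2' or 'min(all,2)' might be resolved, here is a rough sketch that evaluates a restricted arithmetic expression with the ast module and clamps the result to minimum. It is an assumption about one possible approach, not the library's code: sketch_coerce_workers and its total parameter are hypothetical, and only the 'all' code is modelled ('avail' and 'auto' are left out).

```python
import ast
import operator
import os

# Whitelisted operators and functions; anything else is rejected.
_BINOPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
}
_FUNCS = {'min': min, 'max': max}


def sketch_coerce_workers(request, minimum=0, total=None):
    """Hypothetical helper: resolve an int or an expression over 'all'."""
    if total is None:
        total = os.cpu_count() or 1
    if isinstance(request, int):
        return max(request, minimum)

    def evaluate(node):
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.Name) and node.id == 'all':
            return total
        if isinstance(node, ast.BinOp) and type(node.op) in _BINOPS:
            return _BINOPS[type(node.op)](evaluate(node.left), evaluate(node.right))
        if (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
                and node.func.id in _FUNCS and not node.keywords):
            return _FUNCS[node.func.id](*[evaluate(arg) for arg in node.args])
        raise ValueError(f'unsupported expression: {request!r}')

    value = evaluate(ast.parse(request, mode='eval').body)
    # Truncate fractional results and never go below the requested minimum.
    return max(int(value), minimum)


# Pin total=8 so the results do not depend on the machine running this.
examples = {
    code: sketch_coerce_workers(code, total=8)
    for code in ['all', 'all-2', 'all-100', 'all/2', 'min(all,2)']
}
```

Walking the parsed tree against a whitelist, rather than calling eval, keeps arbitrary code out of what is typically a user-supplied command line value.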