kwutil.util_parallel module

Run parallel jobs, but only let a fixed number run at a time, and pause main-thread execution (i.e. block) if the buffer gets too full. This helps when the main thread can submit jobs faster than they complete and would otherwise overload the waiting buffer. This is used in several geowatch predict scripts.

Uses concurrent.futures thread or process pool executors.
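A minimal sketch of how a `mode` string and worker count might map onto `concurrent.futures` executors. `make_executor` and `SerialExecutor` are hypothetical names, and running jobs serially in the caller when `max_workers=0` is an assumption suggested by the zero-worker doctest for `BlockingJobQueue` (which expects jobs to finish in submission order); it is not a description of kwutil's internals.

```python
import concurrent.futures


class SerialExecutor(concurrent.futures.Executor):
    """Hypothetical executor that runs each job immediately in the caller."""

    def submit(self, fn, /, *args, **kwargs):
        future = concurrent.futures.Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as ex:
            # Store the error so future.result() re-raises it, like a pool would.
            future.set_exception(ex)
        return future


def make_executor(mode='thread', max_workers=0):
    """Hypothetical helper: pick an executor from a mode string."""
    if max_workers == 0:
        # Assumption: zero workers means "run serially in the main thread".
        return SerialExecutor()
    if mode == 'thread':
        return concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
    if mode == 'process':
        return concurrent.futures.ProcessPoolExecutor(max_workers=max_workers)
    raise ValueError(f'Unknown mode: {mode!r}')


with make_executor('thread', max_workers=0) as executor:
    futures = [executor.submit(pow, 2, i) for i in range(5)]
print([f.result() for f in futures])  # [1, 2, 4, 8, 16]
```

Because every executor shares the `submit`/`Future` interface, the calling code does not need to know which backend it received.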

class kwutil.util_parallel.BlockingJobQueue(mode='thread', max_workers=0)

Bases: object

Helper to execute some number of jobs in background threads or processes (see mode).

A call to submit will block if there are no available background workers, until at least one of them finishes.

wait_until_finished should always be called at the end.
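The blocking rule described above can be approximated with `concurrent.futures.wait(..., return_when=FIRST_COMPLETED)`. This is a sketch under stated assumptions, not kwutil's implementation: `BoundedSubmitter` is a hypothetical name, it only uses threads, it requires `max_workers >= 1`, and it treats "no available workers" as "`max_workers` submitted jobs are still unfinished".

```python
import concurrent.futures
import time


class BoundedSubmitter:
    """Hypothetical sketch: block ``submit`` while max_workers jobs are pending."""

    def __init__(self, max_workers=4):
        # Assumption: max_workers >= 1 (ThreadPoolExecutor rejects 0).
        self.max_workers = max_workers
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers)
        self.jobs = set()

    def submit(self, func, *args, **kwargs):
        # Block until fewer than max_workers submitted jobs are unfinished.
        while len(self.jobs) >= self.max_workers:
            done, self.jobs = concurrent.futures.wait(
                self.jobs, return_when=concurrent.futures.FIRST_COMPLETED)
            for job in done:
                job.result()  # re-raise any exception from the worker
        job = self.executor.submit(func, *args, **kwargs)
        self.jobs.add(job)
        return job

    def wait_until_finished(self):
        # Collect remaining jobs (re-raising errors), then stop the workers.
        for job in concurrent.futures.as_completed(self.jobs):
            job.result()
        self.jobs.clear()
        self.executor.shutdown()


submitter = BoundedSubmitter(max_workers=3)
sizes = []
for _ in range(12):
    submitter.submit(time.sleep, 0.01)
    sizes.append(len(submitter.jobs))
submitter.wait_until_finished()
print(max(sizes))  # 3
```

The set of tracked jobs never grows past `max_workers`, so the main thread, not an unbounded internal queue, absorbs the backpressure.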

Todo

  • [ ] Compare with https://gist.github.com/noxdafox/4150eff0059ea43f6adbdd66e5d5e87e

  • [ ] Add parameter that lets the user answer the question: “when does this block?”
    We could block:
    1. When there are too many live jobs. (I think it already does this, but how is this different from max_workers?)

    2. Whenever there are more than some number of unhandled results. (Good if results take a lot of memory.)

Example

>>> from kwutil.util_parallel import *  # NOQA
>>> import time
>>> import random
>>> # Test with zero workers
>>> N = 100
>>> global_list = []
>>> def background_job(i):
>>>     time.sleep(random.random() * 0.001)
>>>     global_list.append(f'Executed job {i:03d}')
>>> self = BlockingJobQueue()
>>> for i in range(N):
>>>     self.submit(background_job, i)
>>> self.wait_until_finished()
>>> assert len(global_list) == N
>>> assert sorted(global_list) == global_list
>>> #
>>> # xdoctest: +REQUIRES(env:TEST_BLOCKING_JOB_QUEUE_THREADS)
>>> #
>>> # Test the threaded case
>>> global_list = []
>>> def background_job(i):
>>>     time.sleep(random.random() * 0.1 + 0.1)
>>>     global_list.append(f'Executed job {i:03d}')
>>> self = BlockingJobQueue(max_workers=10)
>>> for i in range(N):
>>>     if i == self.max_workers:
>>>         assert len(self.jobs) == self.max_workers
>>>     self.submit(background_job, i)
>>> self.wait_until_finished()
>>> assert len(global_list) == N
>>> assert sorted(global_list) != global_list

has_room()

_wait_for_room()

wait_until_finished(desc=None)

submit(func, *args, **kwargs)

class kwutil.util_parallel._DelayedFuture(func, args, kwargs, parent)

Bases: object

Wraps a future object so we can execute logic when its result has been accessed.
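A minimal sketch of the "run logic when the result is accessed" idea. Unlike kwutil's `_DelayedFuture`, which is constructed from `func, args, kwargs, parent`, this hypothetical `AccessNotifyingFuture` simply wraps an already-submitted `concurrent.futures.Future` and calls a user-supplied callback the first time `result()` returns successfully.

```python
import concurrent.futures


class AccessNotifyingFuture:
    """Hypothetical wrapper: run a callback the first time .result() returns."""

    def __init__(self, future, on_access):
        self._future = future
        self._on_access = on_access
        self._accessed = False

    def result(self, timeout=None):
        # If the job raised, this re-raises and no notification is sent.
        value = self._future.result(timeout=timeout)
        if not self._accessed:
            # Only notify once, even if the caller reads the result again.
            self._accessed = True
            self._on_access(self)
        return value


handled = []
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    wrapped = AccessNotifyingFuture(executor.submit(sum, [1, 2, 3]), handled.append)
    print(len(handled))      # 0: nothing has read the result yet
    print(wrapped.result())  # 6
    print(wrapped.result())  # 6 again, but the callback does not fire twice
    print(len(handled))      # 1
```

A parent queue can pass one of its own methods as `on_access` to learn exactly when the caller has consumed a result.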

result(timeout=None)

class kwutil.util_parallel._DelayedBlockingJobQueue(max_unhandled_jobs, mode='thread', max_workers=None)

Bases: object

submit(func, *args, **kwargs)

Queues a new job, but won't execute it until some conditions are met.

_submit_if_room()

_job_result_accessed_callback(_)

Called when the user handles a result
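The idea behind `_DelayedBlockingJobQueue` (hold back new jobs until the caller has handled earlier results) can also be expressed as a generator. This is a different technique from the class above, shown only as a hedged sketch: `bounded_results` is a hypothetical helper, and "unhandled" here means "submitted but not yet yielded to the consumer".

```python
import collections
import concurrent.futures
import itertools


def bounded_results(executor, func, items, max_unhandled=4):
    """
    Hypothetical sketch: yield ``func(item)`` results in input order while
    keeping at most ``max_unhandled`` submitted-but-unconsumed jobs.
    """
    items = iter(items)
    pending = collections.deque()
    # Prime the window with up to max_unhandled jobs.
    for item in itertools.islice(items, max_unhandled):
        pending.append(executor.submit(func, item))
    while pending:
        yield pending.popleft().result()
        # Execution resumes here only when the consumer asks for another
        # result, so the previous one counts as handled and a new job may
        # be submitted in its place.
        for item in itertools.islice(items, 1):
            pending.append(executor.submit(func, item))


with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    squares = bounded_results(executor, lambda x: x * x, range(6), max_unhandled=3)
    values = list(squares)
print(values)  # [0, 1, 4, 9, 16, 25]
```

Because nothing new is submitted while the consumer sits on a result, memory held by finished-but-unread results stays bounded by `max_unhandled`.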

class kwutil.util_parallel._MaxQueuePool(max_queue_size=None, mode='thread', max_workers=0)

Bases: object

This class wraps a concurrent.futures.Executor, limiting the size of its task queue. If max_queue_size tasks are pending, the next call to submit will block until a previously submitted one is completed.

submit(function, *args, **kwargs)

Submits a new task to the pool; blocks if the pool queue is full.

pool_queue_callback(_)

Called once a task is done; releases one queue slot.

shutdown()

Calls the shutdown function of the underlying backend.
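The semaphore pattern described for `_MaxQueuePool` can be sketched as follows. This is an illustrative, thread-only stand-in with a hypothetical class name, not kwutil's code: a `threading.BoundedSemaphore` sized to `max_queue_size` is acquired before each submit and released from `Future.add_done_callback`, mirroring the roles of `submit` and `pool_queue_callback` above.

```python
import concurrent.futures
import threading
import time


class BoundedQueueExecutor:
    """Hypothetical sketch: cap the number of queued-or-running tasks."""

    def __init__(self, max_queue_size, max_workers=None):
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers)
        self.semaphore = threading.BoundedSemaphore(max_queue_size)

    def submit(self, function, *args, **kwargs):
        # Blocks the caller once max_queue_size tasks are outstanding.
        self.semaphore.acquire()
        try:
            future = self.executor.submit(function, *args, **kwargs)
        except BaseException:
            self.semaphore.release()  # do not leak a slot if submit fails
            raise
        future.add_done_callback(self._release_slot)
        return future

    def _release_slot(self, _):
        # Runs when a task finishes (or is cancelled) and frees one slot.
        self.semaphore.release()

    def shutdown(self, wait=True):
        self.executor.shutdown(wait=wait)


pool = BoundedQueueExecutor(max_queue_size=2, max_workers=4)
futures, outstanding = [], []
for _ in range(6):
    futures.append(pool.submit(time.sleep, 0.01))
    outstanding.append(sum(not f.done() for f in futures))
pool.shutdown()
print(max(outstanding) <= 2)  # True
```

Here the queue limit (2) is smaller than the worker count (4), so it is the semaphore, not the executor, that bounds how much work is in flight.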

kwutil.util_parallel.coerce_num_workers(num_workers='auto', minimum=0)

Return some number of CPUs based on a chosen heuristic

Parameters:
  • num_workers (int | str) – A special string code (e.g. 'all', 'avail', 'auto', 'all-2'), or an exact number of CPUs

  • minimum (int) – minimum workers we are allowed to return

Returns:

Number of available CPUs based on the request parameters.

Return type:

int

CommandLine

xdoctest -m kwutil.util_parallel coerce_num_workers

Example

>>> # xdoctest: +REQUIRES(module:psutil)
>>> from kwutil.util_parallel import *  # NOQA
>>> print(coerce_num_workers('all'))
>>> print(coerce_num_workers('avail'))
>>> print(coerce_num_workers('auto'))
>>> print(coerce_num_workers('all-2'))
>>> print(coerce_num_workers('avail-2'))
>>> print(coerce_num_workers('all/2'))
>>> print(coerce_num_workers('min(all,2)'))
>>> #print(coerce_num_workers('[max(all,2)][0]'))
>>> import pytest
>>> with pytest.raises(Exception):
>>>     print(coerce_num_workers('all + 1' + (' + 1' * 100)))
>>> total_cpus = coerce_num_workers('all')
>>> assert coerce_num_workers('all-2') == max(total_cpus - 2, 0)
>>> assert coerce_num_workers('all-100') == max(total_cpus - 100, 0)
>>> assert coerce_num_workers('avail') <= coerce_num_workers('all')
>>> assert coerce_num_workers(3) == max(3, 0)
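The string codes exercised above look like small arithmetic expressions over CPU counts. The sketch below shows one safe way to evaluate such a spec with Python's `ast` module instead of `eval`. It is a hypothetical stand-in, not kwutil's parser: `parse_num_workers` is an invented name, only `'all'` (taken from `os.cpu_count()`) is supported, `'avail'` and `'auto'` are omitted because their heuristics are not documented here, and treating `/` as floor division is an assumption.

```python
import ast
import operator
import os

_BINOPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.floordiv,  # assumption: 'all/2' rounds down
    ast.FloorDiv: operator.floordiv,
}
_FUNCS = {'min': min, 'max': max}


def parse_num_workers(num_workers='all', minimum=0):
    """Hypothetical sketch: resolve an int or a whitelisted expression."""
    if isinstance(num_workers, int):
        return max(num_workers, minimum)
    names = {'all': os.cpu_count() or 1}

    def _eval(node):
        # Walk the parsed tree, accepting only whitelisted node types.
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if isinstance(node, ast.Constant) and type(node.value) is int:
            return node.value
        if isinstance(node, ast.Name) and node.id in names:
            return names[node.id]
        if isinstance(node, ast.BinOp) and type(node.op) in _BINOPS:
            return _BINOPS[type(node.op)](_eval(node.left), _eval(node.right))
        if (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
                and node.func.id in _FUNCS and not node.keywords):
            return _FUNCS[node.func.id](*[_eval(arg) for arg in node.args])
        raise ValueError(f'Unsupported worker spec: {num_workers!r}')

    return max(int(_eval(ast.parse(num_workers, mode='eval'))), minimum)


print(parse_num_workers('all') == (os.cpu_count() or 1))  # True
print(parse_num_workers('min(all,2)') <= 2)  # True
print(parse_num_workers(3))  # 3
```

Whitelisting AST nodes keeps arbitrary code (attribute access, imports, other calls) from ever being executed, while still allowing expressions such as `'all-2'` or `'min(all,2)'`.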