Source code for kwutil.util_progress

r"""
POC for a better ProgIter with rich support

CommandLine:
    DEMO_PROGRESS=1 xdoctest -m kwutil.util_progress __doc__:0

Example:
    >>> # xdoctest: +REQUIRES(env:DEMO_PROGRESS)
    >>> from kwutil.util_progress import ProgressManager
    >>> import time
    >>> delay = 0.05
    >>> # Can use plain progiter or rich
    >>> # The usecase for plain progiter is when threads / live output
    >>> # is not desirable and you just want plain stdout progress
    >>> for backend in ['rich', 'progiter']:
    >>>     print(f'\n\n -- starting {backend} --\n\n')
    >>>     pman = ProgressManager(backend=backend)
    >>>     with pman:
    >>>         pbar1 = pman.progiter(range(5), desc='outer loop', verbose=3)
    >>>         for i in pbar1:
    >>>             pbar1.set_postfix(f'\\[step {i}]', refresh=False)
    >>>             for j1 in pman.progiter(range(100), desc=f'prepare inner loop {i}', transient=True):
    >>>                 time.sleep(delay / 3)
    >>>             for j2 in pman.progiter(range(100), desc=f'execute inner loop {i}'):
    >>>                 time.sleep(delay)
    >>>             for j3 in pman.progiter(range(100), desc=f'shutdown inner loop {i}', transient=True):
    >>>                 time.sleep(delay / 3)
"""
import ubelt as ub
import os
from progiter import ProgIter
import weakref


# If truthy disable all threaded rich options.
#
# Controlled by the ``PROGITER_NOTHREAD`` environment variable:
#   * ``auto`` (the default): disable threads only when ``SLURM_JOBID`` is set
#     to a non-empty value (i.e. use rich outside of slurm).
#   * a false-like string ('', '0', 'false', 'no', 'off'; case-insensitive):
#     keep the threaded rich backend available.
#   * any other value: disable the threaded rich backend.
#
# The resulting flag is always a real bool.
_nothread_setting = os.environ.get('PROGITER_NOTHREAD', 'auto')
if _nothread_setting == 'auto':
    # Use rich outside of slurm
    PROGITER_NOTHREAD = bool(os.environ.get('SLURM_JOBID', ''))
else:
    # ``bool()`` of any non-empty string is True, so values such as "0" or
    # "false" have to be recognized explicitly to be able to turn this off.
    PROGITER_NOTHREAD = _nothread_setting.strip().lower() not in {
        '', '0', 'false', 'no', 'off',
    }


# Weak registry of ProgressManager instances keyed by ``id(manager)``; entries
# vanish automatically once a manager is garbage collected.
LIVE_PROGRESS_MANAGERS = weakref.WeakValueDictionary()


class ProgIter2(ProgIter):
    """
    A ProgIter variant that can be attached to a progress manager and can
    print a block of "info" text.
    """

    def _set_manager(self, manager):
        """
        Attach the owning manager, stored as a weak proxy.
        """
        self.manager = weakref.proxy(manager)

    def update_info(self, text):
        """
        Record ``text`` as ``self.info_text`` and print it inside a banner.

        Args:
            text (str | None): message to show; nothing is printed for None.
        """
        self.info_text = text
        message = getattr(self, 'info_text', None)
        self.ensure_newline()
        if message is None:
            return
        # if self._cursor_at_newline:
        for line in ('+ --- Info --- +', message, '+ ------------ +'):
            print(line)
        # self.display_message()

    def update(self, n=1):
        """
        Advance the progress by ``n`` steps, beginning the iterator first if
        it has not been started yet.
        """
        if not self.started:
            self.begin()
        owner = getattr(self, 'manager', None)
        if owner is not None:
            # TODO: vet this, not quite working. The idea is if part of a
            # progress manager and this is not the "tail" progiter (i.e. it
            # isn't the only progiter allowed to be clearing newlines) then we
            # should ensure that the progiter that is allowed to clear the
            # newline has its ensure_newline method called so we can actually
            # write our progress without getting clobbered.
            siblings = owner.prog_iters
            if siblings and siblings[-1] is not self:
                siblings[-1].ensure_newline()
        super().update(n=n)

    def display_message(self):
        """
        Defer to the parent implementation.
        """
        super().display_message()

    set_postfix = ProgIter.set_postfix_str
class RichProgIter:
    """
    Ducktypes ProgIter

    TODO:
        enhance with the ability to have a update info panel that removes the
        circular reference

    Ignore:
        from kwutil import util_progress
        util_progress.LIVE_PROGRESS_MANAGERS
        print(len(util_progress.LIVE_PROGRESS_MANAGERS))
        for v in util_progress.LIVE_PROGRESS_MANAGERS.values():
            ...
        from kwutil.util_progress import *  # NOQA
        for _ in RichProgIter(range(1000)):
            ...
    """

    def __init__(self, iterable=None, desc=None, total=None, freq=1, initial=0,
                 eta_window=64, clearline=True, adjust=True, time_thresh=2.0,
                 show_times=True, show_wall=False, enabled=True, verbose=None,
                 stream=None, chunksize=None, rel_adjust_limit=4.0,
                 transient=False, manager=None, spinner=False, **kwargs):
        """
        Register one task on a rich manager.

        Most ProgIter-specific arguments are accepted only so the signature
        is compatible; they are not used here. When ``manager`` is None a
        private ``_RichProgIterManager`` is created and this object starts and
        stops it itself; otherwise only a weak proxy to the given manager is
        kept. A falsy ``verbose`` disables the progress bar.
        """
        unhandled = {
            'eta_window', 'clearline', 'adjust', 'time_thresh', 'show_times',
            'show_wall', 'stream', 'chunksize', 'rel_adjust_limit',
        }
        kwargs = ub.udict(kwargs) - unhandled

        self._self_managed = manager is None
        if self._self_managed:
            manager = _RichProgIterManager()
        else:
            manager = weakref.proxy(manager)

        # verbose=None means "default verbosity" which is 1 (i.e. shown)
        effective_verbose = 1 if verbose is None else verbose

        self.manager = manager
        self.iterable = iterable
        self.enabled = enabled if effective_verbose else False
        self.spinner = spinner

        if total is None:
            # Best effort: unsized iterables simply keep an unknown total
            try:
                total = len(iterable)
            except Exception:
                ...
        self.total = total
        self.desc = desc

        if self.enabled:
            self.task_id = self.manager.rich_progress.add_task(
                description='' if desc is None else desc,
                total=self.total,
            )
        else:
            self.task_id = None
        self.transient = transient
        self.extra = None

    def start(self):
        """ Alias of :func:`begin` """
        return self.begin()

    def stop(self):
        """ Alias of :func:`end` """
        return self.end()

    def begin(self):
        """
        Start the manager, but only if this object created it.
        """
        if self._self_managed:
            self.manager.start()

    def end(self):
        """
        Remove the task if it is transient and stop a self-created manager.
        """
        if self.transient:
            self.remove()
        if self._self_managed:
            self.manager.stop()

    def update(self, n=1):
        """
        Advance the task by ``n`` steps (no-op when disabled).
        """
        if self.enabled:
            self.manager.rich_progress.update(self.task_id, advance=n)

    step = update

    def __iter__(self):
        if not self.enabled:
            # Disabled: behave like the plain iterable
            yield from self.iterable
            return
        self.start()
        for item in self.iterable:
            yield item
            self.manager.rich_progress.update(self.task_id, advance=1)
        if self.total is None:
            # The length was unknown up front; now that the iterable is
            # exhausted set the total to the number of completed steps.
            finished = self.manager.rich_progress._tasks[self.task_id]
            self.manager.rich_progress.update(
                self.task_id, total=finished.completed)
        self.stop()

    def remove(self):
        """
        Remove this progress task from its rich manager
        """
        if self.enabled:
            self.manager.rich_progress.remove_task(self.task_id)

    def update_info(self, text):
        """
        Forward ``text`` to the manager's info panel (no-op when disabled).
        """
        if self.enabled:
            # FIXME: remove circular reference
            self.manager.update_info(text)

    def ensure_newline(self):
        """
        Does nothing; present for ProgIter API compatibility.
        """
        ...

    def set_postfix_str(self, text, refresh=True):
        """
        Store ``text`` as the extra text and set the task description to the
        space-joined non-None values of ``desc`` and the extra text.
        """
        self.extra = text
        pieces = [
            piece for piece in (self.desc, self.extra) if piece is not None
        ]
        if self.enabled:
            self.manager.rich_progress.update(
                self.task_id,
                description=' '.join(pieces),
                refresh=refresh,
            )

    set_postfix = set_postfix_str
    set_extra = set_postfix_str
class BaseProgIterManager:
    """
    Shared interface of the progress managers.

    Subclasses are expected to provide ``progiter``; ``new`` and calling the
    manager are shorthands for it. The manager is also a context manager that
    calls ``start`` on entry and ``stop`` on exit.
    """

    def __enter__(self):
        return self.start()

    def __exit__(self, exc_type=None, exc_val=None, exc_tb=None):
        # The exception details are handed to ``stop`` as keyword arguments
        self.stop(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb)

    def __call__(self, *args, **kw):
        """ Shorthand for ``self.progiter(*args, **kw)`` """
        return self.progiter(*args, **kw)

    def new(self, *args, **kw):
        """ Shorthand for ``self.progiter(*args, **kw)`` """
        return self.progiter(*args, **kw)

    def start(self):
        """
        Default start hook: does nothing and returns the manager itself.
        """
        # TODO: be re-entrant? keep track of number of enters and only stop
        # when exits get to zero?
        return self

    def begin(self):
        """ Alias of :func:`start` """
        return self.start()

    def stop(self, **kwargs):
        """
        Default stop hook: accepts any keyword arguments and does nothing.
        """
        ...
# Global var
# The rich manager that currently owns the ``Live`` display, or None when no
# manager has claimed it (set in ``setup_rich`` and cleared in ``stop``).
MAIN_RICH_PMAN = None


class _RichProgIterManager(BaseProgIterManager):
    """
    rich specific backend.

    Only one manager at a time owns the rich ``Live`` display (the "main"
    manager); managers created while a main manager exists reuse its rich
    progress object and its render group.

    Example:
        >>> # Test verbose = 0
        >>> # xdoctest: +REQUIRES(module:rich)
        >>> from kwutil.util_progress import ProgressManager
        >>> from kwutil import util_progress
        >>> import time
        >>> pman = ProgressManager(backend='rich', verbose=0)
        >>> with pman:
        >>>     for i in pman.progiter(range(10), desc='should not show1'):
        >>>         ...
        >>>     for i in pman.progiter(range(10), verbose=1, desc='should show2'):
        >>>         ...
        >>>     for i in pman.progiter(range(10), verbose=0, desc='should not show3'):
        >>>         ...
    """

    def __init__(self, **kwargs):
        """
        Args:
            **kwargs: ``enabled`` (default True) and ``verbose`` (default 1)
                are consumed here; everything else becomes a default keyword
                argument for progress iterators created by this manager.
        """
        self.prog_iters = []
        self.enabled = kwargs.pop('enabled', True)
        self.info_panel = None
        self.rich_progress = None
        self._is_main_manager = None
        self.setup_rich()
        self._active = False
        self.default_progkw = {
            'verbose': kwargs.pop('verbose', 1),
        }
        self.default_progkw.update(kwargs)

    # Can we make this work?
    # def __del__(self):
    #     if self._active:
    #         self.stop()

    def progiter(self, iterable=None, total=None, desc=None, transient=False,
                 spinner=False, verbose='auto', **kw):
        """
        Create a new :class:`RichProgIter` attached to this manager, starting
        the manager first if it is not active yet.

        Args:
            iterable: the items to iterate over (may be None for manual use)
            total: number of steps, if known
            desc: description of the task
            transient: if True the task is removed once it ends
            spinner: forwarded to the progress iterator
            verbose: ``'auto'`` uses this manager's default verbosity
            **kw: overrides for the manager's default keyword arguments

        Returns:
            RichProgIter
        """
        if not self._active:
            self.start()
        # Fixme remove circular ref
        # self.rich_progress.pman = self
        progkw = self.default_progkw.copy()
        progkw.update(kw)
        if verbose == 'auto':
            progkw['verbose'] = self.default_progkw.get('verbose', 1)
        else:
            progkw['verbose'] = verbose
        # ``enabled`` is popped from the defaults in __init__, so forward it
        # explicitly (unless the caller overrides it). Otherwise a disabled
        # manager would still register tasks on the rich progress object.
        progkw.setdefault('enabled', self.enabled)
        prog = RichProgIter(
            manager=self, iterable=iterable, total=total, desc=desc,
            transient=transient, spinner=spinner, **progkw)
        self.prog_iters.append(prog)
        return prog

    # Alias for simpler drop-in-replacements
    ProgIter = progiter

    def setup_rich(self):
        """
        Create the rich progress objects, or borrow them from the current
        main manager if one exists.
        """
        global MAIN_RICH_PMAN
        import rich
        import rich.progress
        from rich.console import Group
        from rich.live import Live
        from rich.progress import BarColumn, TextColumn
        from rich.progress import Progress as richProgress
        from rich.progress import SpinnerColumn
        from rich.progress import ProgressColumn, Text
        # from rich.style import Style

        if MAIN_RICH_PMAN is not None:
            # Secondary manager: share the main manager's display objects
            self._is_main_manager = False
            self.live_context = None
            self.rich_progress = MAIN_RICH_PMAN.rich_progress
            self.progress_group = MAIN_RICH_PMAN.progress_group
        else:
            # FIXME: instead of having a concept of a "main-manager", we should
            # use reference counting to keep the rich backend alive until there
            # are no managers referencing it.
            self._is_main_manager = True

            class ProgressRateColumn(ProgressColumn):
                """Renders the iteration rate in Hz."""

                def render(self, task) -> Text:
                    """Show progress speed."""
                    _iters_per_second = task.finished_speed or task.speed
                    if _iters_per_second is not None:
                        rate_format = '4.2f' if _iters_per_second > .001 else 'g'
                        fmt = '{:' + rate_format + '} Hz'
                        text = fmt.format(_iters_per_second)
                    else:
                        text = '?'
                    # style = Style(color="red")
                    style = 'progress.data.speed'
                    renderable = Text(text, style=style)
                    return renderable

            self.rich_progress = richProgress(
                TextColumn("{task.description}"),
                SpinnerColumn(),
                BarColumn(),
                "[progress.percentage]{task.percentage:>3.0f}%",
                rich.progress.MofNCompleteColumn(),
                # rich.progress.TransferSpeedColumn(),
                ProgressRateColumn(),
                'eta',
                rich.progress.TimeRemainingColumn(),
                'total',
                rich.progress.TimeElapsedColumn(),
            )
            self.info_panel = None
            self.progress_group = Group(
                # self.info_panel,
                self.rich_progress,
            )
            self.live_context = Live(self.progress_group)
            MAIN_RICH_PMAN = self

    def update_info(self, text):
        """
        Show ``text`` in a panel placed at the front of the render group,
        creating the panel on first use.
        """
        from rich.panel import Panel
        if self.info_panel is None:
            self.info_panel = Panel(text)
            self.progress_group.renderables.insert(0, self.info_panel)
        else:
            self.info_panel.renderable = text

    def start(self):
        """
        Mark the manager active and, for the main manager, enter the live
        display. Does nothing when disabled or already active.

        Returns:
            the result of entering the live context for the main manager,
            otherwise None.
        """
        if self.enabled and not self._active:
            self._active = True
            if self._is_main_manager:
                return self.live_context.__enter__()

    def stop(self, **kw):
        """
        Deactivate the manager; the main manager also exits the live display
        and gives up its main status.

        Args:
            **kw: ``exc_type``, ``exc_val`` and ``exc_tb`` as passed to
                ``__exit__``; any that are missing default to None.

        Returns:
            the result of exiting the live context for the main manager,
            otherwise None.
        """
        global MAIN_RICH_PMAN
        ret = None
        if self.enabled and self._active:
            # Fill in whichever exception details were not given so the live
            # context always receives the full ``__exit__`` argument set.
            for key in ('exc_type', 'exc_val', 'exc_tb'):
                kw.setdefault(key, None)
            if self._is_main_manager:
                MAIN_RICH_PMAN = None
                ret = self.live_context.__exit__(**kw)
                self._is_main_manager = False
            self._active = False
        return ret
class _ProgIterManager(BaseProgIterManager):
    """
    progiter specific backend
    """

    def __init__(self, **kwargs):
        """
        Args:
            **kwargs: default keyword arguments for every progress iterator
                created by this manager (``enabled`` is additionally
                remembered on the manager itself).
        """
        self.prog_iters = []
        self.enabled = kwargs.get('enabled', True)
        # Default arguments for new progiters
        builtin_defaults = ub.udict({
            'time_thresh': 2.0,
        })
        self.default_progkw = builtin_defaults | ub.udict(kwargs)

    def progiter(self, iterable=None, total=None, desc=None, transient=False,
                 spinner=False, verbose='auto', **kw):
        """
        Create a new :class:`ProgIter2` and register it with this manager.

        ``transient`` and ``spinner`` are accepted but not used by this
        backend. ``verbose='auto'`` uses the manager's default verbosity.

        Returns:
            ProgIter2
        """
        options = self.default_progkw.copy()
        options.update(kw)
        if verbose == 'auto':
            options['verbose'] = self.default_progkw.get('verbose', 1)
        else:
            options['verbose'] = verbose

        # Change all other - now outer - progiters to verbose=3 mode
        for outer in self.prog_iters:
            outer.ensure_newline()
            if not outer.enabled:
                continue
            outer.clearline = False
            outer.adjust = False
            outer.freq = 1

        created = ProgIter2(iterable, total=total, desc=desc, **options)
        created._set_manager(self)
        self.prog_iters.append(created)
        return created

    def update_info(self, text):
        """
        Print ``text`` in a banner, or delegate to the first registered
        progress iterator if there is one.
        """
        if self.prog_iters:
            self.prog_iters[0].update_info(text)
            return
        # if self._cursor_at_newline:
        for line in ('+ --- Info --- +', text, '+ ------------ +'):
            print(line)
class ProgressManager(BaseProgIterManager):
    r"""
    A progress manager.

    Manage multiple progress bars, either with rich or ProgIter.

    CommandLine:
        xdoctest -m kwutil.util_progress ProgressManager:0
        xdoctest -m kwutil.util_progress ProgressManager:1
        xdoctest -m kwutil.util_progress ProgressManager:2

    Example:
        >>> from kwutil.util_progress import ProgressManager
        >>> from progiter import progiter
        >>> # Can use plain progiter or rich
        >>> # The usecase for plain progiter is when threads / live output
        >>> # is not desirable and you just want plain stdout progress
        >>> pman = ProgressManager(backend='progiter')
        >>> with pman:
        >>>     oprog = pman.progiter(range(20), desc='outer loop', verbose=3)
        >>>     for i in oprog:
        >>>         oprog.set_postfix(f'Doing step {i}', refresh=False)
        >>>         for i in pman.progiter(range(100), desc=f'inner loop {i}'):
        >>>             pass
        >>> # xdoctest: +REQUIRES(module:rich)
        >>> self = pman = ProgressManager(backend='rich')
        >>> pman = ProgressManager(backend='rich')
        >>> with pman:
        >>>     oprog = pman.progiter(range(20), desc='outer loop', verbose=3)
        >>>     for i in oprog:
        >>>         oprog.set_postfix(f'Doing step {i}', refresh=False)
        >>>         for i in pman.progiter(range(100), desc=f'inner loop {i}'):
        >>>             pass

    Example:
        >>> # xdoctest: +REQUIRES(module:rich)
        >>> # A fairly complex example
        >>> from kwutil.util_progress import ProgressManager
        >>> import time
        >>> delay = 0.00005
        >>> N_inner = 300
        >>> N_outer = 11
        >>> self = pman = ProgressManager(backend='rich')
        >>> with pman:
        >>>     oprog = pman(range(N_outer), desc='outer loop')
        >>>     for i in oprog:
        >>>         if i > 7:
        >>>             self.update_info(f'The info panel gives detailed updates\nWe are now at step {i}\nWe are just about done now')
        >>>         elif i > 5:
        >>>             self.update_info(f'The info panel gives detailed updates\nWe are now at step {i}')
        >>>         oprog.set_postfix(f'Doing step {i}')
        >>>         N = 1000
        >>>         for j in pman(iter(range(N_inner)), total=None if i % 2 == 0 else N_inner, desc=f'inner loop {i}', transient=i < 4):
        >>>             time.sleep(delay)

    Example:
        >>> # xdoctest: +REQUIRES(module:rich)
        >>> # Test complex example over a grid of parameters
        >>> from kwutil.util_progress import ProgressManager, ProgIter2
        >>> import time
        >>> delay = 0.000001
        >>> N_inner = 100
        >>> N_outer = 11
        >>> basis = {
        >>>     'with_info': [0, 1],
        >>>     'backend': ['progiter', 'rich'],
        >>>     'enabled': [0, 1],
        >>>     #'with_info': [1],
        >>> }
        >>> grid = list(ub.named_product(basis))
        >>> grid_prog = ProgIter2(grid, desc='Test cases over grid', verbose=3)
        >>> grid_prog.update_info('Here we go')
        >>> for item in grid:
        >>>     grid_prog.ensure_newline()
        >>>     grid_prog.update_info(f'Running grid test {ub.urepr(item, nl=1)}')
        >>>     print('\n\n')
        >>>     self = ProgressManager(backend=item['backend'], enabled=item['enabled'])
        >>>     with self:
        >>>         outer_prog = self.progiter(range(N_outer), desc='outer loop')
        >>>         for i in outer_prog:
        >>>             if item['with_info']:
        >>>                 if i > 7:
        >>>                     outer_prog.update_info(f'The info panel gives detailed updates\nWe are now at step {i}\nWe are just about done now')
        >>>                 elif i > 5:
        >>>                     outer_prog.update_info(f'The info panel gives detailed updates\nWe are now at step {i}')
        >>>             outer_prog.set_postfix(f'Doing step {i}')
        >>>             inner_kwargs = dict(
        >>>                 total=None if i % 2 == 0 else N_inner,
        >>>                 transient=i < 4,
        >>>                 time_thresh=delay * 2.3,
        >>>                 desc=f'inner loop {i}',
        >>>             )
        >>>             for j in self.progiter(iter(range(N_inner)), **inner_kwargs):
        >>>                 time.sleep(delay)
        >>>     grid_prog.update_info(f'Finished test item')

    Example:
        >>> # xdoctest: +REQUIRES(module:rich)
        >>> # Demo manual usage
        >>> from kwutil.util_progress import ProgressManager
        >>> from kwutil import util_progress
        >>> import time
        >>> pman = ProgressManager()
        >>> pman.start()
        >>> task1 = pman.progiter(desc='task1', total=100)
        >>> task2 = pman.progiter(desc='task2')
        >>> for i in range(100):
        >>>     task1.update()
        >>>     task2.update(2)
        >>>     time.sleep(0.001)
        >>> ProgressManager.stopall()

    Example:
        >>> # Demo manual usage (progiter backend)
        >>> from kwutil.util_progress import ProgressManager
        >>> from kwutil import util_progress
        >>> import time
        >>> pman = ProgressManager(backend='progiter', adjust=0, freq=1)
        >>> pman.start()
        >>> task1 = pman.progiter(desc='task1', total=12)
        >>> task2 = pman.progiter(desc='task2')
        >>> task1.update()
        >>> task2.update()
        >>> for i in range(10):
        >>>     time.sleep(0.001)
        >>>     task1.update()
        >>>     time.sleep(0.001)
        >>>     task2.update(2)
        >>> ProgressManager.stopall()
    """

    def __init__(self, backend='rich', **kwargs):
        """
        Args:
            backend (str): either ``'rich'`` or ``'progiter'``. The module
                level ``PROGITER_NOTHREAD`` flag forces ``'progiter'``.
            **kwargs: forwarded to the backend manager.

        Raises:
            KeyError: if the backend name is not recognized.
        """
        # Register so that ``stopall`` can find this manager later
        LIVE_PROGRESS_MANAGERS[id(self)] = self
        # TODO: check if we are being tee-d and use progiter instead if we are.
        if PROGITER_NOTHREAD:
            backend = 'progiter'
        if backend == 'rich':
            self.backend = _RichProgIterManager(**kwargs)
            self.is_rich = True
        elif backend == 'progiter':
            self.backend = _ProgIterManager(**kwargs)
            self.is_rich = False
        else:
            raise KeyError(backend)

    def progiter(self, *args, **kw):
        """
        Create a new progress iterator using the selected backend.
        """
        return self.backend.progiter(*args, **kw)

    # Alias
    ProgIter = progiter

    def update_info(self, text):
        """
        Forward info text to the backend.
        """
        self.backend.update_info(text)

    def start(self):
        """
        Start the backend.

        Returns:
            ProgressManager: this manager, so that
            ``with ProgressManager() as pman:`` binds a usable object.
        """
        self.backend.start()
        return self

    def stop(self, *args, **kwargs):
        """
        Stop the backend, forwarding all arguments to it.
        """
        self.backend.stop(*args, **kwargs)

    @property
    def _is_main_manager(self):
        # Only the rich backend defines this attribute; report None for
        # backends that have no notion of a main manager.
        return getattr(self.backend, '_is_main_manager', None)

    @classmethod
    def stopall(cls):
        """
        Stop all background progress threads (likely only 1 exists)

        Ignore:
            from kwutil import util_progress
            util_progress.ProgressManager.stopall()
        """
        # Snapshot the weak registry so that it cannot change size while we
        # are iterating over it.
        for pman in list(LIVE_PROGRESS_MANAGERS.values()):
            pman.stop()
def _progman_test_multiple_managers():
    """
    Exercise several ProgressManagers, some created while another is live.

    Note:
        We want to let the user create multiple ProgressManagers, but we are
        only allowed one live display, therefore the first ProgressManager
        needs to become the "main" manager and all others will be secondary.
        Getting this exactly right may require a refactor and locks, but this
        tests that our simple implementation works well enough.

    CommandLine:
        xdoctest -m kwutil.util_progress _progman_test_multiple_managers

    Example:
        >>> # xdoctest: +REQUIRES(module:rich)
        >>> _progman_test_multiple_managers()
    """
    from kwutil import util_progress
    import time
    from rich import print
    pman1 = util_progress.ProgressManager()
    factor = 0.001
    with pman1:
        print(f'pman1._is_main_manager={pman1._is_main_manager}')
        print('Print before loop 1')
        for i in pman1.progiter(range(100), desc='PMAN(1) Iter(1)'):
            time.sleep(0.01 * factor)
        print('Print after loop 1 #1')
        print('Print after loop 1 #2')
        print('Print after loop 1 #3')

        print('Print before loop 2')
        for i in pman1.progiter(range(100), desc='PMAN(1) Iter(2)'):
            time.sleep(0.009 * factor)
        print('Print after loop 2 #1')
        print('Print after loop 2 #2')
        print('Print after loop 2 #3')

        # A second manager created while the first one is still live
        pman2 = util_progress.ProgressManager()
        print(f'pman2._is_main_manager={pman2._is_main_manager}')
        for idx in range(2):
            for i in pman2.progiter(range(100), desc=f'PMAN(2) Iter({idx})'):
                time.sleep(0.008 * factor)
        print(f'pman2._is_main_manager={pman2._is_main_manager}')
        print(f'pman1._is_main_manager={pman1._is_main_manager}')

    print(f'pman1._is_main_manager={pman1._is_main_manager}')

    # A third manager created after the first one has been stopped
    pman3 = util_progress.ProgressManager()
    print(f'pman3._is_main_manager={pman3._is_main_manager}')
    with pman3:
        print(f'pman3._is_main_manager={pman3._is_main_manager}')
        for idx in range(3):
            # These bars are labelled PMAN(3), so create them from pman3
            # (the manager whose context is active) rather than pman2.
            for i in pman3.progiter(range(100), desc=f'PMAN(3) Iter({idx})'):
                time.sleep(0.005 * factor)
    print(f'pman3._is_main_manager={pman3._is_main_manager}')
    print(f'pman1._is_main_manager={pman1._is_main_manager}')
def _progman_test_multiple_managers_tree():
    """
    Exercise ProgressManagers that are created recursively inside the
    context of other ProgressManagers.

    CommandLine:
        xdoctest -m kwutil.util_progress _progman_test_multiple_managers_tree

    Example:
        >>> # xdoctest: +REQUIRES(module:rich)
        >>> _progman_test_multiple_managers_tree()
    """
    import time
    from kwutil import util_progress
    sleep_time = 0.001 * 0.001

    def _nested_loop(max_depth=0):
        pman = util_progress.ProgressManager()
        with pman:
            prog1 = pman.progiter(range(100), desc=f'P1: max_depth={max_depth}')
            # Two partial passes over the first progress iterator, each one
            # followed by a recursive descent while depth remains.
            for _pass in range(2):
                for _ in zip(prog1, range(50)):
                    time.sleep(sleep_time)
                    yield None
                if max_depth > 0:
                    yield from _nested_loop(max_depth=max_depth - 1)

            prog2 = pman.progiter(range(91), desc=f'P2: max_depth={max_depth}')
            for _ in prog2:
                time.sleep(sleep_time)
                yield None

    # Exhaust the generator so that every nested loop actually runs
    for _ in _nested_loop(max_depth=3):
        pass