r"""
Proof of concept (POC) for a better ProgIter with rich support.

CommandLine:
    DEMO_PROGRESS=1 xdoctest -m kwutil.util_progress __doc__:0

Example:
    >>> # xdoctest: +REQUIRES(env:DEMO_PROGRESS)
    >>> from kwutil.util_progress import ProgressManager
    >>> import time
    >>> delay = 0.05
    >>> # Can use plain progiter or rich
    >>> # The use case for plain progiter is when threads / live output
    >>> # are not desirable and you just want plain stdout progress
    >>> for backend in ['rich', 'progiter']:
    >>>     print(f'\n\n -- starting {backend} --\n\n')
    >>>     pman = ProgressManager(backend=backend)
    >>>     with pman:
    >>>         pbar1 = pman.progiter(range(5), desc='outer loop', verbose=3)
    >>>         for i in pbar1:
    >>>             pbar1.set_postfix(f'\\[step {i}]', refresh=False)
    >>>             for j1 in pman.progiter(range(100), desc=f'prepare inner loop {i}', transient=True):
    >>>                 time.sleep(delay / 3)
    >>>             for j2 in pman.progiter(range(100), desc=f'execute inner loop {i}'):
    >>>                 time.sleep(delay)
    >>>             for j3 in pman.progiter(range(100), desc=f'shutdown inner loop {i}', transient=True):
    >>>                 time.sleep(delay / 3)
"""
import ubelt as ub
import os
from progiter import ProgIter
import weakref
# If truthy, disable all threaded rich options (ProgressManager then falls
# back to the plain progiter backend).
#
# The value comes from the PROGITER_NOTHREAD environment variable:
#   * unset or "auto": use rich outside of slurm, i.e. the flag takes the
#     value of SLURM_JOBID (a non-empty job id is truthy).
#   * anything else: interpreted as a boolean flag.
PROGITER_NOTHREAD = os.environ.get('PROGITER_NOTHREAD', 'auto')
if PROGITER_NOTHREAD == 'auto':
    # Use rich outside of slurm
    PROGITER_NOTHREAD = os.environ.get('SLURM_JOBID', '')
else:
    # ``bool()`` of any non-empty string is True, so a user exporting
    # PROGITER_NOTHREAD=0 (or "false") would still disable rich. Treat the
    # common falsy spellings as False instead.
    PROGITER_NOTHREAD = PROGITER_NOTHREAD.strip().lower() not in {
        '', '0', 'false', 'no', 'off',
    }

# Registry of ProgressManager instances keyed by ``id(manager)``. Values are
# held weakly so registration does not keep a manager alive.
LIVE_PROGRESS_MANAGERS = weakref.WeakValueDictionary()
class ProgIter2(ProgIter):
    """
    A ProgIter subclass that can be attached to a progress manager and can
    print a block of "info" text.
    """

    def _set_manager(self, manager):
        """
        Remember the manager this progress iterator belongs to.

        Only a weak proxy is stored, so this object does not keep the manager
        alive.
        """
        self.manager = weakref.proxy(manager)

    def update_info(self, text):
        """
        Store ``text`` on ``self.info_text`` and print it between two banner
        lines. Nothing is printed when ``text`` is None.
        """
        self.info_text = text
        self.ensure_newline()
        if text is None:
            return
        # if self._cursor_at_newline:
        print('+ --- Info --- +')
        print(text)
        print('+ ------------ +')
        # self.display_message()

    def update(self, n=1):
        """
        Advance the progress by ``n`` steps, beginning the iterator first if
        it has not been started.
        """
        if not self.started:
            self.begin()
        owner = getattr(self, 'manager', None)
        if owner is not None:
            # TODO: vet this, not quite working. The idea is if part of a
            # progress manager and this is not the "tail" progiter (i.e. it
            # isn't the only progiter allowed to be clearing newlines) then we
            # should ensure that the progiter that is allowed to clear the
            # newline has its ensure_newline method called so we can actually
            # write our progress without getting clobbered.
            siblings = owner.prog_iters
            if siblings and siblings[-1] is not self:
                siblings[-1].ensure_newline()
        super().update(n=n)

    def display_message(self):
        """Defer to the parent implementation."""
        super().display_message()

    # Alias matching the name used by RichProgIter
    set_postfix = ProgIter.set_postfix_str
class RichProgIter:
    """
    Ducktypes ProgIter, rendering through a rich progress task.

    TODO: enhance with the ability to have a update info panel that removes
    the circular reference

    Ignore:
        from kwutil import util_progress
        util_progress.LIVE_PROGRESS_MANAGERS
        print(len(util_progress.LIVE_PROGRESS_MANAGERS))
        for v in util_progress.LIVE_PROGRESS_MANAGERS.values():
            ...
        from kwutil.util_progress import *  # NOQA
        for _ in RichProgIter(range(1000)):
            ...
    """

    def __init__(self, iterable=None, desc=None, total=None, freq=1, initial=0,
                 eta_window=64, clearline=True, adjust=True, time_thresh=2.0,
                 show_times=True, show_wall=False, enabled=True, verbose=None,
                 stream=None, chunksize=None, rel_adjust_limit=4.0,
                 transient=False, manager=None, spinner=False, **kwargs):
        """
        Register a task with the manager's rich progress object.

        Many parameters exist only for signature compatibility with ProgIter
        and are not used by this class. When ``manager`` is None a private
        ``_RichProgIterManager`` is created and this object starts / stops
        it; otherwise the given manager is referenced through a weak proxy.
        A falsy ``verbose`` (other than None) disables the progress display.
        """
        # ProgIter options that have no rich equivalent here
        ignored_options = {
            'eta_window', 'clearline', 'adjust', 'time_thresh', 'show_times',
            'show_wall', 'stream', 'chunksize', 'rel_adjust_limit',
        }
        kwargs = ub.udict(kwargs) - ignored_options

        self._self_managed = manager is None
        if self._self_managed:
            manager = _RichProgIterManager()
        else:
            manager = weakref.proxy(manager)

        effective_verbose = 1 if verbose is None else verbose
        if not effective_verbose:
            enabled = False

        self.manager = manager
        self.iterable = iterable
        self.enabled = enabled
        self.spinner = spinner

        if total is None:
            # Best effort: not every iterable has a length
            try:
                total = len(iterable)
            except Exception:
                pass
        self.total = total
        self.desc = desc

        task_spec = {
            'description': '' if desc is None else desc,
            'total': self.total,
        }
        self.task_id = (
            self.manager.rich_progress.add_task(**task_spec)
            if self.enabled else None
        )
        self.transient = transient
        self.extra = None

    def start(self):
        """Alias of :func:`begin`."""
        return self.begin()

    def stop(self):
        """Alias of :func:`end`."""
        return self.end()

    def begin(self):
        """Start the manager, but only if this object created it."""
        if self._self_managed:
            self.manager.start()

    def end(self):
        """
        Remove the task if it is transient, then stop the manager if this
        object created it.
        """
        if self.transient:
            self.remove()
        if self._self_managed:
            self.manager.stop()

    def update(self, n=1):
        """Advance the rich task by ``n`` (no-op when disabled)."""
        if not self.enabled:
            return
        self.manager.rich_progress.update(self.task_id, advance=n)

    step = update

    def __iter__(self):
        """
        Yield the items of the wrapped iterable, advancing the task by one
        after each item is consumed.
        """
        if not self.enabled:
            yield from self.iterable
            return

        self.start()
        for item in self.iterable:
            yield item
            self.manager.rich_progress.update(self.task_id, advance=1)
        if self.total is None:
            # The length was unknown up front; now that the iterable is
            # exhausted, set the total to the number of completed steps.
            finished = self.manager.rich_progress._tasks[self.task_id]
            self.manager.rich_progress.update(
                self.task_id, total=finished.completed)
        self.stop()

    def remove(self):
        """
        Remove this progress task from its rich manager
        """
        if not self.enabled:
            return
        self.manager.rich_progress.remove_task(self.task_id)

    def update_info(self, text):
        """Forward ``text`` to the manager's info panel (no-op when disabled)."""
        if not self.enabled:
            return
        # FIXME: remove circular reference
        self.manager.update_info(text)

    def ensure_newline(self):
        """No-op; present for compatibility with the ProgIter interface."""
        ...

    def set_postfix_str(self, text, refresh=True):
        """
        Record ``text`` as the extra text and, when enabled, set the task
        description to the description and extra text joined by a space.
        """
        self.extra = text
        if not self.enabled:
            return
        pieces = [part for part in (self.desc, self.extra) if part is not None]
        self.manager.rich_progress.update(
            self.task_id, description=' '.join(pieces), refresh=refresh)

    set_postfix = set_postfix_str
    set_extra = set_postfix_str
class BaseProgIterManager:
    """
    Common interface shared by the progress manager classes.

    Subclasses provide ``progiter``; this base supplies the aliases and the
    context-manager protocol on top of ``start`` / ``stop``.
    """

    def new(self, *args, **kw):
        """Alias of ``progiter``."""
        return self.progiter(*args, **kw)

    def __call__(self, *args, **kw):
        """Calling the manager is the same as calling ``progiter``."""
        return self.progiter(*args, **kw)

    def start(self):
        """Default start hook: does nothing and returns the manager itself."""
        # TODO: be re-entrant? keep track of number of enters and only stop
        # when exits get to zero?
        return self

    def begin(self):
        """Alias of ``start``."""
        return self.start()

    def stop(self, **kwargs):
        """Default stop hook: accepts keyword arguments and does nothing."""
        return None

    def __enter__(self):
        entered = self.start()
        return entered

    def __exit__(self, exc_type=None, exc_val=None, exc_tb=None):
        # The result of stop is discarded, so exceptions are never suppressed
        self.stop(exc_type=exc_type, exc_val=exc_val, exc_tb=exc_tb)
# Global var: the rich manager that currently owns the live display, or None.
# ``_RichProgIterManager.setup_rich`` assigns it when no main manager exists
# and ``_RichProgIterManager.stop`` resets it to None.
MAIN_RICH_PMAN = None
class _RichProgIterManager(BaseProgIterManager):
    """
    rich specific backend.

    Example:
        >>> # Test verbose = 0
        >>> # xdoctest: +REQUIRES(module:rich)
        >>> from kwutil.util_progress import ProgressManager
        >>> from kwutil import util_progress
        >>> import time
        >>> pman = ProgressManager(backend='rich', verbose=0)
        >>> with pman:
        >>>     for i in pman.progiter(range(10), desc='should not show1'):
        >>>         ...
        >>>     for i in pman.progiter(range(10), verbose=1, desc='should show2'):
        >>>         ...
        >>>     for i in pman.progiter(range(10), verbose=0, desc='should not show3'):
        >>>         ...
    """

    def __init__(self, **kwargs):
        """
        Args:
            **kwargs: ``enabled`` (default True) and ``verbose`` (default 1)
                are consumed here; everything else becomes a default keyword
                argument for progress iterators created by :func:`progiter`.
        """
        self.prog_iters = []
        self.enabled = kwargs.pop('enabled', True)
        self.info_panel = None
        self.rich_progress = None
        self._is_main_manager = None
        self.setup_rich()
        self._active = False
        default_verbose = kwargs.pop('verbose', 1)
        self.default_progkw = {'verbose': default_verbose, **kwargs}

    # Can we make this work?
    # def __del__(self):
    #     if self._active:
    #         self.stop()

    def progiter(self, iterable=None, total=None, desc=None, transient=False, spinner=False, verbose='auto', **kw):
        """
        Create a :class:`RichProgIter` bound to this manager, starting the
        manager first if it is not active.

        ``verbose='auto'`` uses the manager's default verbosity.
        """
        if not self._active:
            self.start()
        # Fixme remove circular ref
        # self.rich_progress.pman = self
        progkw = {**self.default_progkw, **kw}
        if verbose == 'auto':
            progkw['verbose'] = self.default_progkw.get('verbose', 1)
        else:
            progkw['verbose'] = verbose
        prog = RichProgIter(
            manager=self, iterable=iterable, total=total, desc=desc,
            transient=transient, spinner=spinner, **progkw)
        self.prog_iters.append(prog)
        return prog

    # Alias for simpler drop-in-replacements
    ProgIter = progiter

    def setup_rich(self):
        """
        Attach this manager to a rich progress display.

        If a main manager is already registered in ``MAIN_RICH_PMAN`` its
        progress object and render group are reused; otherwise this manager
        builds them, creates the live context, and registers itself as the
        main manager.
        """
        global MAIN_RICH_PMAN
        import rich
        import rich.progress
        from rich.console import Group
        from rich.live import Live
        from rich.progress import BarColumn, TextColumn
        from rich.progress import Progress as richProgress
        from rich.progress import SpinnerColumn
        from rich.progress import ProgressColumn, Text
        # from rich.style import Style

        if MAIN_RICH_PMAN is not None:
            # Secondary manager: share the display owned by the main manager
            self._is_main_manager = False
            self.live_context = None
            self.rich_progress = MAIN_RICH_PMAN.rich_progress
            self.progress_group = MAIN_RICH_PMAN.progress_group
            return

        # FIXME: instead of having a concept of a "main-manager", we should
        # use reference counting to keep the rich backend alive until there
        # are no managers referencing it.
        self._is_main_manager = True

        class ProgressRateColumn(ProgressColumn):
            """Renders the iteration rate in Hz."""

            def render(self, task) -> Text:
                """Format the task speed, or '?' when it is not known."""
                rate = task.finished_speed or task.speed
                if rate is None:
                    text = '?'
                else:
                    rate_format = '4.2f' if rate > .001 else 'g'
                    text = ('{:' + rate_format + '} Hz').format(rate)
                # style = Style(color="red")
                return Text(text, style='progress.data.speed')

        self.rich_progress = richProgress(
            TextColumn("{task.description}"),
            SpinnerColumn(),
            BarColumn(),
            "[progress.percentage]{task.percentage:>3.0f}%",
            rich.progress.MofNCompleteColumn(),
            # rich.progress.TransferSpeedColumn(),
            ProgressRateColumn(),
            'eta',
            rich.progress.TimeRemainingColumn(),
            'total',
            rich.progress.TimeElapsedColumn(),
        )
        self.info_panel = None
        self.progress_group = Group(
            # self.info_panel,
            self.rich_progress,
        )
        self.live_context = Live(self.progress_group)
        MAIN_RICH_PMAN = self

    def update_info(self, text):
        """
        Show ``text`` in the info panel, creating the panel at the top of
        the render group on first use.
        """
        from rich.panel import Panel
        if self.info_panel is not None:
            self.info_panel.renderable = text
            return
        self.info_panel = Panel(text)
        self.progress_group.renderables.insert(0, self.info_panel)

    def start(self):
        """
        Mark the manager active. The main manager also enters the live
        context and returns its result; otherwise None is returned.
        """
        if not self.enabled or self._active:
            return None
        self._active = True
        if self._is_main_manager:
            return self.live_context.__enter__()
        return None

    def stop(self, **kw):
        """
        Mark the manager inactive. The main manager also exits the live
        context, unregisters itself, and stops being the main manager.

        Args:
            **kw: ``exc_type`` / ``exc_val`` / ``exc_tb`` forwarded to the
                live context's ``__exit__``; all default to None when no
                keyword arguments are given.
        """
        global MAIN_RICH_PMAN
        if not (self.enabled and self._active):
            return None
        if not kw:
            kw.update(exc_type=None, exc_val=None, exc_tb=None)
        ret = None
        if self._is_main_manager:
            MAIN_RICH_PMAN = None
            ret = self.live_context.__exit__(**kw)
            self._is_main_manager = False
        self._active = False
        return ret
class _ProgIterManager(BaseProgIterManager):
    """
    progiter specific backend
    """

    def __init__(self, **kwargs):
        """
        Args:
            **kwargs: default keyword arguments for every progress iterator
                this manager creates (``time_thresh`` defaults to 2.0).
        """
        self.enabled = kwargs.get('enabled', True)
        # Default arguments for new progiters
        builtin_defaults = ub.udict({
            'time_thresh': 2.0,
        })
        self.default_progkw = builtin_defaults | ub.udict(kwargs)
        self.prog_iters = []

    def progiter(self, iterable=None, total=None, desc=None, transient=False, spinner=False, verbose='auto', **kw):
        """
        Create a :class:`ProgIter2` attached to this manager.

        ``transient`` and ``spinner`` are accepted but not used by this
        backend. ``verbose='auto'`` uses the manager's default verbosity.
        """
        progkw = self.default_progkw.copy()
        progkw.update(kw)
        if verbose == 'auto':
            progkw['verbose'] = self.default_progkw.get('verbose', 1)
        else:
            progkw['verbose'] = verbose

        # Change all other - now outer - progiters to verbose=3 mode
        for outer in self.prog_iters:
            outer.ensure_newline()
            if outer.enabled:
                outer.clearline = False
                outer.adjust = False
                outer.freq = 1

        prog = ProgIter2(iterable, total=total, desc=desc, **progkw)
        prog._set_manager(self)
        self.prog_iters.append(prog)
        return prog

    def update_info(self, text):
        """
        Print ``text`` as an info block, delegating to the first progress
        iterator when one exists.
        """
        if self.prog_iters:
            self.prog_iters[0].update_info(text)
            return
        # if self._cursor_at_newline:
        print('+ --- Info --- +')
        print(text)
        print('+ ------------ +')
class ProgressManager(BaseProgIterManager):
    r"""
    A progress manager.

    Manage multiple progress bars, either with rich or ProgIter.

    The manager can be used as a context manager; entering it starts the
    backend and yields the manager itself.

    CommandLine:
        xdoctest -m kwutil.util_progress ProgressManager:0
        xdoctest -m kwutil.util_progress ProgressManager:1
        xdoctest -m kwutil.util_progress ProgressManager:2

    Example:
        >>> from kwutil.util_progress import ProgressManager
        >>> from progiter import progiter
        >>> # Can use plain progiter or rich
        >>> # The usecase for plain progiter is when threads / live output
        >>> # is not desirable and you just want plain stdout progress
        >>> pman = ProgressManager(backend='progiter')
        >>> with pman:
        >>>     oprog = pman.progiter(range(20), desc='outer loop', verbose=3)
        >>>     for i in oprog:
        >>>         oprog.set_postfix(f'Doing step {i}', refresh=False)
        >>>         for i in pman.progiter(range(100), desc=f'inner loop {i}'):
        >>>             pass
        >>> # xdoctest: +REQUIRES(module:rich)
        >>> self = pman = ProgressManager(backend='rich')
        >>> pman = ProgressManager(backend='rich')
        >>> with pman:
        >>>     oprog = pman.progiter(range(20), desc='outer loop', verbose=3)
        >>>     for i in oprog:
        >>>         oprog.set_postfix(f'Doing step {i}', refresh=False)
        >>>         for i in pman.progiter(range(100), desc=f'inner loop {i}'):
        >>>             pass

    Example:
        >>> # xdoctest: +REQUIRES(module:rich)
        >>> # A fairly complex example
        >>> from kwutil.util_progress import ProgressManager
        >>> import time
        >>> delay = 0.00005
        >>> N_inner = 300
        >>> N_outer = 11
        >>> self = pman = ProgressManager(backend='rich')
        >>> with pman:
        >>>     oprog = pman(range(N_outer), desc='outer loop')
        >>>     for i in oprog:
        >>>         if i > 7:
        >>>             self.update_info(f'The info panel gives detailed updates\nWe are now at step {i}\nWe are just about done now')
        >>>         elif i > 5:
        >>>             self.update_info(f'The info panel gives detailed updates\nWe are now at step {i}')
        >>>         oprog.set_postfix(f'Doing step {i}')
        >>>         N = 1000
        >>>         for j in pman(iter(range(N_inner)), total=None if i % 2 == 0 else N_inner, desc=f'inner loop {i}', transient=i < 4):
        >>>             time.sleep(delay)

    Example:
        >>> # xdoctest: +REQUIRES(module:rich)
        >>> # Test complex example over a grid of parameters
        >>> from kwutil.util_progress import ProgressManager, ProgIter2
        >>> import time
        >>> delay = 0.000001
        >>> N_inner = 100
        >>> N_outer = 11
        >>> basis = {
        >>>     'with_info': [0, 1],
        >>>     'backend': ['progiter', 'rich'],
        >>>     'enabled': [0, 1],
        >>>     #'with_info': [1],
        >>> }
        >>> grid = list(ub.named_product(basis))
        >>> grid_prog = ProgIter2(grid, desc='Test cases over grid', verbose=3)
        >>> grid_prog.update_info('Here we go')
        >>> for item in grid:
        >>>     grid_prog.ensure_newline()
        >>>     grid_prog.update_info(f'Running grid test {ub.urepr(item, nl=1)}')
        >>>     print('\n\n')
        >>>     self = ProgressManager(backend=item['backend'], enabled=item['enabled'])
        >>>     with self:
        >>>         outer_prog = self.progiter(range(N_outer), desc='outer loop')
        >>>         for i in outer_prog:
        >>>             if item['with_info']:
        >>>                 if i > 7:
        >>>                     outer_prog.update_info(f'The info panel gives detailed updates\nWe are now at step {i}\nWe are just about done now')
        >>>                 elif i > 5:
        >>>                     outer_prog.update_info(f'The info panel gives detailed updates\nWe are now at step {i}')
        >>>             outer_prog.set_postfix(f'Doing step {i}')
        >>>             inner_kwargs = dict(
        >>>                 total=None if i % 2 == 0 else N_inner,
        >>>                 transient=i < 4,
        >>>                 time_thresh=delay * 2.3,
        >>>                 desc=f'inner loop {i}',
        >>>             )
        >>>             for j in self.progiter(iter(range(N_inner)), **inner_kwargs):
        >>>                 time.sleep(delay)
        >>>     grid_prog.update_info(f'Finished test item')

    Example:
        >>> # xdoctest: +REQUIRES(module:rich)
        >>> # Demo manual usage
        >>> from kwutil.util_progress import ProgressManager
        >>> from kwutil import util_progress
        >>> import time
        >>> pman = ProgressManager()
        >>> pman.start()
        >>> task1 = pman.progiter(desc='task1', total=100)
        >>> task2 = pman.progiter(desc='task2')
        >>> for i in range(100):
        >>>     task1.update()
        >>>     task2.update(2)
        >>>     time.sleep(0.001)
        >>> ProgressManager.stopall()

    Example:
        >>> # Demo manual usage (progiter backend)
        >>> from kwutil.util_progress import ProgressManager
        >>> from kwutil import util_progress
        >>> import time
        >>> pman = ProgressManager(backend='progiter', adjust=0, freq=1)
        >>> pman.start()
        >>> task1 = pman.progiter(desc='task1', total=12)
        >>> task2 = pman.progiter(desc='task2')
        >>> task1.update()
        >>> task2.update()
        >>> for i in range(10):
        >>>     time.sleep(0.001)
        >>>     task1.update()
        >>>     time.sleep(0.001)
        >>>     task2.update(2)
        >>> ProgressManager.stopall()
    """

    def __init__(self, backend='rich', **kwargs):
        """
        Args:
            backend (str): either ``'rich'`` or ``'progiter'``. Forced to
                ``'progiter'`` when the module-level ``PROGITER_NOTHREAD``
                flag is truthy.
            **kwargs: forwarded to the backend manager.

        Raises:
            KeyError: if ``backend`` is not a known backend name.
        """
        # Register so that stopall can find this manager later
        LIVE_PROGRESS_MANAGERS[id(self)] = self
        # TODO: check if we are being tee-d and use progiter instead if we are.
        if PROGITER_NOTHREAD:
            backend = 'progiter'
        if backend == 'rich':
            self.backend = _RichProgIterManager(**kwargs)
            self.is_rich = True
        elif backend == 'progiter':
            self.backend = _ProgIterManager(**kwargs)
            self.is_rich = False
        else:
            raise KeyError(backend)

    def progiter(self, *args, **kw):
        """Create a new progress iterator using the configured backend."""
        prog = self.backend.progiter(*args, **kw)
        return prog

    # Alias
    ProgIter = progiter

    def update_info(self, text):
        """Forward ``text`` to the backend's info display."""
        self.backend.update_info(text)

    def start(self):
        """
        Start the backend.

        Returns:
            ProgressManager: this manager. The inherited ``__enter__``
            returns the result of ``start``, so returning ``self`` makes
            ``with ProgressManager() as pman:`` bind the manager rather
            than None.
        """
        self.backend.start()
        return self

    def stop(self, *args, **kwargs):
        """Stop the backend, forwarding all arguments to it."""
        self.backend.stop(*args, **kwargs)

    @property
    def _is_main_manager(self):
        """The backend's ``_is_main_manager`` flag."""
        return self.backend._is_main_manager

    @classmethod
    def stopall(cls):
        """
        Stop all background progress threads (likely only 1 exists)

        Ignore:
            from kwutil import util_progress
            util_progress.ProgressManager.stopall()
        """
        # Iterate over a snapshot of the weak registry
        for pman in list(LIVE_PROGRESS_MANAGERS.values()):
            pman.stop()
def _progman_test_multiple_managers():
    """
    Exercise several ProgressManagers created one after another.

    Note:
        We want to let the user create multiple ProgressManagers, but we are
        only allowed one live display, therefore the first ProgressManager
        needs to become the "main" manager and all others will be
        secondary. Getting this exactly right may require a refactor and
        locks, but this tests that our simple implementation works well
        enough.

    CommandLine:
        xdoctest -m kwutil.util_progress _progman_test_multiple_managers

    Example:
        >>> # xdoctest: +REQUIRES(module:rich)
        >>> _progman_test_multiple_managers()
    """
    from kwutil import util_progress
    import time
    from rich import print
    pman1 = util_progress.ProgressManager()
    factor = 0.001
    with pman1:
        print(f'pman1._is_main_manager={pman1._is_main_manager}')
        print('Print before loop 1')
        for i in pman1.progiter(range(100), desc='PMAN(1) Iter(1)'):
            time.sleep(0.01 * factor)
        print('Print after loop 1 #1')
        print('Print after loop 1 #2')
        print('Print after loop 1 #3')

        print('Print before loop 2')
        for i in pman1.progiter(range(100), desc='PMAN(1) Iter(2)'):
            time.sleep(0.009 * factor)
        print('Print after loop 2 #1')
        print('Print after loop 2 #2')
        print('Print after loop 2 #3')

        # A second manager created while the first one is still live
        pman2 = util_progress.ProgressManager()
        print(f'pman2._is_main_manager={pman2._is_main_manager}')
        for idx in range(2):
            for i in pman2.progiter(range(100), desc=f'PMAN(2) Iter({idx})'):
                time.sleep(0.008 * factor)
        print(f'pman2._is_main_manager={pman2._is_main_manager}')
        print(f'pman1._is_main_manager={pman1._is_main_manager}')

    print(f'pman1._is_main_manager={pman1._is_main_manager}')

    # A third manager created after the first one has exited
    pman3 = util_progress.ProgressManager()
    print(f'pman3._is_main_manager={pman3._is_main_manager}')
    with pman3:
        print(f'pman3._is_main_manager={pman3._is_main_manager}')
        for idx in range(3):
            # Use pman3 here: the loop is labelled PMAN(3) and runs inside
            # the pman3 context (it previously called pman2.progiter).
            for i in pman3.progiter(range(100), desc=f'PMAN(3) Iter({idx})'):
                time.sleep(0.005 * factor)
        print(f'pman3._is_main_manager={pman3._is_main_manager}')
    print(f'pman1._is_main_manager={pman1._is_main_manager}')
def _progman_test_multiple_managers_tree():
    """
    Exercise ProgressManagers that are created recursively, each inside the
    context of its parent.

    CommandLine:
        xdoctest -m kwutil.util_progress _progman_test_multiple_managers_tree

    Example:
        >>> # xdoctest: +REQUIRES(module:rich)
        >>> _progman_test_multiple_managers_tree()
    """
    import time
    from kwutil import util_progress
    pause = 0.001 * 0.001

    def _tick_through(iterable):
        # Sleep briefly and yield once for every item of the iterable
        for _ in iterable:
            time.sleep(pause)
            yield None

    def _nested_loop(max_depth=0):
        pman = util_progress.ProgressManager()
        with pman:
            prog1 = pman.progiter(range(100), desc=f'P1: max_depth={max_depth}')
            # The first progress iterator is consumed in two halves, with a
            # nested manager spawned after each half.
            for _half in range(2):
                yield from _tick_through(zip(prog1, range(50)))
                if max_depth > 0:
                    yield from _nested_loop(max_depth=max_depth - 1)

            prog2 = pman.progiter(range(91), desc=f'P2: max_depth={max_depth}')
            yield from _tick_through(prog2)

    list(_nested_loop(max_depth=3))