kwutil package

Submodules

Module contents

The kwutil Module

Read the docs: https://kwutil.readthedocs.io

Gitlab (main): https://gitlab.kitware.com/computer-vision/kwutil

Github (mirror): https://github.com/Kitware/kwutil

Pypi: https://pypi.org/project/kwutil

The Kitware utility module.

This module is for small, pure-python utility functions. Dependencies are allowed, but they must be small and highly standard packages (e.g. rich, psutil, ruamel.yaml).

class kwutil.CopyManager(workers=0, mode='thread', eager=False, overwrite=False, skip_existing=False)[source]

Bases: _FilesystemOperationManager

Helper to execute multiple copy operations on a local filesystem.

Notes

It would be nice for this to support an rsync backend that could sync at the src/dst pair level. Not sure if this works.

References

https://unix.stackexchange.com/questions/133995/rsyncing-multiple-src-dest-pairs https://serverfault.com/questions/163859/using-rsync-as-a-queue https://unix.stackexchange.com/questions/602606/rsync-source-list-to-destination-list

Todo

  • [ ] Add optional check that all src paths exist

  • [ ] Add optional check that all dst paths do not exist (unless overwrite=True or skip_existing=True)

  • [ ] Add optional check that no dst path is or is inside of a src dpath (would make things ambiguous); the operation graph should be bipartite.

  • [ ] Add backend that uses a fast protocol like rsync (or write one in Rust)

Example

>>> import ubelt as ub
>>> from kwutil.fsops_managers import CopyManager
>>> dpath = ub.Path.appdir('kwutil', 'tests', 'copy_manager')
>>> src_dpath = (dpath / 'src').ensuredir()
>>> dst_dpath = (dpath / 'dst').delete()
>>> src_fpaths = [src_dpath / 'file{}.txt'.format(i) for i in range(10)]
>>> for fpath in src_fpaths:
>>>     fpath.touch()
>>> # To use a copy manager, iterate through your source and
>>> # destination paths and submit them.
>>> copyman = CopyManager(workers=0)
>>> # by default it will do nothing
>>> # unless you specify eager=True or explicitly call run.
>>> for fpath in src_fpaths:
>>>     dst = fpath.augment(dpath=dst_dpath)
>>>     copyman.submit(fpath, dst)
>>> report = copyman.report()
>>> print(f'report = {ub.urepr(report, nl=1)}')
>>> copyman.run()

Example

>>> import ubelt as ub
>>> from kwutil.fsops_managers import CopyManager
>>> dpath = ub.Path.appdir('kwutil', 'tests', 'copy_manager')
>>> src_dpath = (dpath / 'src').ensuredir()
>>> dst_dpath = (dpath / 'dst').delete()
>>> src_fpaths = [src_dpath / 'file{}.txt'.format(i) for i in range(10)]
>>> for fpath in src_fpaths:
>>>     fpath.touch()
>>> copyman = CopyManager(workers=0)
>>> for fpath in src_fpaths:
>>>     dst = fpath.augment(dpath=dst_dpath)
>>>     copyman.submit(fpath, dst)
>>> copyman.run()
>>> assert len(dst_dpath.ls()) == len(src_dpath.ls())
>>> copyman = CopyManager(workers=0)
>>> for fpath in src_fpaths:
>>>     dst = fpath.augment(dpath=dst_dpath)
>>>     copyman.submit(fpath, dst)
>>> import pytest
>>> with pytest.raises(FileExistsError):
>>>     copyman.run()
>>> copyman = CopyManager(workers=0)
>>> for fpath in src_fpaths:
>>>     dst = fpath.augment(dpath=dst_dpath)
>>>     copyman.submit(fpath, dst, skip_existing=True)
>>> copyman.run()
Parameters:
  • workers (int) – number of parallel workers to use

  • mode (str) – thread, process, or serial

  • eager (bool) – if True, starts copying as soon as a job is submitted; otherwise it waits until run is called.

  • overwrite (bool) – if True, will overwrite the file if it exists; otherwise it will error unless skip_existing is True. Defaults to False.

  • skip_existing (bool) – if True, jobs where the destination already exists are skipped by default. Defaults to False.
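
The difference between deferred and eager submission can be pictured with a small standard-library sketch. The class DeferredJobs below is hypothetical and is not kwutil's implementation; it only illustrates jobs queuing up until run is called unless eager=True. Treating workers=0 as serial execution is an assumption of the sketch.

```python
from concurrent.futures import ThreadPoolExecutor


class DeferredJobs:
    """Hypothetical stand-in: queue callables, run them later or eagerly."""

    def __init__(self, workers=0, eager=False):
        self.workers = workers
        self.eager = eager
        self._pending = []   # jobs submitted but not yet executed
        self.results = []    # return values of executed jobs

    def submit(self, func, *args):
        if self.eager:
            # Eager mode: execute immediately on submission.
            self.results.append(func(*args))
        else:
            # Deferred mode: remember the job until run() is called.
            self._pending.append((func, args))

    def run(self):
        jobs, self._pending = self._pending, []
        if self.workers > 0:
            # Use a thread pool when parallel workers are requested.
            with ThreadPoolExecutor(max_workers=self.workers) as pool:
                futures = [pool.submit(func, *args) for func, args in jobs]
                self.results.extend(f.result() for f in futures)
        else:
            # workers=0 is treated here as serial execution (an assumption).
            self.results.extend(func(*args) for func, args in jobs)
        return self.results


lazy = DeferredJobs(eager=False)
lazy.submit(pow, 2, 3)
nothing_ran_yet = (lazy.results == [])  # submitting alone does no work
lazy.run()
```

With eager=True the same submit call would have appended its result immediately, and run would have nothing left to do.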

_operation_name = 'copy'
_unsubmitted_report()[source]

Build a report on the unsubmitted jobs.

_worker_func(dst, skip_existing, overwrite, follow_file_symlinks, follow_dir_symlinks, meta)
Parameters:
  • src (PathLike | str)

  • dst (PathLike | str)

  • overwrite (bool)

  • skip_existing (bool)

report()[source]
submit(src, dst, skip_existing=False, overwrite=None, follow_file_symlinks=False, follow_dir_symlinks=False, meta='stats')[source]
Parameters:
  • src (str | PathLike) – source file or directory

  • dst (str | PathLike) – destination file or directory

  • skip_existing (bool | None) – if True, jobs where the destination already exists are skipped. If None, then uses the class default. Default=None

  • overwrite (bool | None) – if True will overwrite the file if it exists, otherwise it will error unless skip_existing is True. If None, then uses the class default. Default=None.

  • follow_file_symlinks (bool) – If True and src is a link, the link will be resolved before it is copied (i.e. the data is duplicated), otherwise just the link itself will be copied.

  • follow_dir_symlinks (bool) – if True when src is a directory and contains symlinks to other directories, the contents of the linked data are copied, otherwise when False only the link itself is copied.

  • meta (str | None) – Indicates what metadata bits to copy. This can be ‘stats’ which tries to copy all metadata (i.e. like shutil.copy2()), ‘mode’ which copies just the permission bits (i.e. like shutil.copy()), or None, which ignores all metadata (i.e. like shutil.copyfile()).
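
As a rough illustration of how the overwrite, skip_existing, and meta flags interact for a single file, here is a minimal sketch built only on shutil. The helper copy_file_sketch is hypothetical and not part of kwutil; in particular, letting skip_existing take priority over overwrite when both are True is an assumption, and symlink handling is left out.

```python
import shutil
import tempfile
from pathlib import Path


def copy_file_sketch(src, dst, overwrite=False, skip_existing=False, meta='stats'):
    """Hypothetical helper mirroring the documented flags for one file."""
    src, dst = Path(src), Path(dst)
    if dst.exists():
        if skip_existing:
            return 'skipped'          # leave the existing destination alone
        if not overwrite:
            raise FileExistsError(dst)
    if meta == 'stats':
        shutil.copy2(src, dst)        # data plus as much metadata as possible
    elif meta == 'mode':
        shutil.copy(src, dst)         # data plus permission bits
    elif meta is None:
        shutil.copyfile(src, dst)     # data only
    else:
        raise KeyError(meta)
    return 'copied'


with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / 'src.txt'
    dst = Path(tmp) / 'dst.txt'
    src.write_text('hello')
    first = copy_file_sketch(src, dst)                      # dst is new
    second = copy_file_sketch(src, dst, skip_existing=True)  # dst exists
    third = copy_file_sketch(src, dst, overwrite=True, meta=None)
    try:
        copy_file_sketch(src, dst)    # exists, no overwrite, no skip
        raised = False
    except FileExistsError:
        raised = True
    copied_text = dst.read_text()
```
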

class kwutil.DeleteManager(workers=0, mode='thread', eager=False, overwrite=False, skip_existing=False)[source]

Bases: _FilesystemOperationManager

Helper to execute multiple delete operations on a local filesystem.

Example

>>> import ubelt as ub
>>> from kwutil.fsops_managers import DeleteManager
>>> dpath = ub.Path.appdir('kwutil', 'tests', 'delete_manager')
>>> src_dpath = (dpath / 'src').ensuredir()
>>> src_fpaths = [src_dpath / 'file{}.txt'.format(i) for i in range(10)]
>>> for fpath in src_fpaths:
>>>     fpath.touch()
>>> deleteman = DeleteManager(workers=0, eager=False)
>>> for fpath in src_fpaths:
>>>     deleteman.submit(fpath)
>>> assert len(src_dpath.ls()) == 10
>>> deleteman.run()
>>> assert len(src_dpath.ls()) == 0
Parameters:
  • workers (int) – number of parallel workers to use

  • mode (str) – thread, process, or serial

  • eager (bool) – if True, starts deleting as soon as a job is submitted; otherwise it waits until run is called.

  • overwrite (bool) – if True, will overwrite the file if it exists; otherwise it will error unless skip_existing is True. Defaults to False.

  • skip_existing (bool) – if True, jobs where the destination already exists are skipped by default. Defaults to False.

_operation_name = 'delete'
_worker_func(verbose=False)

Removes a file or recursively removes a directory. If a path does not exist, then this does nothing.

Parameters:
  • path (str | PathLike) – file or directory to remove

  • verbose (bool) – if True prints what is being done

SeeAlso:
send2trash - A cross-platform Python package for sending files to the trash instead of irreversibly deleting them.

ubelt.util_path.Path.delete()

Notes

This can call os.unlink(), os.rmdir(), or shutil.rmtree(), depending on what the path references on the filesystem. (On Windows it may also call a custom ubelt._win32_links._win32_rmtree().)
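
The dispatch described in the note above can be sketched with the standard library alone. The helper delete_sketch is hypothetical; it omits the os.rmdir() branch and the Windows-specific handling, so it should be read as an outline of the idea rather than what ubelt or kwutil actually do.

```python
import os
import shutil
import tempfile


def delete_sketch(path):
    """Hypothetical helper: remove a file, link, or directory tree."""
    if os.path.islink(path) or os.path.isfile(path):
        os.unlink(path)               # regular files and symlinks
    elif os.path.isdir(path):
        shutil.rmtree(path)           # directories, recursively
    # A path that does not exist falls through and is ignored.


with tempfile.TemporaryDirectory() as tmp:
    dpath = os.path.join(tmp, 'dir')
    os.makedirs(os.path.join(dpath, 'subdir'))
    fpath = os.path.join(dpath, 'subdir', 'file.txt')
    open(fpath, 'w').close()
    delete_sketch(fpath)
    file_gone = not os.path.exists(fpath)
    dir_still_there = os.path.exists(dpath)
    delete_sketch(dpath)
    dir_gone = not os.path.exists(dpath)
    delete_sketch(dpath)              # deleting a missing path is a no-op
```
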

Example

>>> import ubelt as ub
>>> from os.path import exists, join
>>> base = ub.Path.appdir('ubelt', 'delete_test').ensuredir()
>>> dpath1 = ub.ensuredir(join(base, 'dir'))
>>> ub.ensuredir(join(base, 'dir', 'subdir'))
>>> ub.touch(join(base, 'dir', 'to_remove1.txt'))
>>> fpath1 = join(base, 'dir', 'subdir', 'to_remove3.txt')
>>> fpath2 = join(base, 'dir', 'subdir', 'to_remove2.txt')
>>> ub.touch(fpath1)
>>> ub.touch(fpath2)
>>> assert all(map(exists, (dpath1, fpath1, fpath2)))
>>> ub.delete(fpath1)
>>> assert all(map(exists, (dpath1, fpath2)))
>>> assert not exists(fpath1)
>>> ub.delete(dpath1)
>>> assert not any(map(exists, (dpath1, fpath1, fpath2)))

Example

>>> import ubelt as ub
>>> from os.path import exists, join
>>> dpath = ub.Path.appdir('ubelt', 'delete_test2').ensuredir()
>>> dpath1 = ub.ensuredir(join(dpath, 'dir'))
>>> fpath1 = ub.touch(join(dpath1, 'to_remove.txt'))
>>> assert exists(fpath1)
>>> ub.delete(dpath)
>>> assert not exists(fpath1)
submit(path)[source]
Parameters:

path (str | PathLike) – path to delete

submit_many(paths)[source]
class kwutil.Hardware[source]

Bases: object

TODO: class level namespace

References

https://pypi.org/project/hardware/

Example

>>> # xdoctest: +SKIP
>>> import kwutil
>>> kwutil.Hardware.report()
static cpus()[source]
static disks()[source]
static gpus()[source]
static memory()[source]
static motherboard()[source]
static networking()[source]
static peripherals()[source]
static report()[source]

Build a high level hardware report

class kwutil.Json[source]

Bases: object

Similar to kwutil.Yaml, the Json class provides a set of helpers to make working with json easier.

Example

>>> from kwutil.util_json import Json
>>> import ubelt as ub
>>> unserializable_data = {
>>>     'a': 'hello world',
>>>     'b': ub.udict({'a': 3}),
>>>     'c': ub.Path('a/path/object'),
>>> }
>>> data = Json.ensure_serializable(unserializable_data)
>>> text1 = Json.dumps(data, backend='stdlib')
>>> # Coerce is idempotent and resolves the input to nested Python
>>> # structures.
>>> resolved1 = Json.coerce(data)
>>> resolved2 = Json.coerce(text1)
>>> resolved3 = Json.coerce(resolved2)
>>> assert resolved1 == resolved2 == resolved3 == data
>>> # with stdlib
>>> data2 = Json.loads(text1)
>>> assert data2 == data
>>> # with ujson
>>> # xdoctest: +REQUIRES(module:ujson)
>>> data2 = Json.loads(text1, backend='ujson')
>>> assert data2 == data
classmethod coerce(data, backend='stdlib', path_policy='existing_file_with_extension')[source]

Example

>>> from kwutil.util_json import Json
>>> import ubelt as ub
>>> Json.coerce('[1, 2, 3]')
[1, 2, 3]
>>> fpath = ub.Path.appdir('kwutil/tests/util_json').ensuredir() / 'file.json'
>>> fpath.write_text(Json.dumps([4, 5, 6]))
>>> Json.coerce(fpath)
[4, 5, 6]
>>> Json.coerce(str(fpath))
[4, 5, 6]
>>> dict(Json.coerce('{"a": "b", "c": "d"}'))
{'a': 'b', 'c': 'd'}
>>> print(Json.coerce(None))
None
classmethod debug_unserializable(data, msg='')[source]

Raises an exception if the data is not serializable and prints information about it. This is a thin wrapper around Json.find_unserializable().

Example

>>> import kwutil
>>> import ubelt as ub
>>> data = {
>>>     'a': 1,
>>>     'b': 2,
>>>     'c': ub.Path('/pathlib/object')
>>> }
>>> try:
>>>     kwutil.Json.debug_unserializable(data, 'obj had non-json data at: ')
>>> except Exception as ex:
>>>     print(f'Exception: {ex}')
Exception: obj had non-json data at: [
    {'loc': ['c'], 'data': Path('/pathlib/object')},
]
static dump(data, fp, backend='stdlib', **kwargs)[source]

Write json data to a file with a chosen backend.

Parameters:
  • data (dict | list | int | float | str) – json serializable data.

  • fp (PathLike | IO) – Where to write the data

  • backend (str) – stdlib, ujson, or orjson

  • **kwargs – additional arguments to pass to the specific backend.

static dumps(data, backend='stdlib', **kwargs)[source]

Convert json data to text with a chosen backend.

Parameters:
  • data (dict | list | int | float | str) – json serializable data.

  • backend (str) – stdlib, ujson, or orjson

  • **kwargs – additional arguments to pass to the specific backend.

classmethod ensure_serializable(dict_, normalize_containers=False, verbose=0, unhandled_policy='keep')[source]

Example

>>> import kwutil
>>> import pathlib
>>> import ubelt as ub
>>> data = {
>>>     'a': 1,
>>>     'b': 2,
>>>     'c': pathlib.Path('/pathlib/object')
>>> }
>>> results = kwutil.Json.ensure_serializable(data)
>>> print(f'results = {ub.urepr(results, nl=1)}')
results = {
    'a': 1,
    'b': 2,
    'c': '/pathlib/object',
}
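
To make the idea concrete, a conversion of this kind could be written as a recursive walk over the data. The function to_serializable below is a hypothetical, standard-library-only sketch; which types kwutil actually converts, and how the normalize_containers and unhandled_policy options behave, are not covered here.

```python
import json
import pathlib


def to_serializable(data):
    """Hypothetical recursive converter, not kwutil's implementation."""
    if isinstance(data, dict):
        return {key: to_serializable(val) for key, val in data.items()}
    if isinstance(data, (list, tuple, set)):
        return [to_serializable(item) for item in data]
    if isinstance(data, pathlib.PurePath):
        return str(data)              # paths become plain strings
    return data                       # everything else is kept unchanged


data = {'a': 1, 'b': (2, 3), 'c': pathlib.PurePosixPath('/pathlib/object')}
clean = to_serializable(data)
text = json.dumps(clean)              # succeeds because clean is plain JSON data
```
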
classmethod find_unserializable(data, quickcheck=False)[source]

Example

>>> import kwutil
>>> import ubelt as ub
>>> data = {
>>>     'a': 1,
>>>     'b': 2,
>>>     'c': ub.Path('/pathlib/object')
>>> }
>>> results = list(kwutil.Json.find_unserializable(data))
>>> print(f'results = {ub.urepr(results, nl=1)}')
results = [
    {'loc': ['c'], 'data': Path('/pathlib/object')},
]
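
One way to picture how such a search can report a location for each offending item is a generator that carries the path of keys and indexes as it recurses. The function find_unserializable_sketch is hypothetical and uses only the standard library; the 'loc' and 'data' keys are taken from the output shown above, everything else is an assumption.

```python
import pathlib

# Leaf types that the json module can encode directly.
JSON_SCALARS = (str, int, float, bool, type(None))


def find_unserializable_sketch(data, loc=()):
    """Hypothetical walker yielding the location of each offending item."""
    if isinstance(data, dict):
        for key, val in data.items():
            yield from find_unserializable_sketch(val, loc + (key,))
    elif isinstance(data, (list, tuple)):
        for idx, val in enumerate(data):
            yield from find_unserializable_sketch(val, loc + (idx,))
    elif not isinstance(data, JSON_SCALARS):
        yield {'loc': list(loc), 'data': data}


data = {
    'a': 1,
    'b': [2, {'x': {3}}],             # a set nested inside a list and a dict
    'c': pathlib.PurePosixPath('/pathlib/object'),
}
found = list(find_unserializable_sketch(data))
```
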
static load(file, backend='stdlib')[source]
static loads(text, backend='stdlib')[source]
class kwutil.MoveManager(workers=0, mode='thread', eager=False)[source]

Bases: _FilesystemOperationManager

Helper to execute multiple move operations on a local filesystem.

Todo

  • [ ] Add optional check that all src paths exist

  • [ ] Add optional check that all dst paths do not exist

  • [ ] Add optional check that no dst path is or is inside of a src dpath (would make things ambiguous); the operation graph should be bipartite.

Example

>>> import ubelt as ub
>>> from kwutil.fsops_managers import MoveManager
>>> dpath = ub.Path.appdir('kwutil', 'tests', 'move_manager')
>>> src_dpath = (dpath / 'src').ensuredir()
>>> dst_dpath = (dpath / 'dst').delete()
>>> src_fpaths = [src_dpath / 'file{}.txt'.format(i) for i in range(10)]
>>> for fpath in src_fpaths:
>>>     fpath.touch()
>>> moveman = MoveManager(workers=0)
>>> for src_fpath in src_fpaths:
>>>     dst_fpath = src_fpath.augment(dpath=dst_dpath)
>>>     moveman.submit(src_fpath, dst_fpath)
>>> moveman.run()
>>> assert len(dst_dpath.ls()) == len(src_fpaths)
>>> assert len(src_dpath.ls()) == 0
Parameters:
  • workers (int) – number of parallel workers to use

  • mode (str) – thread, process, or serial

  • eager (bool) – if True, starts moving as soon as a job is submitted; otherwise it waits until run is called.

_check()[source]

Validate that the set of move tasks looks sane.

Exact logic of this is currently in flux.

_operation_name = 'move'
_worker_func(dst, follow_file_symlinks, follow_dir_symlinks, meta)
Parameters:
  • src (PathLike | str)

  • dst (PathLike | str)

submit(src, dst, skip_existing=False, follow_file_symlinks=False, follow_dir_symlinks=False, meta='stats')[source]
Parameters:
  • src (str | PathLike) – source file or directory

  • dst (str | PathLike) – destination file or directory

  • follow_file_symlinks (bool) – If True and src is a link, the link will be resolved before it is copied (i.e. the data is duplicated), otherwise just the link itself will be copied.

  • follow_dir_symlinks (bool) – if True when src is a directory and contains symlinks to other directories, the contents of the linked data are copied, otherwise when False only the link itself is copied.

  • meta (str | None) – Indicates what metadata bits to copy. This can be ‘stats’ which tries to copy all metadata (i.e. like shutil.copy2()), ‘mode’ which copies just the permission bits (i.e. like shutil.copy()), or None, which ignores all metadata (i.e. like shutil.copyfile()).

class kwutil.MultiPattern(patterns, predicate)[source]

Bases: PatternBase, NiceRepr

Groups multiple patterns together with an “any” or “all” predicate.

Note

We may remove the idea of a predicate in the future and just use behavior that currently corresponds to the “any” predicate.
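
The "any" versus "all" distinction can be sketched with fnmatch and the Python builtins of the same names. The helpers matches_one and multi_match are hypothetical and pass the builtin functions directly, whereas kwutil takes the predicate as an argument to MultiPattern; treating a pattern as a glob only when it contains '*' is an assumption of this sketch.

```python
import fnmatch


def matches_one(pattern, text):
    """Glob match when the pattern contains '*', otherwise exact equality."""
    if '*' in pattern:
        return fnmatch.fnmatchcase(text, pattern)
    return pattern == text


def multi_match(patterns, text, predicate=any):
    """Hypothetical combiner: predicate is the builtin any or all."""
    return predicate(matches_one(pat, text) for pat in patterns)


texts = ['aabb', 'aaabb', 'bbb', 'bbbaaa']
# "any": a text passes if at least one pattern accepts it.
any_results = [multi_match(['aaa*', 'bbb'], t, any) for t in texts]
# "all": a text passes only if every pattern accepts it.
all_results = [multi_match(['aaa*', '*bb'], t, all) for t in ['aaabb', 'aaacc']]
```
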

Example

>>> import kwutil
>>> pat = kwutil.MultiPattern.coerce(['aaa*', 'bbb'])
>>> assert not pat.match('aabb')
>>> assert pat.match('aaabb')
>>> assert pat.match('bbb')
>>> assert not pat.match('bbbaaa')

Example

>>> import ubelt as ub
>>> from kwutil.util_pattern import MultiPattern
>>> dpath = ub.Path.appdir('xdev/tests/multipattern_paths').ensuredir().delete().ensuredir()
>>> (dpath / 'file0.txt').touch()
>>> (dpath / 'data0.dat').touch()
>>> (dpath / 'other0.txt').touch()
>>> ((dpath / 'dir1').ensuredir() / 'file1.txt').touch()
>>> ((dpath / 'dir2').ensuredir() / 'file2.txt').touch()
>>> ((dpath / 'dir2').ensuredir() / 'file3.txt').touch()
>>> ((dpath / 'dir1').ensuredir() / 'data.dat').touch()
>>> ((dpath / 'dir2').ensuredir() / 'data.dat').touch()
>>> ((dpath / 'dir2').ensuredir() / 'data.dat').touch()
>>> pat = MultiPattern.coerce(['*.txt'], 'glob')
>>> print(list(pat.paths(cwd=dpath)))
>>> pat = MultiPattern.coerce(['*0*', '**/*.txt'], 'glob')
>>> print(list(pat.paths(cwd=dpath, recursive=1)))
>>> pat = MultiPattern.coerce(['*.txt', '**/*.txt', '**/*.dat'], 'glob')
>>> print(list(pat.paths(cwd=dpath)))
_squeeze()[source]
classmethod coerce(data, hint='auto', predicate='any')[source]
Parameters:
  • data (str | List | Pattern | PathLike | MultiPattern)

  • hint (str) – can be ‘glob’, ‘regex’, ‘strict’ or ‘auto’. In ‘auto’ we will use ‘glob’ if the input is a string and ‘*’ is in the pattern, otherwise we will use strict. Pattern inputs keep their existing interpretation.

Returns:

MultiPattern

Example

>>> from kwutil.util_pattern import *  # NOQA
>>> import ubelt as ub
>>> pat = MultiPattern.coerce('foo*', 'glob')
>>> pat2 = MultiPattern.coerce(pat, 'regex')
>>> pat3 = MultiPattern.coerce([pat, pat], 'regex')
>>> pat4 = MultiPattern.coerce([ub.Path('bar*'), pat], 'regex')
>>> print('pat = {}'.format(ub.urepr(pat, nl=1)))
>>> print('pat2 = {}'.format(ub.urepr(pat2, nl=1)))
>>> print('pat3 = {!r}'.format(pat3))
>>> print('pat4 = {!r}'.format(pat4))
>>> pat00 = MultiPattern.coerce('foo', 'glob')
>>> pat01 = MultiPattern.coerce('foo*', 'glob')
>>> pat02 = MultiPattern.coerce('foo*', 'regex')
>>> pat5 = MultiPattern.coerce(['foo', 'foo*', pat, pat00, pat01, pat02])
>>> print(f'pat5={pat5}')

Example

>>> # Test all acceptable input types
>>> from kwutil.util_pattern import *  # NOQA
>>> import ubelt as ub
>>> import itertools as it
>>> str_pat = 'pattern*'
>>> scalar_inputs = {
>>>     'str': str_pat,
>>>     'path': ub.Path(str_pat),
>>>     'pat': Pattern.coerce(str_pat),
>>>     'mpat': MultiPattern.coerce(str_pat)
>>> }
>>> # Test scalar input types
>>> scalar_outputs = {}
>>> for k, v in scalar_inputs.items():
>>>     scalar_outputs[k] = MultiPattern.coerce(v)
>>> print('scalar_outputs = {}'.format(ub.urepr(scalar_outputs, nl=1)))
>>> #
>>> # Test iterable input types
>>> multi_outputs = []
>>> for v in it.combinations(scalar_inputs.values(), 2):
>>>     multi_outputs.append(MultiPattern.coerce(v))
>>> for v in it.combinations(scalar_inputs.values(), 3):
>>>     multi_outputs.append(MultiPattern.coerce(v))
>>> # Higher order nesting test
>>> higher_order_output = MultiPattern.coerce(multi_outputs)
>>> print('higher_order_output = {}'.format(ub.urepr(higher_order_output, nl=1)))
match(text)[source]

Check if a string matches this multipattern

Parameters:

text (str) – text to check matches against

Example

>>> # xdoctest: +REQUIRES(module:parse)
>>> import kwutil
>>> self = kwutil.MultiPattern.coerce([
>>>     kwutil.Pattern.coerce('{key1}={val1},{key2}={val2}', hint='parse')
>>> ])
>>> text = 'aaa=bbb,ccc=ddd'
>>> result = self.match(text)
>>> assert result
>>> assert result.named['val1'] == 'bbb'
matches(text)[source]

Returns all valid matches to the pattern.

paths(cwd=None, recursive=False)[source]

Return paths matching this multipattern

class kwutil.Pattern(pattern, backend)[source]

Bases: PatternBase, NiceRepr

Provides a common API to several common pattern matching syntaxes.

A general patterns class, which can use a backend from BACKENDS

Parameters:
  • pattern (str | object) – The pattern text or a precompiled backend pattern object

  • backend (str) – Code indicating what backend the pattern text should be interpreted with. See BACKENDS for available choices.

Notes

# BACKENDS

The glob backend uses the fnmatch module [fnmatch_docs]. The regex backend uses the Python re module. The strict backend uses the “==” string equality testing. The parse backend uses the parse module.

References

Example

>>> # The most flexible way to define a pattern is using the
>>> # coerce method with a prefixed pattern string.
>>> import kwutil
>>> # Glob pattern: matches filenames ending in .jpg
>>> pat = kwutil.Pattern.coerce('glob:*.jpg')
>>> assert pat.match('image.jpg')
>>> assert not pat.match('image.png')
>>> # Regex pattern: similar logic using regular expressions
>>> pat = kwutil.Pattern.coerce(r'regex:.*\.jpg')
>>> assert pat.match('photo.jpg')
>>> assert not pat.match('photo.jpeg')
>>> # Strict pattern: exact string match
>>> pat = kwutil.Pattern.coerce('strict:hello.jpg')
>>> assert pat.match('hello.jpg')
>>> assert not pat.match('hello2.jpg')
>>> # Parse pattern: extract named groups from string
>>> # xdoctest: +REQUIRES(module:parse)
>>> pat = kwutil.Pattern.coerce('parse:{name}.jpg')
>>> assert pat.match('cat.jpg').named == {'name': 'cat'}
>>> assert pat.match('cat.png') is None

Example

>>> # But you can also explicitly define the backend with a hint.
>>> # Test Regex backend
>>> from kwutil.util_pattern import Pattern
>>> repat = Pattern.coerce('foo.*', 'regex')
>>> assert repat.match('foobar')
>>> assert not repat.match('barfoo')
>>> match = repat.search('baz-biz-foobar')
>>> match = repat.match('baz-biz-foobar')
>>> # Test Glob backend
>>> globpat = Pattern.coerce('foo*', 'glob')
>>> assert globpat.match('foobar')
>>> assert not globpat.match('barfoo')
>>> globpat = Pattern.coerce('[foo|bar]', 'glob')
>>> assert not globpat.match('foo')

Example

>>> # xdoctest: +REQUIRES(module:parse)
>>> # Test parse backend
>>> import ubelt as ub
>>> from kwutil.util_pattern import Pattern
>>> pattern1 = Pattern.coerce('A {adjective} pattern', 'parse')
>>> result1 = pattern1.match('A cool pattern')
>>> print(f'result1.named = {ub.urepr(result1.named, nl=1)}')
>>> pattern2 = pattern1.to_regex()
>>> result2 = pattern2.match('A cool pattern')
_prefix_mappings = {'exact:': 'strict', 'glob:': 'glob', 'parse:': 'parse', 'regex:': 'regex', 'strict:': 'strict'}
classmethod coerce(data, hint='auto')[source]

Attempt to automatically interpret the input data with the appropriate pattern backend. If it cannot be determined, then fallback to the hint.

Parameters:
  • data (str | Pattern | PathLike) – an input string or existing object

  • hint (str) – can be ‘glob’, ‘regex’, ‘strict’ or ‘auto’. In ‘auto’ we will use ‘glob’ if the input is a string and ‘*’ is in the pattern, otherwise we will use strict. Pattern inputs keep their existing interpretation.
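
The resolution rules described above (a recognized prefix, then the 'auto' star heuristic, then the hint) can be sketched as a small pure function. The helper infer_backend is hypothetical; the prefix table is copied from _prefix_mappings, while stripping the prefix and letting it take priority over an explicit hint are assumptions about behavior this page does not spell out.

```python
PREFIX_TO_BACKEND = {
    'exact:': 'strict',
    'glob:': 'glob',
    'parse:': 'parse',
    'regex:': 'regex',
    'strict:': 'strict',
}


def infer_backend(text, hint='auto'):
    """Hypothetical helper returning (pattern_text, backend)."""
    for prefix, backend in PREFIX_TO_BACKEND.items():
        if text.startswith(prefix):
            # A recognized prefix wins and is stripped from the pattern.
            return text[len(prefix):], backend
    if hint == 'auto':
        # No prefix: a star suggests glob, anything else is an exact match.
        return text, ('glob' if '*' in text else 'strict')
    # No prefix and an explicit hint: trust the hint.
    return text, hint


examples = {
    text: infer_backend(text)
    for text in ['glob:*.jpg', 'exact:hello.jpg', 'hello*.jpg', 'nopat:data']
}
```

An unrecognized prefix such as 'nopat:' is simply part of the pattern text, so that input falls through to the strict backend.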

Example

>>> import kwutil
>>> # Coerce assumes glob if there is a star
>>> pat = kwutil.Pattern.coerce('foo*')
>>> bool(pat.match('foobar'))
True
>>> # Otherwise it is a strict match
>>> pat = kwutil.Pattern.coerce('foo')
>>> bool(pat.match('foobar'))
False

Example

>>> # xdoctest: +REQUIRES(module:parse)
>>> import kwutil
>>> # The hint can explicitly specify the backend to use
>>> pat1 = kwutil.Pattern.coerce('foo.*', 'glob')
>>> pat2 = kwutil.Pattern.coerce('foo.*', 'regex')
>>> pat3 = kwutil.Pattern.coerce('foo.{}*', 'parse')
>>> inputs = ['spam', 'foobar', 'foo.bar', 'foo.bar*']
>>> print([bool(pat1.match(x)) for x in inputs])
>>> print([bool(pat2.match(x)) for x in inputs])
>>> print([bool(pat3.match(x)) for x in inputs])
[False, False, True, True]
[False, True, True, True]
[False, False, False, True]

Example

>>> # The hint can explicitly specify the backend to use
>>> import kwutil
>>> pat = kwutil.Pattern.coerce('foo*', 'glob')
>>> # A hint is ignored if the input data is not a string
>>> pat2 = kwutil.Pattern.coerce(pat, 'regex')
>>> assert pat2.backend == 'glob'

Example

>>> from kwutil.util_pattern import *  # NOQA
>>> import re
>>> assert Pattern.coerce('glob:*.jpg').backend == 'glob'
>>> assert Pattern.coerce(r'regex:.*\.jpg').backend == 'regex'
>>> assert Pattern.coerce('exact:hello.jpg').backend == 'strict'
>>> assert Pattern.coerce('strict:hello.jpg').backend == 'strict'
>>> assert Pattern.coerce('hello*.jpg').backend == 'glob'
>>> assert Pattern.coerce('hello.jpg').backend == 'strict'
>>> assert Pattern.coerce('nopat:data').backend == 'strict'
>>> assert Pattern.coerce('foo').backend == 'strict'
>>> assert Pattern.coerce('foo*').backend == 'glob'
>>> assert Pattern.coerce(re.compile('foo*')).backend == 'regex'
>>> # xdoctest: +REQUIRES(module:parse)
>>> assert Pattern.coerce('parse:hello.jpg').backend == 'parse'
classmethod coerce_backend(data, hint='auto')[source]

Example

>>> import re
>>> from kwutil.util_pattern import Pattern
>>> assert Pattern.coerce_backend('foo', hint='auto')[1] == 'strict'
>>> assert Pattern.coerce_backend('foo*', hint='auto')[1] == 'glob'
>>> assert Pattern.coerce_backend(re.compile('foo*'), hint='auto')[1] == 'regex'
classmethod from_glob(data)[source]

Create a Pattern object with a glob backend.

classmethod from_regex(data, flags=0, multiline=False, dotall=False, ignorecase=False)[source]

Create a Pattern object with a regex backend.

match(text)[source]
paths(cwd=None, recursive=False)[source]

Find paths in the filesystem that match this pattern

Yields:

ub.Path

search(text)[source]
sub(repl, text, count=-1)[source]
Parameters:
  • repl (str) – text to insert in place of pattern

  • text (str) – text to be searched and modified

  • count (int) – if non-negative, the maximum number of replacements that will be made.

to_regex()[source]

Returns an equivalent pattern with the regular expression backend

Returns:

Pattern
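
For intuition, the standard library already offers the two conversions this implies: fnmatch.translate() turns a glob into a regular expression, and re.escape() produces a regex that matches a string literally. The helper names below are hypothetical, and kwutil's own translation may differ in detail.

```python
import fnmatch
import re


def glob_to_regex(pattern):
    """Translate a glob into an equivalent compiled regular expression."""
    return re.compile(fnmatch.translate(pattern))


def strict_to_regex(pattern):
    """Escape special characters so the regex matches the text literally."""
    return re.compile(re.escape(pattern) + r'\Z')


glob_re = glob_to_regex('foo*')       # the star is a wildcard
strict_re = strict_to_regex('foo*')   # the star is a literal character
```

The same text 'foo*' therefore yields two different regular expressions depending on which backend it came from: the glob version accepts 'foobar', while the strict version accepts only the four characters 'foo*'.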

Example

>>> from kwutil.util_pattern import Pattern
>>> globpat = Pattern.coerce('foo*', 'glob')
>>> strictpat = Pattern.coerce('foo*', 'strict')
>>> repat1 = strictpat.to_regex()
>>> repat2 = globpat.to_regex()
>>> print(f'repat1={repat1}')
>>> print(f'repat2={repat2}')
class kwutil.ProcessContext(name=None, type='process', args=None, config=None, extra=None, track_emissions=False, request_all_telemetry=True, request_most_telemetry=True, output_dpath=None, output_fpath=None)[source]

Bases: object

Context manager to track the context under which a result was computed.

This tracks things like the start and end time, the command line that can reproduce the process (assuming an appropriate environment), the configuration the process was run with, the details of the machine the process was run on, the power usage and carbon emissions of the process, and other information.

Parameters:
  • args (str | List[str]) – This should be the sys.argv or the command line string that can be used to rerun the process

  • config (Dict) – This should be a configuration dictionary (likely based on sys.argv)

  • name (str) – the name of this process

  • type (str) – The type of this process (usually keep the default of process)

  • request_all_telemetry (bool) – if False, all telemetry is disabled. This is forced to False if PROCESS_CONTEXT_DISABLE_ALL_TELEMETRY is in the environment.

  • request_most_telemetry (bool) – if False, most telemetry is disabled. This is forced to False if PROCESS_CONTEXT_DISABLE_MOST_TELEMETRY is in the environment.
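
The core bookkeeping (name, args, config, start and stop times) can be sketched as a tiny context manager. ContextSketch is hypothetical and far simpler than kwutil.ProcessContext: it records no machine details, telemetry, or emissions, and its property keys such as 'start_timestamp' are invented for illustration.

```python
import sys
import time


class ContextSketch:
    """Hypothetical tracker, far simpler than kwutil.ProcessContext."""

    def __init__(self, name=None, args=None, config=None):
        self.properties = {
            'name': name,
            'args': sys.argv if args is None else args,
            'config': config,
            'start_timestamp': None,
            'stop_timestamp': None,
        }

    @property
    def is_started(self):
        # True once start() has been called, even after stop().
        return self.properties['start_timestamp'] is not None

    @property
    def is_running(self):
        # True only between start() and stop().
        return self.is_started and self.properties['stop_timestamp'] is None

    def start(self):
        self.properties['start_timestamp'] = time.time()
        self.properties['stop_timestamp'] = None
        return self               # allows chaining: ctx.start().stop()

    def stop(self):
        self.properties['stop_timestamp'] = time.time()
        return self.properties    # the record describing this run

    def __enter__(self):
        return self.start()

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()


ctx = ContextSketch(name='demo', args=['demo.py', '--flag'], config={'flag': True})
before = (ctx.is_started, ctx.is_running)
with ctx:
    during = (ctx.is_started, ctx.is_running)
after = (ctx.is_started, ctx.is_running)
record = ctx.properties
```
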

Note

This module provides telemetry, which records user-identifiable information. While useful, it does raise ethical concerns about user privacy, and the people running this code have a right to know about it and opt out. Notably, this module simply records the information, but does not send it anywhere. As such, a default opt-in is reasonable, but any future work that sends this information anywhere must be opt-out by default.

Note

There are two levels of telemetry.

Environment telemetry. These are things like the machine the code was run on. Use PROCESS_CONTEXT_DISABLE_MOST_TELEMETRY=0 to opt out.

The start / stop / sys.argv / config objects are necessary for mlops to do anything, but they can leak information by containing system paths. Emissions are also in this category. Use PROCESS_CONTEXT_DISABLE_ALL_TELEMETRY to opt out.

CommandLine

xdoctest -m kwutil.process_context ProcessContext

Example

>>> # xdoctest: +REQUIRES(module:psutil)
>>> from kwutil.process_context import *
>>> import rich
>>> import ubelt as ub
>>> # Adding things like disk info and tracking emission usage
>>> self = ProcessContext(track_emissions='offline')
>>> obj1 = self.start().stop()
>>> self.add_disk_info('.')
>>> #
>>> # Telemetry can be mostly disabled
>>> self = ProcessContext(track_emissions='offline', request_most_telemetry=False)
>>> obj2 = self.start().stop()
>>> self.add_disk_info('.')
>>> # Telemetry can be completely disabled
>>> self = ProcessContext(track_emissions='offline', request_all_telemetry=False)
>>> obj3 = self.start().stop()
>>> self.add_disk_info('.')
>>> rich.print('full_telemetry = {}'.format(ub.urepr(obj1, nl=3)))
>>> rich.print('some_telemetry = {}'.format(ub.urepr(obj2, nl=3)))
>>> rich.print('no_telemetry = {}'.format(ub.urepr(obj3, nl=3)))

Example

>>> # xdoctest: +REQUIRES(module:psutil)
>>> from kwutil.process_context import *
>>> # flush can measure intermediate progress
>>> self = ProcessContext(track_emissions=True)
>>> self.add_disk_info('.')
>>> obj1 = self.start().flush()
>>> obj1_orig = obj1.copy()
>>> obj2 = self.stop()
_cpuinfo()[source]
_device_info(device)[source]
_flush_emissions_tracker()[source]
_gpuinfo()[source]
_hostinfo()[source]
_infer_dynamic_properties(func, args, kwargs)[source]
_infer_static_properties(func)[source]
_machine()[source]
_meminfo()[source]
_osinfo()[source]
_pyinfo()[source]
_start_emissions_tracker()[source]
_stop_emissions_tracker()[source]
_timestamp()[source]
add_device_info(device)[source]

Add information about a torch device that was used in this process.

Does nothing if telemetry is disabled.

Parameters:

device (torch.device) – torch device to add info about

Example

>>> # xdoctest: +REQUIRES(module:torch)
>>> from kwutil.process_context import *
>>> import torch
>>> import rich
>>> import ubelt as ub
>>> device = torch.device(0) if torch.cuda.is_available() else torch.device('cpu')
>>> # Adding things like disk info and tracking emission usage
>>> self = ProcessContext(track_emissions='offline')
>>> obj1 = self.start().stop()
>>> self.add_disk_info('.')
>>> self.add_device_info(device)
>>> #
>>> # Telemetry can be mostly disabled
>>> self = ProcessContext(track_emissions='offline', request_most_telemetry=False)
>>> obj2 = self.start().stop()
>>> self.add_disk_info('.')
>>> self.add_device_info(device)
>>> # Telemetry can be completely disabled
>>> self = ProcessContext(track_emissions='offline', request_all_telemetry=False)
>>> obj3 = self.start().stop()
>>> self.add_disk_info('.')
>>> self.add_device_info(device)
>>> rich.print('full_telemetry = {}'.format(ub.urepr(obj1, nl=3)))
>>> rich.print('some_telemetry = {}'.format(ub.urepr(obj2, nl=3)))
>>> rich.print('no_telemetry = {}'.format(ub.urepr(obj3, nl=3)))
add_disk_info(path)[source]

Add information about a storage disk that was used in this process

Does nothing if telemetry is disabled.

dump()[source]
flush()[source]
property is_running

Has the context object started and not yet been stopped?

property is_started

Has the context object ever started? This can still return True if it has stopped.

start()[source]
stop()[source]
write_invocation(invocation_fpath)[source]

Write a helper file that contains a locally reproducible invocation of this process.

class kwutil.ProgressManager(backend='rich', **kwargs)[source]

Bases: BaseProgIterManager

A progress manager.

Manage multiple progress bars, either with rich or ProgIter.

CommandLine

xdoctest -m kwutil.util_progress ProgressManager:0
xdoctest -m kwutil.util_progress ProgressManager:1
xdoctest -m kwutil.util_progress ProgressManager:2

Example

>>> from kwutil.util_progress import ProgressManager
>>> from progiter import progiter
>>> # Can use plain progiter or rich
>>> # The usecase for plain progiter is when threads / live output
>>> # is not desirable and you just want plain stdout progress
>>> pman = ProgressManager(backend='progiter')
>>> with pman:
>>>     oprog = pman.progiter(range(20), desc='outer loop', verbose=3)
>>>     for i in oprog:
>>>         oprog.set_postfix(f'Doing step {i}', refresh=False)
>>>         for i in pman.progiter(range(100), desc=f'inner loop {i}'):
>>>             pass
>>> # xdoctest: +REQUIRES(module:rich)
>>> self = pman = ProgressManager(backend='rich')
>>> pman = ProgressManager(backend='rich')
>>> with pman:
>>>     oprog = pman.progiter(range(20), desc='outer loop', verbose=3)
>>>     for i in oprog:
>>>         oprog.set_postfix(f'Doing step {i}', refresh=False)
>>>         for i in pman.progiter(range(100), desc=f'inner loop {i}'):
>>>             pass

Example

>>> # xdoctest: +REQUIRES(module:rich)
>>> # A fairly complex example
>>> from kwutil.util_progress import ProgressManager
>>> import time
>>> delay = 0.00005
>>> N_inner = 300
>>> N_outer = 11
>>> self = pman = ProgressManager(backend='rich')
>>> with pman:
>>>     oprog = pman(range(N_outer), desc='outer loop')
>>>     for i in oprog:
>>>         if i > 7:
>>>             self.update_info(f'The info panel gives detailed updates\nWe are now at step {i}\nWe are just about done now')
>>>         elif i > 5:
>>>             self.update_info(f'The info panel gives detailed updates\nWe are now at step {i}')
>>>         oprog.set_postfix(f'Doing step {i}')
>>>         N = 1000
>>>         for j in pman(iter(range(N_inner)), total=None if i % 2 == 0 else N_inner, desc=f'inner loop {i}', transient=i < 4):
>>>             time.sleep(delay)

Example

>>> # xdoctest: +REQUIRES(module:rich)
>>> # Test complex example over a grid of parameters
>>> from kwutil.util_progress import ProgressManager, ProgIter2
>>> import time
>>> import ubelt as ub
>>> delay = 0.000005
>>> N_inner = 300
>>> N_outer = 11
>>> basis = {
>>>     'with_info': [0, 1],
>>>     'backend': ['progiter', 'rich'],
>>>     'enabled': [0, 1],
>>>     #'with_info': [1],
>>> }
>>> grid = list(ub.named_product(basis))
>>> grid_prog = ProgIter2(grid, desc='Test cases over grid', verbose=3)
>>> grid_prog.update_info('Here we go')
>>> for item in grid:
>>>     grid_prog.ensure_newline()
>>>     grid_prog.update_info(f'Running grid test {ub.urepr(item, nl=1)}')
>>>     print('\n\n')
>>>     self = ProgressManager(backend=item['backend'], enabled=item['enabled'])
>>>     with self:
>>>         outer_prog = self.progiter(range(N_outer), desc='outer loop')
>>>         for i in outer_prog:
>>>             if item['with_info']:
>>>                 if i > 7:
>>>                     outer_prog.update_info(f'The info panel gives detailed updates\nWe are now at step {i}\nWe are just about done now')
>>>                 elif i > 5:
>>>                     outer_prog.update_info(f'The info panel gives detailed updates\nWe are now at step {i}')
>>>             outer_prog.set_postfix(f'Doing step {i}')
>>>             inner_kwargs = dict(
>>>                 total=None if i % 2 == 0 else N_inner,
>>>                 transient=i < 4,
>>>                 time_thresh=delay * 2.3,
>>>                 desc=f'inner loop {i}',
>>>             )
>>>             for j in self.progiter(iter(range(N_inner)), **inner_kwargs):
>>>                 time.sleep(delay)
>>>     grid_prog.update_info(f'Finished test item')

Example

>>> # xdoctest: +REQUIRES(module:rich)
>>> # Demo manual usage
>>> from kwutil.util_progress import ProgressManager
>>> from kwutil import util_progress
>>> import time
>>> pman = ProgressManager()
>>> pman.start()
>>> task1 = pman.progiter(desc='task1', total=100)
>>> task2 = pman.progiter(desc='task2')
>>> for i in range(100):
>>>     task1.update()
>>>     task2.update(2)
>>>     time.sleep(0.001)
>>> ProgressManager.stopall()

Example

>>> # Demo manual usage (progiter backend)
>>> from kwutil.util_progress import ProgressManager
>>> from kwutil import util_progress
>>> import time
>>> pman = ProgressManager(backend='progiter', adjust=0, freq=1)
>>> pman.start()
>>> task1 = pman.progiter(desc='task1', total=12)
>>> task2 = pman.progiter(desc='task2')
>>> task1.update()
>>> task2.update()
>>> for i in range(10):
>>>     time.sleep(0.001)
>>>     task1.update()
>>>     time.sleep(0.001)
>>>     task2.update(2)
>>> ProgressManager.stopall()
ProgIter(*args, **kw)
property _is_main_manager
progiter(*args, **kw)[source]
start()[source]
stop(*args, **kwargs)[source]
classmethod stopall()[source]

Stop all background progress threads (likely only 1 exists)
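
A class-level stopall implies some shared record of the managers that are currently running. The sketch below shows one way such a registry could work using only the standard library. SketchManager and its _live list are hypothetical names and are not taken from kwutil's implementation.

```python
import threading


class SketchManager:
    """Hypothetical stand-in for a manager with a class-level stopall."""

    _live = []                  # assumed registry of started managers
    _guard = threading.Lock()   # protects the registry

    def __init__(self):
        self.started = False

    def start(self):
        self.started = True
        with self._guard:
            SketchManager._live.append(self)
        return self

    def stop(self):
        self.started = False
        with self._guard:
            if self in SketchManager._live:
                SketchManager._live.remove(self)

    @classmethod
    def stopall(cls):
        # Copy under the guard, then stop outside it so stop() can re-lock.
        with cls._guard:
            live = list(cls._live)
        for manager in live:
            manager.stop()


first = SketchManager().start()
second = SketchManager().start()
SketchManager.stopall()
print(first.started, second.started, len(SketchManager._live))
```

Copying the registry before stopping keeps the lock from being held while each stop() call modifies the same list.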

update_info(text)[source]
class kwutil.Superlock(lock_fpath=NoParam, thread_key=NoParam)[source]

Bases: object

A thread and/or process lock

The lockiest lock that ever did lock… or at least an attempt at it.

This is experimental and not well tested.

If lock_fpath is NoParam, uses a global shared process lock. If None, then no process lock is used.

If thread_key is NoParam, uses a global shared thread lock. If None, then no thread lock is used.

Otherwise, locks with the same lock_fpath OR thread_key will not execute concurrently, up to system limitations of the locking mechanisms.

Uses [Fasteners] for the process-based file-locks, which do have fundamental issues [OnFileLocks].

TODO: Evaluate [FileLock] as an alternative.
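
The keying rule can be illustrated for the thread-lock half with the standard library alone. This is a minimal sketch, not Superlock's implementation: lock_for_key and _KEYED_LOCKS are hypothetical names, and the process-level file lock that Superlock delegates to fasteners is left out.

```python
import threading

_REGISTRY_GUARD = threading.Lock()
_KEYED_LOCKS = {}  # hypothetical registry: key -> shared lock


def lock_for_key(key):
    """Return the lock shared by every caller that passes the same key."""
    with _REGISTRY_GUARD:
        return _KEYED_LOCKS.setdefault(key, threading.Lock())


lock_a = lock_for_key('shared')
lock_b = lock_for_key('shared')
lock_c = lock_for_key('other')

assert lock_a.acquire(timeout=1)
# The same key maps to the same lock, so a second acquire times out.
same_key_blocked = not lock_b.acquire(timeout=0.01)
# A different key maps to an independent lock.
other_key_free = lock_c.acquire(timeout=0.01)
lock_a.release()
lock_c.release()
print(same_key_blocked, other_key_free)
```

Two handles created with the same key contend with each other, while a handle with a different key proceeds independently.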

Example

>>> # xdoctest: +REQUIRES(module:fasteners)
>>> from kwutil.util_locks import Superlock
>>> self = Superlock()
>>> with self:
>>>     print('non-concurrent code')

Example

>>> # xdoctest: +REQUIRES(module:fasteners)
>>> from kwutil.util_locks import *  # NOQA
>>> import ubelt as ub
>>> lock1 = Superlock()
>>> lock2 = Superlock()
>>> assert lock1.acquire(timeout=10)
>>> assert not lock2.acquire(timeout=0.01)
>>> lock1.release()
>>> assert lock2.acquire()
>>> lock2.release()

Example

>>> # Demonstrate a real world case with thread locks
>>> # xdoctest: +REQUIRES(module:fasteners)
>>> import time
>>> from kwutil.util_locks import Superlock
>>> import ubelt as ub
>>> #
>>> shared_counter = []
>>> #
>>> def task(i):
...     with Superlock():
...         # simulate work inside critical section
...         current_len = len(shared_counter)
...         print(f'current_len={current_len}')
...         time.sleep(0.05)
...         shared_counter.append(i)
...         # ensure no concurrent execution by checking counter length did not change during sleep
...         assert len(shared_counter) == current_len + 1
...     return i
>>> #
>>> with ub.Executor(mode='thread', max_workers=4) as executor:
...     results = list(executor.map(task, range(8)))
>>> #
>>> sorted(results) == list(range(8))
True
>>> len(shared_counter) == 8
True

Example

>>> # Demonstrate a real world case with process locks
>>> # xdoctest: +REQUIRES(module:fasteners)
>>> # xdoctest: +SKIP('xdoctest does not support pickled functions yet')
>>> import time
>>> import ubelt as ub
>>> from pathlib import Path
>>> #
>>> dpath = ub.Path.appdir('kwutil/tests/superlock').ensuredir()
>>> counter_fpath = dpath / 'shared_counter.txt'
>>> counter_fpath.write_text('0')
>>> #
>>> def task(i):
...     import time
...     import ubelt as ub
...     from pathlib import Path
...     from kwutil.util_locks import Superlock
...     lock = Superlock()
...     counter_fpath = Path(ub.Path.appdir('kwutil/tests/superlock') / 'shared_counter.txt')
...     with lock:
...         current = int(counter_fpath.read_text())
...         time.sleep(0.05)  # simulate some work
...         counter_fpath.write_text(str(current + 1))
...     return i
>>> #
>>> with ub.Executor(mode='process', max_workers=4) as executor:
...     results = list(executor.map(task, range(8)))
>>> #
>>> sorted(results) == list(range(8))
True
>>> final_value = int(counter_fpath.read_text())
>>> final_value == 8
True
GLOBAL_APPNAME = 'fasteners_ext/file_locks'
GLOBAL_LOCK_FNAME = 'superlock.lock'
GLOBAL_THREAD_KEY = '__GLOBAL_THREAD_LOCK__'
THREAD_LOCKS = <WeakValueDictionary>
_debug(msg)[source]
acquire(blocking=True, timeout=None, delay=0.01, max_delay=0.1)[source]
property global_lock_fpath
release()[source]
class kwutil.XML[source]

Bases: object

A thin wrapper around [xmltodict].

Example

>>> # xdoctest: +REQUIRES(module:xmltodict)
>>> from kwutil.util_xml import *  # NOQA
>>> import ubelt as ub
>>> text = ub.codeblock(
>>>     '''
>>>     <mydocument has="an attribute">
>>>       <and>
>>>         <many>elements</many>
>>>         <many>more elements</many>
>>>       </and>
>>>       <plus a="complex">
>>>         element as well
>>>       </plus>
>>>     </mydocument>
>>>     ''')
>>> data = XML.loads(text)
>>> print(f'data = {ub.urepr(data, nl=-1)}')
>>> recon = XML.dumps(data, pretty=True)
>>> print(recon)
static dump(data, fp, pretty=False, backend='xmltodict')[source]
static dumps(data, pretty=False, backend='xmltodict')[source]
static load(file, process_namespaces=False, backend='xmltodict')[source]
static loads(text, process_namespaces=False, backend='xmltodict')[source]
class kwutil.Yaml[source]

Bases: object

Namespace for yaml functions

Example

>>> # xdoctest: +REQUIRES(module:ruamel.yaml)
>>> from kwutil.util_yaml import Yaml
>>> import ubelt as ub
>>> data = {
>>>     'a': 'hello world',
>>>     'b': ub.udict({'a': 3})
>>> }
>>> text1 = Yaml.dumps(data, backend='ruamel')
>>> # Coerce is idempotent and resolves the input to nested Python
>>> # structures.
>>> resolved1 = Yaml.coerce(data)
>>> resolved2 = Yaml.coerce(text1)
>>> resolved3 = Yaml.coerce(resolved2)
>>> assert resolved1 == resolved2 == resolved3 == data
>>> # with ruamel
>>> data2 = Yaml.loads(text1)
>>> assert data2 == data
>>> # with pyyaml
>>> data2 = Yaml.loads(text1, backend='pyyaml')
>>> assert data2 == data
static CodeBlock(text)[source]
static Dict(data)[source]

Get a ruamel-enhanced dictionary

Example

>>> # xdoctest: +REQUIRES(module:pyyaml)
>>> # xdoctest: +REQUIRES(module:ruamel.yaml)
>>> from kwutil.util_yaml import Yaml
>>> data = {'a': 'avalue', 'b': 'bvalue'}
>>> data = Yaml.Dict(data)
>>> data.yaml_set_start_comment('hello')
>>> # Note: not working https://sourceforge.net/p/ruamel-yaml/tickets/400/
>>> data.yaml_set_comment_before_after_key('a', before='a comment', indent=2)
>>> data.yaml_set_comment_before_after_key('b', 'b comment')
>>> print(Yaml.dumps(data))
static InlineList(items)[source]

static coerce(data, backend='ruamel', path_policy='existing_file_with_extension')[source]

Attempt to convert input into a parsed yaml / json data structure. If the data looks like a path, it tries to load and parse file contents. If the data looks like a yaml/json string it tries to parse it. If the data looks like parsed data, then it returns it as-is.

Parameters:
  • data (str | PathLike | dict | list)

  • backend (str) – either ruamel or pyyaml

  • path_policy (str) – Controls how the function decides whether a string looks like a path. The default is ‘existing_file_with_extension’. Use ‘existing_file’ for the pre-0.3.2 behavior, or ‘never’ to disable the path feature and decrease ambiguity.

Returns:

parsed yaml data

Return type:

object

Note

The function cannot distinguish a string that should be loaded as a file from a string that should be parsed as yaml. If the string looks like a file that exists, the file is read. To avoid this corner case, use this function only for data where you expect the output to be a List or Dict.
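
The three cases described above (path, serialized text, already-parsed data) can be sketched with the standard library by substituting JSON for YAML. This is an illustration of the pattern only: coerce_json is a hypothetical helper, and the exact rules kwutil applies for each path_policy value are an assumption here.

```python
import json
import os
import tempfile
from pathlib import Path


def coerce_json(data, path_policy='existing_file_with_extension'):
    """Hypothetical JSON analogue of the coerce pattern (not kwutil code)."""
    if isinstance(data, os.PathLike):
        # Explicit path objects are always read from disk.
        return json.loads(Path(data).read_text())
    if isinstance(data, str):
        if path_policy != 'never' and '\n' not in data and data.strip():
            candidate = Path(data)
            has_ext = candidate.suffix != ''
            if candidate.is_file() and (path_policy == 'existing_file' or has_ext):
                return json.loads(candidate.read_text())
        # Anything else is treated as serialized text.
        return json.loads(data)
    # Already-parsed data is returned unchanged.
    return data


with tempfile.TemporaryDirectory() as tmp:
    fpath = Path(tmp) / 'config.json'
    fpath.write_text('[4, 5, 6]')
    from_path = coerce_json(fpath)
    from_path_str = coerce_json(str(fpath))

from_text = coerce_json('[1, 2, 3]')
from_data = coerce_json({'a': 1})
print(from_path, from_path_str, from_text, from_data)
```

The ambiguity in the note shows up in the string branch: a one-line string that names an existing file is read from disk rather than parsed.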

References

https://stackoverflow.com/questions/528281/how-can-i-include-a-yaml-file-inside-another

Example

>>> # xdoctest: +REQUIRES(module:pyyaml)
>>> # xdoctest: +REQUIRES(module:ruamel.yaml)
>>> from kwutil.util_yaml import Yaml
>>> import ubelt as ub
>>> text = ub.codeblock(
>>>     '''
>>>     - !!float nan
>>>     - !!float inf
>>>     - nan
>>>     - inf
>>>     # Seems to break older ruamel.yaml 0.17.21
>>>     # - .nan
>>>     # - .inf
>>>     - null
>>>     ''')
>>> Yaml.coerce(text, backend='pyyaml')
>>> Yaml.coerce(text, backend='ruamel')

Example

>>> # xdoctest: +REQUIRES(module:pyyaml)
>>> # xdoctest: +REQUIRES(module:ruamel.yaml)
>>> from kwutil.util_yaml import Yaml
>>> import ubelt as ub
>>> Yaml.coerce('"[1, 2, 3]"')
[1, 2, 3]
>>> fpath = ub.Path.appdir('cmd_queue/tests/util_yaml').ensuredir() / 'file.yaml'
>>> fpath.write_text(Yaml.dumps([4, 5, 6]))
>>> Yaml.coerce(fpath)
[4, 5, 6]
>>> Yaml.coerce(str(fpath))
[4, 5, 6]
>>> dict(Yaml.coerce('{a: b, c: d}'))
{'a': 'b', 'c': 'd'}
>>> Yaml.coerce(None)
None

Example

>>> # xdoctest: +REQUIRES(module:pyyaml)
>>> # xdoctest: +REQUIRES(module:ruamel.yaml)
>>> from kwutil.util_yaml import Yaml
>>> assert Yaml.coerce('') is None

Example

>>> # xdoctest: +REQUIRES(module:pyyaml)
>>> # xdoctest: +REQUIRES(module:ruamel.yaml)
>>> from kwutil.util_yaml import Yaml
>>> import ubelt as ub
>>> dpath = ub.Path.appdir('cmd_queue/tests/util_yaml').ensuredir()
>>> fpath = dpath / 'external.yaml'
>>> fpath.write_text(Yaml.dumps({'foo': 'bar'}))
>>> text = ub.codeblock(
>>>    f'''
>>>    items:
>>>        - !include {dpath}/external.yaml
>>>    ''')
>>> data = Yaml.coerce(text, backend='ruamel')
>>> print(Yaml.dumps(data, backend='ruamel'))
items:
- foo: bar
>>> text = ub.codeblock(
>>>    f'''
>>>    items:
>>>        !include [{dpath}/external.yaml, blah, 1, 2, 3]
>>>    ''')
>>> data = Yaml.coerce(text, backend='ruamel')
>>> print('data = {}'.format(ub.urepr(data, nl=1)))
>>> print(Yaml.dumps(data, backend='ruamel'))
static dumps(data, backend='ruamel', version=None)[source]

Dump yaml to a string representation (and account for some of our use-cases)

Parameters:
  • data (Any) – yaml representable data

  • backend (str) – either ruamel or pyyaml

  • version (str) – version of YAML spec to use. (e.g. ‘1.1’)

Returns:

yaml text

Return type:

str

Example

>>> # xdoctest: +REQUIRES(module:pyyaml)
>>> # xdoctest: +REQUIRES(module:ruamel.yaml)
>>> from kwutil.util_yaml import Yaml
>>> import ubelt as ub
>>> data = {
>>>     'a': 'hello world',
>>>     'b': ub.udict({'a': 3})
>>> }
>>> text2 = Yaml.dumps(data, backend='pyyaml')
>>> print(text2)
>>> text1 = Yaml.dumps(data, backend='ruamel')
>>> print(text1)
>>> assert text1 == text2
>>> print(Yaml.dumps({'key': 'on'}, backend='ruamel', version='1.1'))
%YAML 1.1
---
key: 'on'
static load(file, backend='ruamel', version=None)[source]

Load yaml from a file

Parameters:
  • file (io.TextIOBase | PathLike | str) – yaml file path or file object

  • backend (str) – either ruamel or pyyaml

Returns:

object
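
Accepting either a path or an open file object usually comes down to one type check. The sketch below shows that dispatch with JSON standing in for YAML so that it runs on the standard library alone. load_json is a hypothetical helper, not part of kwutil.

```python
import io
import json
import os
import tempfile


def load_json(file):
    """Hypothetical JSON analogue: accept a path or an open text file."""
    if isinstance(file, (str, os.PathLike)):
        # Paths are opened here and closed when parsing finishes.
        with open(file, 'r') as fp:
            return json.load(fp)
    # Otherwise assume a readable text stream owned by the caller.
    return json.load(file)


from_stream = load_json(io.StringIO('{"a": 3}'))

with tempfile.TemporaryDirectory() as tmp:
    fpath = os.path.join(tmp, 'data.json')
    with open(fpath, 'w') as fp:
        fp.write('{"a": 3}')
    from_path = load_json(fpath)

print(from_stream, from_path)
```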

Example

>>> # xdoctest: +REQUIRES(module:pyyaml)
>>> # xdoctest: +REQUIRES(module:ruamel.yaml)
>>> from kwutil.util_yaml import Yaml
>>> import ubelt as ub
>>> data = {
>>>     'a': 'hello world',
>>>     'b': ub.udict({'a': 3})
>>> }
>>> text1 = Yaml.dumps(data, backend='ruamel')
>>> import io
>>> # with ruamel
>>> file = io.StringIO(text1)
>>> data2 = Yaml.load(file)
>>> assert data2 == data
>>> # with pyyaml
>>> file = io.StringIO(text1)
>>> data2 = Yaml.load(file, backend='pyyaml')
>>> assert data2 == data
static loads(text, backend='ruamel', version=None)[source]

Load yaml from a string

Parameters:
  • text (str) – yaml text

  • backend (str) – either ruamel or pyyaml

Returns:

object

Example

>>> # xdoctest: +REQUIRES(module:pyyaml)
>>> # xdoctest: +REQUIRES(module:ruamel.yaml)
>>> from kwutil.util_yaml import Yaml
>>> import ubelt as ub
>>> data = {
>>>     'a': 'hello world',
>>>     'b': ub.udict({'a': 3})
>>> }
>>> print('data = {}'.format(ub.urepr(data, nl=1)))
>>> print('---')
>>> text = Yaml.dumps(data)
>>> print(ub.highlight_code(text, 'yaml'))
>>> print('---')
>>> data2 = Yaml.loads(text)
>>> assert data == data2
>>> data3 = Yaml.loads(text, backend='pyyaml')
>>> print('data2 = {}'.format(ub.urepr(data2, nl=1)))
>>> print('data3 = {}'.format(ub.urepr(data3, nl=1)))
>>> assert data == data3
kwutil.coerce_num_workers(num_workers='auto', minimum=0)[source]

Return some number of CPUs based on a chosen heuristic

Parameters:
  • num_workers (int | str) – A special string code, or an exact number of cpus

  • minimum (int) – minimum workers we are allowed to return

Returns:

number of available cpus based on request parameters

Return type:

int
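
The relationship between a string code, the CPU count, and the minimum can be sketched as follows. This is a simplified illustration under stated assumptions: sketch_num_workers is a hypothetical helper that understands only 'all' and 'all-N', and it does not reproduce the 'auto' or 'avail' heuristics or the general expression support of coerce_num_workers.

```python
import os


def sketch_num_workers(num_workers='all', minimum=0):
    """Hypothetical subset of the string codes: 'all' and 'all-N' only."""
    total = os.cpu_count() or 1
    if isinstance(num_workers, str):
        code = num_workers.replace(' ', '')
        if code == 'all':
            value = total
        elif code.startswith('all-') and code[4:].isdigit():
            value = total - int(code[4:])
        else:
            raise ValueError(f'unsupported code: {num_workers!r}')
    else:
        value = int(num_workers)
    # Never return fewer workers than the requested minimum.
    return max(value, minimum)


total = sketch_num_workers('all')
print(total, sketch_num_workers('all-2'), sketch_num_workers(3))
```

Clamping at the end means a request such as 'all-100' on a small machine resolves to the minimum instead of a negative number.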

CommandLine

xdoctest -m kwutil.util_parallel coerce_num_workers

Example

>>> # xdoctest: +REQUIRES(module:psutil)
>>> from kwutil.util_parallel import *  # NOQA
>>> print(coerce_num_workers('all'))
>>> print(coerce_num_workers('avail'))
>>> print(coerce_num_workers('auto'))
>>> print(coerce_num_workers('all-2'))
>>> print(coerce_num_workers('avail-2'))
>>> print(coerce_num_workers('all/2'))
>>> print(coerce_num_workers('min(all,2)'))
>>> #print(coerce_num_workers('[max(all,2)][0]'))
>>> import pytest
>>> with pytest.raises(Exception):
>>>     print(coerce_num_workers('all + 1' + (' + 1' * 100)))
>>> total_cpus = coerce_num_workers('all')
>>> assert coerce_num_workers('all-2') == max(total_cpus - 2, 0)
>>> assert coerce_num_workers('all-100') == max(total_cpus - 100, 0)
>>> assert coerce_num_workers('avail') <= coerce_num_workers('all')
>>> assert coerce_num_workers(3) == max(3, 0)
kwutil.ensure_rng(rng=None, api='python')[source]

Coerces input into a random number generator.

This function is useful for ensuring that your code uses a controlled internal random state that is independent of other modules.

If the input is None, then a global random state is returned.

If the input is a numeric value, then that is used as a seed to construct a random state.

If the input is a random number generator, then another random number generator with the same state is returned. Depending on the api, this random state is either returned as-is, or used to construct an equivalent random state with the requested api.

Parameters:
  • rng (int | float | None | numpy.random.RandomState | random.Random) – if None, the global rng is used. Otherwise this can be a numeric seed or an existing random number generator.

  • api (str) – specify the type of random number generator to use. This can either be ‘numpy’ for a numpy.random.RandomState object or ‘python’ for a random.Random object. Defaults to ‘python’.

Returns:

rng - either a numpy or python random number generator, depending on the setting of api.

Return type:

(numpy.random.RandomState | random.Random)
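
The coercion rules can be sketched for the ‘python’ api alone using the standard library. This is a minimal illustration, not the kwutil implementation: sketch_ensure_rng and _SKETCH_GLOBAL_RNG are hypothetical names, and conversion to or from numpy random states is omitted.

```python
import random

_SKETCH_GLOBAL_RNG = random.Random()  # hypothetical shared default


def sketch_ensure_rng(rng=None):
    """Coerce None, a numeric seed, or a random.Random into a random.Random."""
    if rng is None:
        # No input: fall back to the shared generator.
        return _SKETCH_GLOBAL_RNG
    if isinstance(rng, random.Random):
        # Existing generators are passed through unchanged.
        return rng
    if isinstance(rng, (int, float)):
        # Numeric input is used as a seed for a fresh generator.
        return random.Random(rng)
    raise TypeError(f'cannot coerce {type(rng).__name__} into an rng')


seeded_a = sketch_ensure_rng(0)
seeded_b = sketch_ensure_rng(0)
draws_a = [seeded_a.random() for _ in range(3)]
draws_b = [seeded_b.random() for _ in range(3)]
existing = random.Random(1)
print(draws_a == draws_b, sketch_ensure_rng(existing) is existing)
```

Seeding with the same value gives two independent generators that produce identical sequences, which is what makes a function accepting rng reproducible.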

Example

>>> # xdoctest: +REQUIRES(module:numpy)
>>> from kwutil.util_random import *  # NOQA
>>> from kwutil.util_random import ensure_rng
>>> rng = ensure_rng(None)
>>> ensure_rng(0, 'python').randint(0, 1000)
864
>>> # xdoctest: +REQUIRES(module:numpy)
>>> import numpy as np
>>> ensure_rng(np.random.RandomState(1)).randint(0, 1000)
427

Example

>>> from kwutil.util_random import *  # NOQA
>>> from kwutil.util_random import ensure_rng
>>> import random
>>> num = 4
>>> print('--- Python as PYTHON ---')
>>> py_rng = random.Random(0)
>>> pp_nums = [py_rng.random() for _ in range(num)]
>>> print(pp_nums)
>>> print('--- Numpy as PYTHON ---')
>>> # xdoctest: +REQUIRES(module:numpy)
>>> import numpy as np
>>> np_rng = ensure_rng(random.Random(0), api='numpy')
>>> np_nums = [np_rng.rand() for _ in range(num)]
>>> print(np_nums)
>>> print('--- Numpy as NUMPY---')
>>> np_rng = np.random.RandomState(seed=0)
>>> nn_nums = [np_rng.rand() for _ in range(num)]
>>> print(nn_nums)
>>> print('--- Python as NUMPY---')
>>> py_rng = ensure_rng(np.random.RandomState(seed=0), api='python')
>>> pn_nums = [py_rng.random() for _ in range(num)]
>>> print(pn_nums)
>>> assert np_nums == pp_nums
>>> assert pn_nums == nn_nums

Example

>>> # Test that random modules can be coerced
>>> # xdoctest: +REQUIRES(module:numpy)
>>> from kwutil.util_random import *  # NOQA
>>> import random
>>> import numpy as np
>>> ensure_rng(random, api='python')
>>> ensure_rng(random, api='numpy')
>>> ensure_rng(np.random, api='python')
>>> ensure_rng(np.random, api='numpy')
kwutil.envflag(key, default=None, environ=None)[source]

Determine if an environment variable is specified and truthy or falsy.

Parameters:
  • key (str) – the environment variable name to check

  • default (Any) – the default value to return if the environment variable is not specified.

  • environ (None | Dict) – Uses this to get the environment variable. If unspecified, defaults to os.environ.

Returns:

True if the environment variable exists and matches a truthy pattern (e.g. true, on, yes, 1, or t). Returns False if it is set to any other value, and returns default if it is not set.

Note

This will return false on any setting of the environ that is not truthy. (e.g. YESPLEASE is not a registered TRUTHY_ENVIRON so it will return False).
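
The lookup rule can be sketched in a few lines of standard-library Python. sketch_envflag and SKETCH_TRUTHY are hypothetical names. The truthy set lists only the patterns named above, and the case-insensitive match is an assumption based on YES being accepted in the example that follows.

```python
import os

# Only the truthy patterns named in the documentation above.
SKETCH_TRUTHY = {'true', 'on', 'yes', '1', 't'}


def sketch_envflag(key, default=None, environ=None):
    """Return default if key is unset, else whether its value is truthy."""
    if environ is None:
        environ = os.environ
    value = environ.get(key)
    if value is None:
        return default
    # Assumed: matching ignores case, so 'YES' counts as truthy.
    return value.lower() in SKETCH_TRUTHY


environ = {'foo': '1', 'bar': 'YES', 'baz': '0', 'biz': '1111'}
flags = {key: sketch_envflag(key, 0, environ=environ) for key in environ}
print(flags, sketch_envflag('buzz', 1, environ=environ))
```

Note that '1111' is not in the truthy set, so it resolves to False even though it would be truthy as a Python integer.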

Example

>>> from kwutil import util_environ
>>> environ = {
>>>     'foo': '1',
>>>     'bar': 'YES',
>>>     'baz': '0',
>>>     'biz': '1111',
>>> }
>>> assert util_environ.envflag('foo', 0, environ=environ)
>>> assert util_environ.envflag('bar', 0, environ=environ)
>>> assert not util_environ.envflag('baz', 0, environ=environ)
>>> assert not util_environ.envflag('biz', 0, environ=environ)
>>> assert not util_environ.envflag('buzz', 0, environ=environ)
>>> assert util_environ.envflag('buzz', 1, environ=environ)
kwutil.safeeval(expression, context={}, safenodes=None, addnodes=None, funcs=None, attrs=None)[source]

C-style simplified wrapper, eval() replacement.

Parameters:
  • expression (str) – the expression to evaluate

  • context (dict) – Optional dictionary of variables to make available during evaluation.

  • safenodes (List[str] | None) – Specify the name of allowed AST nodes, if unspecified a default list is used.

  • addnodes (List[str] | None) – List of additional AST node names to allow in addition to safenodes.

  • funcs (List[str]) – list of allowed function names.

  • attrs (List[str]) – list of allowed attribute names.

Returns:

the result of the expression

Return type:

Any

Raises:
  • ExecutionException - if the expression fails to execute

  • CompilationException - if the expression fails to parse

  • ValidationException - if the expression fails safety checks
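
The validation step can be pictured as a walk over the parsed AST that rejects any node type outside an allow-list. The sketch below illustrates that idea with the standard ast module. sketch_safeeval, SKETCH_SAFE_NODES, and SketchValidationError are hypothetical names, the default node list is an assumption, and this is neither the safeeval implementation nor a vetted sandbox.

```python
import ast
import builtins

# Assumed default allow-list: arithmetic on constants and named variables.
SKETCH_SAFE_NODES = {
    'Expression', 'BinOp', 'UnaryOp', 'Add', 'Sub', 'Mult', 'Div',
    'USub', 'Constant', 'Name', 'Load',
}


class SketchValidationError(Exception):
    """Raised when the expression uses a node or call that is not allowed."""


def sketch_safeeval(expression, context=None, addnodes=(), funcs=()):
    allowed = SKETCH_SAFE_NODES | set(addnodes)
    tree = ast.parse(expression, mode='eval')
    for node in ast.walk(tree):
        name = type(node).__name__
        if name not in allowed:
            raise SketchValidationError(f'node type {name} is not allowed')
        if isinstance(node, ast.Call):
            func = node.func
            if not (isinstance(func, ast.Name) and func.id in funcs):
                raise SketchValidationError('call target is not allowed')
    # Expose only the whitelisted builtins plus the caller's variables.
    namespace = {f: getattr(builtins, f) for f in funcs if hasattr(builtins, f)}
    namespace.update(context or {})
    code = compile(tree, '<sketch>', 'eval')
    return eval(code, {'__builtins__': {}}, namespace)


total = sketch_safeeval('3 + 2')
scaled = sketch_safeeval('x * 2', context={'x': 5})
largest = sketch_safeeval('max(3, 2)', addnodes=['Call'], funcs=['max'])
try:
    sketch_safeeval('exec("import os")')
    rejected = False
except SketchValidationError:
    rejected = True
print(total, scaled, largest, rejected)
```

Validation happens before compilation, so a disallowed expression is rejected without any part of it being executed.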

Example

>>> from kwutil.util_eval import safeeval, ValidationException
>>> safeeval('3 + 2')
5
>>> safeeval('max(3, 2)', addnodes=['Call'], funcs=['max'])
3
>>> safeeval('x * 2', context={'x': 5})
10
>>> safeeval('len(lst)', context={'lst': [1, 2, 3]}, addnodes=['Call'], funcs=['len'])
3
>>> safeeval('obj.value', context={'obj': type("O", (), {"value": 42})()}, addnodes=['Attribute'], attrs=['value'])
42
>>> import pytest
>>> with pytest.raises(ValidationException):
...     safeeval('exec("import os")')
>>> with pytest.raises(ValidationException):
...     safeeval('os.system("ls")', context={'os': __import__('os')}, addnodes=['Call', 'Attribute'], funcs=[])