kwutil package
Submodules
- kwutil.copy_manager module
- kwutil.fsops_managers module
- kwutil.partial_format module
- kwutil.process_context module
  - ProcessContext
    - ProcessContext._infer_static_properties()
    - ProcessContext._infer_dynamic_properties()
    - ProcessContext.is_running
    - ProcessContext.is_started
    - ProcessContext.dump()
    - ProcessContext.write_invocation()
    - ProcessContext._timestamp()
    - ProcessContext._hostinfo()
    - ProcessContext._osinfo()
    - ProcessContext._pyinfo()
    - ProcessContext._meminfo()
    - ProcessContext._cpuinfo()
    - ProcessContext._gpuinfo()
    - ProcessContext._machine()
    - ProcessContext.start()
    - ProcessContext.flush()
    - ProcessContext.stop()
    - ProcessContext._start_emissions_tracker()
    - ProcessContext._flush_emissions_tracker()
    - ProcessContext._stop_emissions_tracker()
    - ProcessContext._device_info()
    - ProcessContext.add_device_info()
    - ProcessContext.add_disk_info()
  - jsonify_config()
  - Reconstruction
  - main()
- kwutil.slugify_ext module
- kwutil.util_environ module
- kwutil.util_eval module
- kwutil.util_exception module
- kwutil.util_hardware module
- kwutil.util_json module
- kwutil.util_locks module
- kwutil.util_parallel module
- kwutil.util_path module
- kwutil.util_pattern module
- kwutil.util_progress module
- kwutil.util_prompt module
- kwutil.util_random module
- kwutil.util_resources module
- kwutil.util_rich module
- kwutil.util_time module
- kwutil.util_units module
- kwutil.util_windows module
- kwutil.util_yaml module
Module contents
The kwutil Module
Read the docs | Gitlab (main) | Github (mirror) | Pypi
The Kitware utility module.
This module is for small, pure-python utility functions. Dependencies are allowed, but they must be small and highly standard packages (e.g. rich, psutil, ruamel.yaml).
- class kwutil.CopyManager(workers=0, mode='thread', eager=False, overwrite=False, skip_existing=False)
Bases: _FilesystemOperationManager
Helper to execute multiple copy operations on a local filesystem.
Notes
It would be nice for this to support an rsync backend that could sync at the src/dst pair level. Not sure if this works.
References
https://unix.stackexchange.com/questions/133995/rsyncing-multiple-src-dest-pairs https://serverfault.com/questions/163859/using-rsync-as-a-queue https://unix.stackexchange.com/questions/602606/rsync-source-list-to-destination-list
Todo
[ ] Add optional check that all src paths exist
[ ] Add optional check that all dst paths do not exist (unless overwrite=True or skip_existing=True)
[ ] Add optional check that no dst path is, or is inside of, a src dpath (would make things ambiguous); the operation graph should be bipartite.
[ ] Add backend that uses a fast protocol like rsync (or write one in Rust)
Example
>>> import ubelt as ub
>>> from kwutil.fsops_managers import CopyManager
>>> dpath = ub.Path.appdir('kwutil', 'tests', 'copy_manager')
>>> src_dpath = (dpath / 'src').ensuredir()
>>> dst_dpath = (dpath / 'dst').delete()
>>> src_fpaths = [src_dpath / 'file{}.txt'.format(i) for i in range(10)]
>>> for fpath in src_fpaths:
>>>     fpath.touch()
>>> # To use a copy manager, iterate through your source and
>>> # destination paths and submit them.
>>> copyman = CopyManager(workers=0)
>>> # by default it will do nothing
>>> # unless you specify eager=True or explicitly call run.
>>> for fpath in src_fpaths:
>>>     dst = fpath.augment(dpath=dst_dpath)
>>>     copyman.submit(fpath, dst)
>>> report = copyman.report()
>>> print(f'report = {ub.urepr(report, nl=1)}')
>>> copyman.run()
Example
>>> import ubelt as ub
>>> from kwutil.fsops_managers import CopyManager
>>> dpath = ub.Path.appdir('kwutil', 'tests', 'copy_manager')
>>> src_dpath = (dpath / 'src').ensuredir()
>>> dst_dpath = (dpath / 'dst').delete()
>>> src_fpaths = [src_dpath / 'file{}.txt'.format(i) for i in range(10)]
>>> for fpath in src_fpaths:
>>>     fpath.touch()
>>> copyman = CopyManager(workers=0)
>>> for fpath in src_fpaths:
>>>     dst = fpath.augment(dpath=dst_dpath)
>>>     copyman.submit(fpath, dst)
>>> copyman.run()
>>> assert len(dst_dpath.ls()) == len(src_dpath.ls())
>>> copyman = CopyManager(workers=0)
>>> for fpath in src_fpaths:
>>>     dst = fpath.augment(dpath=dst_dpath)
>>>     copyman.submit(fpath, dst)
>>> import pytest
>>> with pytest.raises(FileExistsError):
>>>     copyman.run()
>>> copyman = CopyManager(workers=0)
>>> for fpath in src_fpaths:
>>>     dst = fpath.augment(dpath=dst_dpath)
>>>     copyman.submit(fpath, dst, skip_existing=True)
>>> copyman.run()
- Parameters:
workers (int) – number of parallel workers to use
mode (str) – thread, process, or serial
eager (bool) – if True, starts copying as soon as a job is submitted; otherwise it waits until run is called.
overwrite (bool) – if True, overwrites the file if it exists; otherwise it errors unless skip_existing is True. Defaults to False.
skip_existing (bool) – if True, jobs where the destination already exists are skipped by default. Defaults to False.
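The difference between eager=True and waiting for run can be sketched with the standard library alone. The class below, DeferredCopyQueue, is a hypothetical stand-in written for illustration and is not the kwutil implementation; it assumes that workers=0 means serial execution and that deferred jobs are simply queued until run is called.

```python
import shutil
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


class DeferredCopyQueue:
    """Hypothetical illustration of eager vs. deferred submission."""

    def __init__(self, workers=0, eager=False):
        self.workers = workers
        self.eager = eager
        self._pending = []  # (src, dst) pairs waiting for run()

    def submit(self, src, dst):
        if self.eager:
            shutil.copy2(src, dst)  # eager: copy right away
        else:
            self._pending.append((src, dst))  # deferred: wait for run()

    def run(self):
        jobs, self._pending = self._pending, []
        if self.workers <= 0:
            for src, dst in jobs:  # assumed meaning of workers=0: serial
                shutil.copy2(src, dst)
        else:
            with ThreadPoolExecutor(max_workers=self.workers) as pool:
                # list() drains the iterator so worker errors are raised here
                list(pool.map(lambda job: shutil.copy2(*job), jobs))
        return len(jobs)


def demo_deferred_copy(workers=0):
    """Return the number of destination files before and after run()."""
    with tempfile.TemporaryDirectory() as tmp:
        src_dpath = Path(tmp) / 'src'
        dst_dpath = Path(tmp) / 'dst'
        src_dpath.mkdir()
        dst_dpath.mkdir()
        queue = DeferredCopyQueue(workers=workers)
        for i in range(3):
            src = src_dpath / f'file{i}.txt'
            src.write_text('data')
            queue.submit(src, dst_dpath / src.name)
        before = len(list(dst_dpath.iterdir()))
        queue.run()
        after = len(list(dst_dpath.iterdir()))
        return before, after
```

Calling demo_deferred_copy() shows that nothing is copied at submit time when eager is False; the files only appear after run.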
- _operation_name = 'copy'
- _worker_func(dst, skip_existing, overwrite, follow_file_symlinks, follow_dir_symlinks, meta)
- Parameters:
src (PathLike | str)
dst (PathLike | str)
overwrite (bool)
skip_existing (bool)
- submit(src, dst, skip_existing=False, overwrite=None, follow_file_symlinks=False, follow_dir_symlinks=False, meta='stats')
- Parameters:
src (str | PathLike) – source file or directory
dst (str | PathLike) – destination file or directory
skip_existing (bool | None) – if True, the job is skipped when the destination already exists. If None, uses the class default. Default=None
overwrite (bool | None) – if True, overwrites the file if it exists; otherwise it errors unless skip_existing is True. If None, uses the class default. Default=None.
follow_file_symlinks (bool) – If True and src is a link, the link will be resolved before it is copied (i.e. the data is duplicated), otherwise just the link itself will be copied.
follow_dir_symlinks (bool) – if True when src is a directory and contains symlinks to other directories, the contents of the linked data are copied, otherwise when False only the link itself is copied.
meta (str | None) – Indicates what metadata bits to copy. This can be ‘stats’, which tries to copy all metadata (i.e. like shutil.copy2()); ‘mode’, which copies just the permission bits (i.e. like shutil.copy()); or None, which ignores all metadata (i.e. like shutil.copyfile()).
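The three meta options correspond to three standard-library copy functions. The following is a minimal sketch of that correspondence; the helper names copy_with_meta and demo_meta are hypothetical and the dispatch is an illustration, not the code kwutil runs.

```python
import os
import shutil
import tempfile


def copy_with_meta(src, dst, meta='stats'):
    """Dispatch to a shutil copy function based on the meta option."""
    if meta == 'stats':
        return shutil.copy2(src, dst)     # data + permission bits + timestamps
    elif meta == 'mode':
        return shutil.copy(src, dst)      # data + permission bits
    elif meta is None:
        return shutil.copyfile(src, dst)  # data only
    raise KeyError(f'unknown meta option: {meta!r}')


def demo_meta():
    """Copy a file that has an old mtime and report each copy's mtime."""
    old_time = 1_000_000_000  # an arbitrary timestamp in 2001
    with tempfile.TemporaryDirectory() as tmp:
        src = os.path.join(tmp, 'src.txt')
        with open(src, 'w') as file:
            file.write('data')
        os.utime(src, (old_time, old_time))
        mtimes = {}
        for meta in ['stats', 'mode', None]:
            dst = os.path.join(tmp, f'dst_{meta}.txt')
            copy_with_meta(src, dst, meta=meta)
            mtimes[meta] = int(os.stat(dst).st_mtime)
        return old_time, mtimes
```

Only the 'stats' copy keeps the source modification time; the other two copies get a fresh timestamp.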
- class kwutil.DeleteManager(workers=0, mode='thread', eager=False, overwrite=False, skip_existing=False)
Bases: _FilesystemOperationManager
Helper to execute multiple delete operations on a local filesystem.
Note
The topic of deleting a lot of files quickly is interesting. See: https://unix.stackexchange.com/questions/37329/efficiently-delete-large-directory-containing-thousands-of-files
Example
>>> import ubelt as ub
>>> from kwutil.fsops_managers import DeleteManager
>>> dpath = ub.Path.appdir('kwutil', 'tests', 'delete_manager')
>>> src_dpath = (dpath / 'src').ensuredir()
>>> src_fpaths = [src_dpath / 'file{}.txt'.format(i) for i in range(10)]
>>> for fpath in src_fpaths:
>>>     fpath.touch()
>>> deleteman = DeleteManager(workers=0, eager=False)
>>> for fpath in src_fpaths:
>>>     deleteman.submit(fpath)
>>> assert len(src_dpath.ls()) == 10
>>> deleteman.run()
>>> assert len(src_dpath.ls()) == 0
- Parameters:
workers (int) – number of parallel workers to use
mode (str) – thread, process, or serial
eager (bool) – if True, starts the operation as soon as a job is submitted; otherwise it waits until run is called.
overwrite (bool) – if True, overwrites the file if it exists; otherwise it errors unless skip_existing is True. Defaults to False.
skip_existing (bool) – if True, jobs where the destination already exists are skipped by default. Defaults to False.
- _operation_name = 'delete'
- _worker_func(verbose=False)
Removes a file or recursively removes a directory. If a path does not exist, then this does nothing.
- Parameters:
path (str | PathLike) – file or directory to remove
verbose (bool) – if True prints what is being done
- SeeAlso:
send2trash - A cross-platform Python package for sending files to the trash instead of irreversibly deleting them.
ubelt.util_path.Path.delete()
Notes
This can call os.unlink(), os.rmdir(), or shutil.rmtree(), depending on what path references on the filesystem. (On Windows, this may also call a custom ubelt._win32_links._win32_rmtree()).
Example
>>> import ubelt as ub
>>> from os.path import exists, join
>>> base = ub.Path.appdir('ubelt', 'delete_test').ensuredir()
>>> dpath1 = ub.ensuredir(join(base, 'dir'))
>>> ub.ensuredir(join(base, 'dir', 'subdir'))
>>> ub.touch(join(base, 'dir', 'to_remove1.txt'))
>>> fpath1 = join(base, 'dir', 'subdir', 'to_remove3.txt')
>>> fpath2 = join(base, 'dir', 'subdir', 'to_remove2.txt')
>>> ub.touch(fpath1)
>>> ub.touch(fpath2)
>>> assert all(map(exists, (dpath1, fpath1, fpath2)))
>>> ub.delete(fpath1)
>>> assert all(map(exists, (dpath1, fpath2)))
>>> assert not exists(fpath1)
>>> ub.delete(dpath1)
>>> assert not any(map(exists, (dpath1, fpath1, fpath2)))
Example
>>> import ubelt as ub
>>> from os.path import exists, join
>>> dpath = ub.Path.appdir('ubelt', 'delete_test2').ensuredir()
>>> dpath1 = ub.ensuredir(join(dpath, 'dir'))
>>> fpath1 = ub.touch(join(dpath1, 'to_remove.txt'))
>>> assert exists(fpath1)
>>> ub.delete(dpath)
>>> assert not exists(fpath1)
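The delete behavior described in the notes above (unlink files and links, remove directories recursively, ignore missing paths) can be sketched with the standard library. The function delete_path is a hypothetical illustration and not the ubelt or kwutil code; for simplicity it uses shutil.rmtree for every directory instead of choosing between os.rmdir and shutil.rmtree, and it has no Windows-specific branch.

```python
import os
import shutil
import tempfile
from pathlib import Path


def delete_path(path):
    """Remove a file, link, or directory tree; do nothing if it is missing."""
    path = Path(path)
    if path.is_symlink() or path.is_file():
        os.unlink(path)      # files and links (a link's target is kept)
    elif path.is_dir():
        shutil.rmtree(path)  # the directory and everything under it
    # anything else (typically a missing path) is left alone


def demo_delete():
    """Delete a nested file, then its parent directory, then a missing path."""
    with tempfile.TemporaryDirectory() as tmp:
        dpath = Path(tmp) / 'dir'
        (dpath / 'subdir').mkdir(parents=True)
        fpath = dpath / 'subdir' / 'to_remove.txt'
        fpath.touch()
        delete_path(fpath)
        file_gone = not fpath.exists() and dpath.exists()
        delete_path(dpath)
        dir_gone = not dpath.exists()
        delete_path(dpath)  # the path is already gone, so this is a no-op
        return file_gone, dir_gone
```

The symlink check comes first so that a link to a directory is unlinked rather than having its target's contents removed.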
- class kwutil.Hardware
Bases: object
TODO: class level namespace
References
https://pypi.org/project/hardware/
Example
>>> # xdoctest: +SKIP
>>> import kwutil
>>> kwutil.Hardware.report()
- class kwutil.Json
Bases: object
Similar to kwutil.Yaml, the Json class provides a set of helpers to make working with json easier.
Example
>>> from kwutil.util_json import Json
>>> import ubelt as ub
>>> unserializable_data = {
>>>     'a': 'hello world',
>>>     'b': ub.udict({'a': 3}),
>>>     'c': ub.Path('a/path/object'),
>>> }
>>> data = Json.ensure_serializable(unserializable_data)
>>> text1 = Json.dumps(data, backend='stdlib')
>>> # Coerce is idempotent and resolves the input to nested Python
>>> # structures.
>>> resolved1 = Json.coerce(data)
>>> resolved2 = Json.coerce(text1)
>>> resolved3 = Json.coerce(resolved2)
>>> assert resolved1 == resolved2 == resolved3 == data
>>> # with stdlib
>>> data2 = Json.loads(text1)
>>> assert data2 == data
>>> # with ujson
>>> # xdoctest: +REQUIRES(module:ujson)
>>> data2 = Json.loads(text1, backend='ujson')
>>> assert data2 == data
- classmethod coerce(data, backend='stdlib', path_policy='existing_file_with_extension')
Example
>>> from kwutil.util_json import Json
>>> import ubelt as ub
>>> Json.coerce('[1, 2, 3]')
[1, 2, 3]
>>> fpath = ub.Path.appdir('kwutil/tests/util_json').ensuredir() / 'file.json'
>>> fpath.write_text(Json.dumps([4, 5, 6]))
>>> Json.coerce(fpath)
[4, 5, 6]
>>> Json.coerce(str(fpath))
[4, 5, 6]
>>> dict(Json.coerce('{"a": "b", "c": "d"}'))
{'a': 'b', 'c': 'd'}
>>> Json.coerce(None)
None
- classmethod debug_unserializable(data, msg='')
Raises an exception if the data is not serializable and prints information about it. This is a thin wrapper around Json.find_unserializable().
Example
>>> import kwutil
>>> import ubelt as ub
>>> data = {
>>>     'a': 1,
>>>     'b': 2,
>>>     'c': ub.Path('/pathlib/object')
>>> }
>>> try:
>>>     kwutil.Json.debug_unserializable(data, 'obj had non-json data at: ')
>>> except Exception as ex:
>>>     print(f'Exception: {ex}')
Exception: obj had non-json data at: [
    {'loc': ['c'], 'data': Path('/pathlib/object')},
]
- static dump(data, fp, backend='stdlib', **kwargs)
Write json data to a file with a chosen backend.
- Parameters:
data (dict | list | int | float | str) – json serializable data.
fp (PathLike | IO) – Where to write the data
backend (str) – stdlib, ujson, or orjson
**kwargs – additional arguments to pass to the specific backend.
- static dumps(data, backend='stdlib', **kwargs)
Convert json data to text with a chosen backend.
- Parameters:
data (dict | list | int | float | str) – json serializable data.
backend (str) – stdlib, ujson, or orjson
**kwargs – additional arguments to pass to the specific backend.
- classmethod ensure_serializable(dict_, normalize_containers=False, verbose=0, unhandled_policy='keep')
Example
>>> import kwutil
>>> import pathlib
>>> import ubelt as ub
>>> data = {
>>>     'a': 1,
>>>     'b': 2,
>>>     'c': pathlib.Path('/pathlib/object')
>>> }
>>> results = kwutil.Json.ensure_serializable(data)
>>> print(f'results = {ub.urepr(results, nl=1)}')
results = {
    'a': 1,
    'b': 2,
    'c': '/pathlib/object',
}
- classmethod find_unserializable(data, quickcheck=False)
Example
>>> import kwutil
>>> import ubelt as ub
>>> data = {
>>>     'a': 1,
>>>     'b': 2,
>>>     'c': ub.Path('/pathlib/object')
>>> }
>>> results = list(kwutil.Json.find_unserializable(data))
>>> print(f'results = {ub.urepr(results, nl=1)}')
results = [
    {'loc': ['c'], 'data': Path('/pathlib/object')},
]
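The 'loc' / 'data' records shown in the examples above suggest a recursive walk that remembers the path to each offending leaf. The generator below is a rough sketch of that idea using only the standard json module; it is an assumption about the general approach, not the kwutil implementation, and it does not check dictionary keys or support the quickcheck option.

```python
import json
from pathlib import PurePosixPath


def find_unserializable(data, loc=()):
    """Yield {'loc', 'data'} records for leaves that json cannot encode."""
    if isinstance(data, dict):
        for key, value in data.items():
            yield from find_unserializable(value, loc + (key,))
    elif isinstance(data, (list, tuple)):
        for index, value in enumerate(data):
            yield from find_unserializable(value, loc + (index,))
    else:
        try:
            json.dumps(data)  # leaf value: let the encoder decide
        except TypeError:
            yield {'loc': list(loc), 'data': data}


demo_data = {
    'a': 1,
    'b': [2, PurePosixPath('/nested/object')],
    'c': PurePosixPath('/pathlib/object'),
}
demo_results = list(find_unserializable(demo_data))
```

Each record's 'loc' is the list of keys and indexes that leads from the root to the value, which is what makes the report useful for debugging deeply nested configs.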
- class kwutil.MoveManager(workers=0, mode='thread', eager=False)
Bases: _FilesystemOperationManager
Helper to execute multiple move operations on a local filesystem.
Todo
[ ] Add optional check that all src paths exist
[ ] Add optional check that all dst paths do not exist
[ ] Add optional check that no dst path is, or is inside of, a src dpath (would make things ambiguous); the operation graph should be bipartite.
Example
>>> import ubelt as ub
>>> from kwutil.fsops_managers import MoveManager
>>> dpath = ub.Path.appdir('kwutil', 'tests', 'move_manager')
>>> src_dpath = (dpath / 'src').ensuredir()
>>> dst_dpath = (dpath / 'dst').delete()
>>> src_fpaths = [src_dpath / 'file{}.txt'.format(i) for i in range(10)]
>>> for fpath in src_fpaths:
>>>     fpath.touch()
>>> moveman = MoveManager(workers=0)
>>> for src_fpath in src_fpaths:
>>>     dst_fpath = src_fpath.augment(dpath=dst_dpath)
>>>     moveman.submit(src_fpath, dst_fpath)
>>> moveman.run()
>>> assert len(dst_dpath.ls()) == len(src_fpaths)
>>> assert len(src_dpath.ls()) == 0
- Parameters:
workers (int) – number of parallel workers to use
mode (str) – thread, process, or serial
eager (bool) – if True, starts the operation as soon as a job is submitted; otherwise it waits until run is called.
- _check()
Validate that the set of move tasks looks sane.
Exact logic of this is currently in flux.
- _operation_name = 'move'
- _worker_func(dst, follow_file_symlinks, follow_dir_symlinks, meta)
- Parameters:
src (PathLike | str)
dst (PathLike | str)
- submit(src, dst, skip_existing=False, follow_file_symlinks=False, follow_dir_symlinks=False, meta='stats')
- Parameters:
src (str | PathLike) – source file or directory
dst (str | PathLike) – destination file or directory
follow_file_symlinks (bool) – If True and src is a link, the link will be resolved before it is copied (i.e. the data is duplicated), otherwise just the link itself will be copied.
follow_dir_symlinks (bool) – if True when src is a directory and contains symlinks to other directories, the contents of the linked data are copied, otherwise when False only the link itself is copied.
meta (str | None) – Indicates what metadata bits to copy. This can be ‘stats’, which tries to copy all metadata (i.e. like shutil.copy2()); ‘mode’, which copies just the permission bits (i.e. like shutil.copy()); or None, which ignores all metadata (i.e. like shutil.copyfile()).
- class kwutil.MultiPattern(patterns, predicate)
Bases: PatternBase, NiceRepr
Groups multiple patterns together with an “any” or “all” predicate.
Note
We may remove the idea of a predicate in the future and just use behavior that currently corresponds to the “any” predicate.
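The effect of the predicate can be sketched with plain glob matching from the standard library. The function multi_match is hypothetical and only approximates the behavior with fnmatch; it is not how MultiPattern is implemented.

```python
from fnmatch import fnmatchcase


def multi_match(patterns, text, predicate=any):
    """Combine per-pattern glob matches with the any or all builtin."""
    return predicate(fnmatchcase(text, pattern) for pattern in patterns)


patterns = ['aaa*', 'bbb']
any_results = [multi_match(patterns, text) for text in ['aabb', 'aaabb', 'bbb', 'bbbaaa']]
all_result = multi_match(patterns, 'aaabb', predicate=all)
```

With the "any" predicate a text passes if at least one pattern accepts it; with "all" the same text 'aaabb' fails because it does not equal 'bbb'.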
Example
>>> import kwutil
>>> pat = kwutil.MultiPattern.coerce(['aaa*', 'bbb'])
>>> assert not pat.match('aabb')
>>> assert pat.match('aaabb')
>>> assert pat.match('bbb')
>>> assert not pat.match('bbbaaa')
Example
>>> import ubelt as ub
>>> from kwutil.util_pattern import MultiPattern
>>> dpath = ub.Path.appdir('xdev/tests/multipattern_paths').ensuredir().delete().ensuredir()
>>> (dpath / 'file0.txt').touch()
>>> (dpath / 'data0.dat').touch()
>>> (dpath / 'other0.txt').touch()
>>> ((dpath / 'dir1').ensuredir() / 'file1.txt').touch()
>>> ((dpath / 'dir2').ensuredir() / 'file2.txt').touch()
>>> ((dpath / 'dir2').ensuredir() / 'file3.txt').touch()
>>> ((dpath / 'dir1').ensuredir() / 'data.dat').touch()
>>> ((dpath / 'dir2').ensuredir() / 'data.dat').touch()
>>> ((dpath / 'dir2').ensuredir() / 'data.dat').touch()
>>> pat = MultiPattern.coerce(['*.txt'], 'glob')
>>> print(list(pat.paths(cwd=dpath)))
>>> pat = MultiPattern.coerce(['*0*', '**/*.txt'], 'glob')
>>> print(list(pat.paths(cwd=dpath, recursive=1)))
>>> pat = MultiPattern.coerce(['*.txt', '**/*.txt', '**/*.dat'], 'glob')
>>> print(list(pat.paths(cwd=dpath)))
- classmethod coerce(data, hint='auto', predicate='any')
- Parameters:
data (str | List | Pattern | PathLike | MultiPattern)
hint (str) – can be ‘glob’, ‘regex’, ‘strict’ or ‘auto’. In ‘auto’ we will use ‘glob’ if the input is a string and ‘*’ is in the pattern, otherwise we will use strict. Pattern inputs keep their existing interpretation.
- Returns:
MultiPattern
Example
>>> from kwutil.util_pattern import *  # NOQA
>>> pat = MultiPattern.coerce('foo*', 'glob')
>>> pat2 = MultiPattern.coerce(pat, 'regex')
>>> pat3 = MultiPattern.coerce([pat, pat], 'regex')
>>> pat4 = MultiPattern.coerce([ub.Path('bar*'), pat], 'regex')
>>> print('pat = {}'.format(ub.urepr(pat, nl=1)))
>>> print('pat2 = {}'.format(ub.urepr(pat2, nl=1)))
>>> print('pat3 = {!r}'.format(pat3))
>>> print('pat4 = {!r}'.format(pat4))
>>> pat00 = MultiPattern.coerce('foo', 'glob')
>>> pat01 = MultiPattern.coerce('foo*', 'glob')
>>> pat02 = MultiPattern.coerce('foo*', 'regex')
>>> pat5 = MultiPattern.coerce(['foo', 'foo*', pat, pat00, pat01, pat02])
>>> print(f'pat5={pat5}')
Example
>>> # Test all acceptable input types
>>> from kwutil.util_pattern import *  # NOQA
>>> import itertools as it
>>> str_pat = 'pattern*'
>>> scalar_inputs = {
>>>     'str': str_pat,
>>>     'path': ub.Path(str_pat),
>>>     'pat': Pattern.coerce(str_pat),
>>>     'mpat': MultiPattern.coerce(str_pat)
>>> }
>>> # Test scalar input types
>>> scalar_outputs = {}
>>> for k, v in scalar_inputs.items():
>>>     scalar_outputs[k] = MultiPattern.coerce(v)
>>> print('scalar_outputs = {}'.format(ub.urepr(scalar_outputs, nl=1)))
>>> #
>>> # Test iterable input types
>>> multi_outputs = []
>>> for v in it.combinations(scalar_inputs.values(), 2):
>>>     multi_outputs.append(MultiPattern.coerce(v))
>>> for v in it.combinations(scalar_inputs.values(), 3):
>>>     multi_outputs.append(MultiPattern.coerce(v))
>>> # Higher order nesting test
>>> higher_order_output = MultiPattern.coerce(multi_outputs)
>>> print('higher_order_output = {}'.format(ub.urepr(higher_order_output, nl=1)))
- match(text)
Check if a string matches this multipattern
- Parameters:
text (str) – text to check matches against
Example
>>> # xdoctest: +REQUIRES(module:parse)
>>> import kwutil
>>> self = kwutil.MultiPattern.coerce([
>>>     kwutil.Pattern.coerce('{key1}={val1},{key2}={val2}', hint='parse')
>>> ])
>>> text = 'aaa=bbb,ccc=ddd'
>>> result = self.match(text)
>>> assert result
>>> assert result.named['val1'] == 'bbb'
- class kwutil.Pattern(pattern, backend)
Bases: PatternBase, NiceRepr
Provides a common API to several common pattern matching syntaxes.
A general patterns class, which can use a backend from BACKENDS
- Parameters:
pattern (str | object) – The pattern text or a precompiled backend pattern object
backend (str) – Code indicating what backend the pattern text should be interpreted with. See BACKENDS for available choices.
Notes
# BACKENDS
The glob backend uses the fnmatch module [fnmatch_docs]. The regex backend uses the Python re module. The strict backend uses “==” string equality testing. The parse backend uses the parse module.
References
Example
>>> # The most flexible way to define a pattern is using the
>>> # coerce method with a prefixed pattern string.
>>> import kwutil
>>> # Glob pattern: matches filenames ending in .jpg
>>> pat = kwutil.Pattern.coerce('glob:*.jpg')
>>> assert pat.match('image.jpg')
>>> assert not pat.match('image.png')
>>> # Regex pattern: similar logic using regular expressions
>>> pat = kwutil.Pattern.coerce(r'regex:.*\.jpg')
>>> assert pat.match('photo.jpg')
>>> assert not pat.match('photo.jpeg')
>>> # Strict pattern: exact string match
>>> pat = kwutil.Pattern.coerce('strict:hello.jpg')
>>> assert pat.match('hello.jpg')
>>> assert not pat.match('hello2.jpg')
>>> # Parse pattern: extract named groups from string
>>> # xdoctest: +REQUIRES(module:parse)
>>> pat = kwutil.Pattern.coerce('parse:{name}.jpg')
>>> assert pat.match('cat.jpg').named == {'name': 'cat'}
>>> assert pat.match('cat.png') is None
Example
>>> # But you can also explicitly define the backend with a hint.
>>> from kwutil.util_pattern import Pattern
>>> # Test Regex backend
>>> repat = Pattern.coerce('foo.*', 'regex')
>>> assert repat.match('foobar')
>>> assert not repat.match('barfoo')
>>> match = repat.search('baz-biz-foobar')
>>> match = repat.match('baz-biz-foobar')
>>> # Test Glob backend
>>> globpat = Pattern.coerce('foo*', 'glob')
>>> assert globpat.match('foobar')
>>> assert not globpat.match('barfoo')
>>> globpat = Pattern.coerce('[foo|bar]', 'glob')
>>> assert not globpat.match('foo')
Example
>>> # xdoctest: +REQUIRES(module:parse)
>>> # Test parse backend
>>> import ubelt as ub
>>> from kwutil.util_pattern import Pattern
>>> pattern1 = Pattern.coerce('A {adjective} pattern', 'parse')
>>> result1 = pattern1.match('A cool pattern')
>>> print(f'result1.named = {ub.urepr(result1.named, nl=1)}')
>>> pattern2 = pattern1.to_regex()
>>> result2 = pattern2.match('A cool pattern')
- _prefix_mappings = {'exact:': 'strict', 'glob:': 'glob', 'parse:': 'parse', 'regex:': 'regex', 'strict:': 'strict'}
- classmethod coerce(data, hint='auto')
Attempt to automatically interpret the input data with the appropriate pattern backend. If it cannot be determined, then fallback to the hint.
- Parameters:
data (str | Pattern | PathLike) – an input string or existing object
hint (str) – can be ‘glob’, ‘regex’, ‘strict’ or ‘auto’. In ‘auto’ we will use ‘glob’ if the input is a string and ‘*’ is in the pattern, otherwise we will use strict. Pattern inputs keep their existing interpretation.
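The selection rules described above (an explicit prefix, then the hint, with 'auto' falling back to glob when a '*' is present and strict otherwise) can be sketched with the standard library. The functions infer_backend and simple_match are hypothetical; the sketch assumes a recognized prefix takes priority over the hint and leaves out the parse backend because it needs a third-party package.

```python
import fnmatch
import re

# Prefixes taken from the documented _prefix_mappings, minus 'parse:'
PREFIXES = {
    'glob:': 'glob',
    'regex:': 'regex',
    'strict:': 'strict',
    'exact:': 'strict',
}


def infer_backend(text, hint='auto'):
    """Return (pattern_text, backend) for a pattern string."""
    for prefix, backend in PREFIXES.items():
        if text.startswith(prefix):
            return text[len(prefix):], backend  # assumed: prefix beats hint
    if hint == 'auto':
        hint = 'glob' if '*' in text else 'strict'
    return text, hint


def simple_match(pattern_text, candidate, hint='auto'):
    """Match candidate against a pattern using the inferred backend."""
    pattern, backend = infer_backend(pattern_text, hint)
    if backend == 'glob':
        return fnmatch.fnmatchcase(candidate, pattern)
    if backend == 'regex':
        return re.match(pattern, candidate) is not None
    if backend == 'strict':
        return candidate == pattern
    raise KeyError(f'unsupported backend in this sketch: {backend!r}')
```

For instance, simple_match('foo.*', 'foobar', hint='regex') succeeds because '.' matches any character, while the same text interpreted as a glob requires a literal dot.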
Example
>>> import kwutil
>>> # Coerce assumes glob if there is a star
>>> pat = kwutil.Pattern.coerce('foo*')
>>> bool(pat.match('foobar'))
True
>>> # Otherwise it is a strict match
>>> pat = kwutil.Pattern.coerce('foo')
>>> bool(pat.match('foobar'))
False
Example
>>> # xdoctest: +REQUIRES(module:parse)
>>> import kwutil
>>> # The hint can explicitly specify the backend to use
>>> pat1 = kwutil.Pattern.coerce('foo.*', 'glob')
>>> pat2 = kwutil.Pattern.coerce('foo.*', 'regex')
>>> pat3 = kwutil.Pattern.coerce('foo.{}*', 'parse')
>>> inputs = ['spam', 'foobar', 'foo.bar', 'foo.bar*']
>>> print([bool(pat1.match(x)) for x in inputs])
>>> print([bool(pat2.match(x)) for x in inputs])
>>> print([bool(pat3.match(x)) for x in inputs])
[False, False, True, True]
[False, True, True, True]
[False, False, False, True]
Example
>>> # The hint can explicitly specify the backend to use
>>> import kwutil
>>> pat = kwutil.Pattern.coerce('foo*', 'glob')
>>> # A hint is ignored if the input data is not a string
>>> pat2 = kwutil.Pattern.coerce(pat, 'regex')
>>> assert pat2.backend == 'glob'
Example
>>> from kwutil.util_pattern import *  # NOQA
>>> assert Pattern.coerce('glob:*.jpg').backend == 'glob'
>>> assert Pattern.coerce('regex:.*\.jpg').backend == 'regex'
>>> assert Pattern.coerce('exact:hello.jpg').backend == 'strict'
>>> assert Pattern.coerce('strict:hello.jpg').backend == 'strict'
>>> assert Pattern.coerce('hello*.jpg').backend == 'glob'
>>> assert Pattern.coerce('hello.jpg').backend == 'strict'
>>> assert Pattern.coerce('nopat:data').backend == 'strict'
>>> assert Pattern.coerce('foo').backend == 'strict'
>>> assert Pattern.coerce('foo*').backend == 'glob'
>>> assert Pattern.coerce(re.compile('foo*')).backend == 'regex'
>>> # xdoctest: +REQUIRES(module:parse)
>>> assert Pattern.coerce('parse:hello.jpg').backend == 'parse'
- classmethod coerce_backend(data, hint='auto')
Example
>>> import re
>>> from kwutil.util_pattern import Pattern
>>> assert Pattern.coerce_backend('foo', hint='auto')[1] == 'strict'
>>> assert Pattern.coerce_backend('foo*', hint='auto')[1] == 'glob'
>>> assert Pattern.coerce_backend(re.compile('foo*'), hint='auto')[1] == 'regex'
- classmethod from_regex(data, flags=0, multiline=False, dotall=False, ignorecase=False)
Create a Pattern object with a regex backend.
- paths(cwd=None, recursive=False)
Find paths in the filesystem that match this pattern
- Yields:
ub.Path
- sub(repl, text, count=-1)
- Parameters:
repl (str) – text to insert in place of pattern
text (str) – text to be searched and modified
count (int) – if non-negative, the maximum number of replacements that will be made.
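The count convention above differs from re.sub, where count=0 means "no limit". Below is a small sketch of how a negative-means-unlimited convention could be mapped onto the standard library; sub_with_count is a hypothetical helper, and treating count=0 as "make no replacements" is an assumption read from the parameter description, not confirmed behavior.

```python
import re


def sub_with_count(pattern, repl, text, count=-1):
    """Regex substitution where a negative count means unlimited."""
    if count < 0:
        return re.sub(pattern, repl, text)  # re.sub replaces all by default
    if count == 0:
        return text  # assumed: a maximum of zero replacements changes nothing
    return re.sub(pattern, repl, text, count=count)


unlimited = sub_with_count('a', 'b', 'aaa')
limited = sub_with_count('a', 'b', 'aaa', count=2)
untouched = sub_with_count('a', 'b', 'aaa', count=0)
```

The explicit count == 0 branch matters because passing count=0 straight through to re.sub would replace every occurrence.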
- to_regex()
Returns an equivalent pattern with the regular expression backend
- Returns:
Pattern
Example
>>> from kwutil.util_pattern import Pattern
>>> globpat = Pattern.coerce('foo*', 'glob')
>>> strictpat = Pattern.coerce('foo*', 'strict')
>>> repat1 = strictpat.to_regex()
>>> repat2 = globpat.to_regex()
>>> print(f'repat1={repat1}')
>>> print(f'repat2={repat2}')
- class kwutil.ProcessContext(name=None, type='process', args=None, config=None, extra=None, track_emissions=False, request_all_telemetry=True, request_most_telemetry=True, output_dpath=None, output_fpath=None)
Bases: object
Context manager to track the context under which a result was computed.
This tracks things like the start / end time, the command line that can reproduce the process (assuming an appropriate environment), the configuration the process was run with, the machine details the process was run on, the power usage / carbon emissions the process used, and other information.
- Parameters:
args (str | List[str]) – This should be the sys.argv or the command line string that can be used to rerun the process
config (Dict) – This should be a configuration dictionary (likely based on sys.argv)
name (str) – the name of this process
type (str) – The type of this process (usually keep the default of process)
request_all_telemetry (bool) – if False, all telemetry is disabled. This is forced to False if PROCESS_CONTEXT_DISABLE_ALL_TELEMETRY is in the environment.
request_most_telemetry (bool) – if False, most telemetry is disabled. This is forced to False if PROCESS_CONTEXT_DISABLE_MOST_TELEMETRY is in the environment.
Note
This module provides telemetry, which records user-identifiable information. While useful, it does raise ethical concerns about user privacy, and the people running this code have a right to know about it and opt out. Notably, this module simply records the information, but does not send it anywhere. As such, a default opt-in is reasonable, but any future work that sends this information anywhere must be opt-out by default.
Note
There are two levels of telemetry:
- Environment telemetry. These are things like the machine the code was run on. Use PROCESS_CONTEXT_DISABLE_MOST_TELEMETRY=0 to opt out.
- The start / stop / sys.argv / config objects. These are necessary for mlops to do anything, but they can leak information by containing system paths. Emissions are also in this category. Use PROCESS_CONTEXT_DISABLE_ALL_TELEMETRY to opt out.
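How the two opt-out levels might combine with the constructor flags can be sketched as follows. This is a guess at the gating logic for illustration only: resolve_telemetry and its return keys are hypothetical, it assumes each request flag pairs with the like-named environment variable, that the mere presence of a variable (whatever its value) disables that level, and that disabling all telemetry also disables the environment level.

```python
import os

DISABLE_ALL = 'PROCESS_CONTEXT_DISABLE_ALL_TELEMETRY'
DISABLE_MOST = 'PROCESS_CONTEXT_DISABLE_MOST_TELEMETRY'


def resolve_telemetry(request_all=True, request_most=True, environ=None):
    """Combine constructor requests with environment opt-outs."""
    environ = os.environ if environ is None else environ
    # Start / stop / argv / config level: off if not requested or opted out
    enable_all = request_all and DISABLE_ALL not in environ
    # Environment (machine details) level: also requires the level above
    enable_most = enable_all and request_most and DISABLE_MOST not in environ
    return {'enable_all': enable_all, 'enable_most': enable_most}


full = resolve_telemetry(environ={})
some = resolve_telemetry(environ={DISABLE_MOST: '0'})
none = resolve_telemetry(environ={DISABLE_ALL: '1'})
```

Passing an explicit environ mapping keeps the sketch testable without modifying the real process environment.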
CommandLine
xdoctest -m kwutil.process_context ProcessContext
Example
>>> # xdoctest: +REQUIRES(module:psutil)
>>> from kwutil.process_context import *
>>> import rich
>>> # Adding things like disk info and tracking emission usage
>>> self = ProcessContext(track_emissions='offline')
>>> obj1 = self.start().stop()
>>> self.add_disk_info('.')
>>> #
>>> # Telemetry can be mostly disabled
>>> self = ProcessContext(track_emissions='offline', request_most_telemetry=False)
>>> obj2 = self.start().stop()
>>> self.add_disk_info('.')
>>> # Telemetry can be completely disabled
>>> self = ProcessContext(track_emissions='offline', request_all_telemetry=False)
>>> obj3 = self.start().stop()
>>> self.add_disk_info('.')
>>> rich.print('full_telemetry = {}'.format(ub.urepr(obj1, nl=3)))
>>> rich.print('some_telemetry = {}'.format(ub.urepr(obj2, nl=3)))
>>> rich.print('no_telemetry = {}'.format(ub.urepr(obj3, nl=3)))
Example
>>> # xdoctest: +REQUIRES(module:psutil)
>>> from kwutil.process_context import *
>>> # flush can measure intermediate progress
>>> self = ProcessContext(track_emissions=True)
>>> self.add_disk_info('.')
>>> obj1 = self.start().flush()
>>> obj1_orig = obj1.copy()
>>> obj2 = self.stop()
- add_device_info(device)
Add information about a torch device that was used in this process.
Does nothing if telemetry is disabled.
- Parameters:
device (torch.device) – torch device to add info about
Example
>>> # xdoctest: +REQUIRES(module:torch)
>>> from kwutil.process_context import *
>>> import torch
>>> import rich
>>> device = torch.device(0) if torch.cuda.is_available() else torch.device('cpu')
>>> # Adding things like disk info and tracking emission usage
>>> self = ProcessContext(track_emissions='offline')
>>> obj1 = self.start().stop()
>>> self.add_disk_info('.')
>>> self.add_device_info(device)
>>> #
>>> # Telemetry can be mostly disabled
>>> self = ProcessContext(track_emissions='offline', request_most_telemetry=False)
>>> obj2 = self.start().stop()
>>> self.add_disk_info('.')
>>> self.add_device_info(device)
>>> # Telemetry can be completely disabled
>>> self = ProcessContext(track_emissions='offline', request_all_telemetry=False)
>>> obj3 = self.start().stop()
>>> self.add_disk_info('.')
>>> self.add_device_info(device)
>>> rich.print('full_telemetry = {}'.format(ub.urepr(obj1, nl=3)))
>>> rich.print('some_telemetry = {}'.format(ub.urepr(obj2, nl=3)))
>>> rich.print('no_telemetry = {}'.format(ub.urepr(obj3, nl=3)))
- add_disk_info(path)
Add information about a storage disk that was used in this process
Does nothing if telemetry is disabled.
- property is_running
Has the context object started and not yet been stopped?
- property is_started
Has the context object ever started? This can still return True if it has stopped.
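The distinction between the two properties can be shown with a tiny state holder. RunState is a hypothetical class used only to illustrate the semantics described above; in particular, its start and stop methods return the object itself, which is a simplification and not a claim about what ProcessContext returns.

```python
class RunState:
    """Hypothetical minimal model of the started / running distinction."""

    def __init__(self):
        self._started = False
        self._stopped = False

    def start(self):
        self._started = True
        self._stopped = False
        return self

    def stop(self):
        self._stopped = True
        return self

    @property
    def is_started(self):
        # True once start() was called, even after stop()
        return self._started

    @property
    def is_running(self):
        # True only between start() and stop()
        return self._started and not self._stopped


state = RunState()
fresh = (state.is_started, state.is_running)
state.start()
running = (state.is_started, state.is_running)
state.stop()
finished = (state.is_started, state.is_running)
```

After stop, is_started stays True while is_running drops back to False.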
- class kwutil.ProgressManager(backend='rich', **kwargs)
Bases: BaseProgIterManager
A progress manager.
Manage multiple progress bars, either with rich or ProgIter.
CommandLine
xdoctest -m kwutil.util_progress ProgressManager:0
xdoctest -m kwutil.util_progress ProgressManager:1
xdoctest -m kwutil.util_progress ProgressManager:2
Example
>>> from kwutil.util_progress import ProgressManager
>>> from progiter import progiter
>>> # Can use plain progiter or rich
>>> # The usecase for plain progiter is when threads / live output
>>> # is not desirable and you just want plain stdout progress
>>> pman = ProgressManager(backend='progiter')
>>> with pman:
>>>     oprog = pman.progiter(range(20), desc='outer loop', verbose=3)
>>>     for i in oprog:
>>>         oprog.set_postfix(f'Doing step {i}', refresh=False)
>>>         for i in pman.progiter(range(100), desc=f'inner loop {i}'):
>>>             pass
>>> # xdoctest: +REQUIRES(module:rich)
>>> self = pman = ProgressManager(backend='rich')
>>> pman = ProgressManager(backend='rich')
>>> with pman:
>>>     oprog = pman.progiter(range(20), desc='outer loop', verbose=3)
>>>     for i in oprog:
>>>         oprog.set_postfix(f'Doing step {i}', refresh=False)
>>>         for i in pman.progiter(range(100), desc=f'inner loop {i}'):
>>>             pass
Example
>>> # xdoctest: +REQUIRES(module:rich)
>>> # A fairly complex example
>>> from kwutil.util_progress import ProgressManager
>>> import time
>>> delay = 0.00005
>>> N_inner = 300
>>> N_outer = 11
>>> self = pman = ProgressManager(backend='rich')
>>> with pman:
>>>     oprog = pman(range(N_outer), desc='outer loop')
>>>     for i in oprog:
>>>         if i > 7:
>>>             self.update_info(f'The info panel gives detailed updates\nWe are now at step {i}\nWe are just about done now')
>>>         elif i > 5:
>>>             self.update_info(f'The info panel gives detailed updates\nWe are now at step {i}')
>>>         oprog.set_postfix(f'Doing step {i}')
>>>         N = 1000
>>>         for j in pman(iter(range(N_inner)), total=None if i % 2 == 0 else N_inner, desc=f'inner loop {i}', transient=i < 4):
>>>             time.sleep(delay)
Example
>>> # xdoctest: +REQUIRES(module:rich)
>>> # Test complex example over a grid of parameters
>>> from kwutil.util_progress import ProgressManager, ProgIter2
>>> import ubelt as ub
>>> import time
>>> delay = 0.000005
>>> N_inner = 300
>>> N_outer = 11
>>> basis = {
>>>     'with_info': [0, 1],
>>>     'backend': ['progiter', 'rich'],
>>>     'enabled': [0, 1],
>>>     #'with_info': [1],
>>> }
>>> grid = list(ub.named_product(basis))
>>> grid_prog = ProgIter2(grid, desc='Test cases over grid', verbose=3)
>>> grid_prog.update_info('Here we go')
>>> for item in grid:
>>>     grid_prog.ensure_newline()
>>>     grid_prog.update_info(f'Running grid test {ub.urepr(item, nl=1)}')
>>>     print('\n\n')
>>>     self = ProgressManager(backend=item['backend'], enabled=item['enabled'])
>>>     with self:
>>>         outer_prog = self.progiter(range(N_outer), desc='outer loop')
>>>         for i in outer_prog:
>>>             if item['with_info']:
>>>                 if i > 7:
>>>                     outer_prog.update_info(f'The info panel gives detailed updates\nWe are now at step {i}\nWe are just about done now')
>>>                 elif i > 5:
>>>                     outer_prog.update_info(f'The info panel gives detailed updates\nWe are now at step {i}')
>>>             outer_prog.set_postfix(f'Doing step {i}')
>>>             inner_kwargs = dict(
>>>                 total=None if i % 2 == 0 else N_inner,
>>>                 transient=i < 4,
>>>                 time_thresh=delay * 2.3,
>>>                 desc=f'inner loop {i}',
>>>             )
>>>             for j in self.progiter(iter(range(N_inner)), **inner_kwargs):
>>>                 time.sleep(delay)
>>>     grid_prog.update_info(f'Finished test item')
Example
>>> # xdoctest: +REQUIRES(module:rich)
>>> # Demo manual usage
>>> from kwutil.util_progress import ProgressManager
>>> from kwutil import util_progress
>>> import time
>>> pman = ProgressManager()
>>> pman.start()
>>> task1 = pman.progiter(desc='task1', total=100)
>>> task2 = pman.progiter(desc='task2')
>>> for i in range(100):
>>>     task1.update()
>>>     task2.update(2)
>>>     time.sleep(0.001)
>>> ProgressManager.stopall()
Example
>>> # Demo manual usage (progiter backend)
>>> from kwutil.util_progress import ProgressManager
>>> from kwutil import util_progress
>>> import time
>>> pman = ProgressManager(backend='progiter', adjust=0, freq=1)
>>> pman.start()
>>> task1 = pman.progiter(desc='task1', total=12)
>>> task2 = pman.progiter(desc='task2')
>>> task1.update()
>>> task2.update()
>>> for i in range(10):
>>>     time.sleep(0.001)
>>>     task1.update()
>>>     time.sleep(0.001)
>>>     task2.update(2)
>>> ProgressManager.stopall()
- ProgIter(*args, **kw)¶
- property _is_main_manager¶
- class kwutil.Superlock(lock_fpath=NoParam, thread_key=NoParam)[source]¶
Bases: object
A thread and/or process lock
The lockiest lock that ever did lock… or at least an attempt at it.
This is experimental and not well tested.
If lock_fpath is NoParam, uses a global shared process lock. If None, then no process lock is used.
If thread_key is NoParam, uses a global shared thread lock. If None, then no thread lock is used.
Otherwise, locks with the same lock_fpath OR thread_key will not execute concurrently, up to system limitations of the locking mechanisms.
Uses [Fasteners] for the process-based file-locks, which do have fundamental issues [OnFileLocks].
TODO: Evaluate [FileLock] as an alternative.
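The thread_key rule described above can be pictured with a small standard-library sketch. This is not Superlock's implementation: `get_keyed_lock`, `_LockHolder`, and the registry are hypothetical names, and the use of a WeakValueDictionary is only an assumption suggested by the THREAD_LOCKS attribute. Only the thread half is shown; the process half relies on file locks.

```python
import threading
import weakref


class _LockHolder:
    """Hypothetical wrapper so that the lock can be weakly referenced."""

    def __init__(self):
        self.lock = threading.RLock()


# Hypothetical registry: an entry vanishes once no caller holds its holder.
_REGISTRY = weakref.WeakValueDictionary()
_REGISTRY_GUARD = threading.Lock()


def get_keyed_lock(key):
    """Return the holder shared by every caller that uses the same key."""
    with _REGISTRY_GUARD:
        holder = _REGISTRY.get(key)
        if holder is None:
            holder = _LockHolder()
            _REGISTRY[key] = holder
        return holder


def run_demo(num_threads=4, steps=200):
    """Increment a shared counter from several threads under one key."""
    counter = {'value': 0}

    def work():
        holder = get_keyed_lock('demo-key')
        for _ in range(steps):
            with holder.lock:
                counter['value'] += 1

    threads = [threading.Thread(target=work) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter['value']
```

Callers that pass the same key receive the same lock object, so their critical sections are serialized, while callers with different keys do not block each other.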
References
SeeAlso:
Example
>>> # xdoctest: +REQUIRES(module:fasteners)
>>> from kwutil.util_locks import Superlock
>>> self = Superlock()
>>> with self:
>>>     print('non-concurrent code')
Example
>>> # xdoctest: +REQUIRES(module:fasteners)
>>> from kwutil.util_locks import *  # NOQA
>>> import ubelt as ub
>>> lock1 = Superlock()
>>> lock2 = Superlock()
>>> assert lock1.acquire(timeout=10)
>>> assert not lock2.acquire(timeout=0.01)
>>> lock1.release()
>>> assert lock2.acquire()
>>> lock2.release()
Example
>>> # Demonstrate a real world case with thread locks
>>> # xdoctest: +REQUIRES(module:fasteners)
>>> import time
>>> from kwutil.util_locks import Superlock
>>> import ubelt as ub
>>> #
>>> shared_counter = []
>>> #
>>> def task(i):
...     with Superlock():
...         # simulate work inside critical section
...         current_len = len(shared_counter)
...         print(f'current_len={current_len}')
...         time.sleep(0.05)
...         shared_counter.append(i)
...         # ensure no concurrent execution by checking counter length did not change during sleep
...         assert len(shared_counter) == current_len + 1
...     return i
>>> #
>>> with ub.Executor(mode='thread', max_workers=4) as executor:
...     results = list(executor.map(task, range(8)))
>>> #
>>> sorted(results) == list(range(8))
True
>>> len(shared_counter) == 8
True
Example
>>> # Demonstrate a real world case with process locks
>>> # xdoctest: +REQUIRES(module:fasteners)
>>> # xdoctest: +SKIP('xdoctest does not support pickled functions yet')
>>> import time
>>> import ubelt as ub
>>> from pathlib import Path
>>> #
>>> dpath = ub.Path.appdir('kwutil/tests/superlock').ensuredir()
>>> counter_fpath = dpath / 'shared_counter.txt'
>>> counter_fpath.write_text('0')
>>> #
>>> def task(i):
...     import time
...     from pathlib import Path
...     from kwutil.util_locks import Superlock
...     lock = Superlock()
...     counter_fpath = Path(ub.Path.appdir('kwutil/tests/superlock') / 'shared_counter.txt')
...     with lock:
...         current = int(counter_fpath.read_text())
...         time.sleep(0.05)  # simulate some work
...         counter_fpath.write_text(str(current + 1))
...     return i
>>> #
>>> with ub.Executor(mode='process', max_workers=4) as executor:
...     results = list(executor.map(task, range(8)))
>>> #
>>> sorted(results) == list(range(8))
True
>>> final_value = int(counter_fpath.read_text())
>>> final_value == 8
True
- GLOBAL_APPNAME = 'fasteners_ext/file_locks'¶
- GLOBAL_LOCK_FNAME = 'superlock.lock'¶
- GLOBAL_THREAD_KEY = '__GLOBAL_THREAD_LOCK__'¶
- THREAD_LOCKS = <WeakValueDictionary>¶
- property global_lock_fpath¶
- class kwutil.XML[source]¶
Bases: object
A thin wrapper around [xmltodict].
References
Example
>>> # xdoctest: +REQUIRES(module:xmltodict)
>>> from kwutil.util_xml import *  # NOQA
>>> import ubelt as ub
>>> text = ub.codeblock(
    '''
    <mydocument has="an attribute">
      <and>
        <many>elements</many>
        <many>more elements</many>
      </and>
      <plus a="complex">
        element as well
      </plus>
    </mydocument>
    ''')
>>> data = XML.loads(text)
>>> print(f'data = {ub.urepr(data, nl=-1)}')
>>> recon = XML.dumps(data, pretty=True)
>>> print(recon)
- class kwutil.Yaml[source]¶
Bases: object
Namespace for yaml functions
Example
>>> # xdoctest: +REQUIRES(module:ruamel.yaml)
>>> from kwutil.util_yaml import Yaml
>>> import ubelt as ub
>>> data = {
>>>     'a': 'hello world',
>>>     'b': ub.udict({'a': 3})
>>> }
>>> text1 = Yaml.dumps(data, backend='ruamel')
>>> # Coerce is idempotent and resolves the input to nested Python
>>> # structures.
>>> resolved1 = Yaml.coerce(data)
>>> resolved2 = Yaml.coerce(text1)
>>> resolved3 = Yaml.coerce(resolved2)
>>> assert resolved1 == resolved2 == resolved3 == data
>>> # with ruamel
>>> data2 = Yaml.loads(text1)
>>> assert data2 == data
>>> # with pyyaml
>>> data2 = Yaml.loads(text1, backend='pyyaml')
>>> assert data2 == data
- static Dict(data)[source]¶
Get a ruamel-enhanced dictionary
Example
>>> # xdoctest: +REQUIRES(module:pyyaml)
>>> # xdoctest: +REQUIRES(module:ruamel.yaml)
>>> data = {'a': 'avalue', 'b': 'bvalue'}
>>> data = Yaml.Dict(data)
>>> data.yaml_set_start_comment('hello')
>>> # Note: not working https://sourceforge.net/p/ruamel-yaml/tickets/400/
>>> data.yaml_set_comment_before_after_key('a', before='a comment', indent=2)
>>> data.yaml_set_comment_before_after_key('b', 'b comment')
>>> print(Yaml.dumps(data))
- static coerce(data, backend='ruamel', path_policy='existing_file_with_extension')[source]¶
Attempt to convert input into a parsed yaml / json data structure. If the data looks like a path, it tries to load and parse file contents. If the data looks like a yaml/json string it tries to parse it. If the data looks like parsed data, then it returns it as-is.
- Parameters:
data (str | PathLike | dict | list)
backend (str) – either ruamel or pyyaml
path_policy (str) – Controls how the function decides whether the input looks like a path. The pre-0.3.2 behavior corresponds to path_policy=‘existing_file’. The default is ‘existing_file_with_extension’. Use ‘never’ to disable the path feature and reduce ambiguity.
- Returns:
parsed yaml data
- Return type:
Note
The function cannot tell apart a string that names a file to load and a string that should itself be parsed. If the string looks like a file that exists, it is read. To avoid this corner case, use this function only for data where you expect the output to be a List or Dict.
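The ambiguity between "path to load" and "text to parse" can be illustrated with a standard-library sketch. The helper `looks_like_path` is hypothetical and its rules are only an assumed reading of the three path_policy values named above; the actual checks in Yaml.coerce may differ.

```python
import os
import tempfile
from pathlib import Path


def looks_like_path(data, path_policy='existing_file_with_extension'):
    """Hypothetical helper: decide whether ``data`` should be read as a file."""
    if path_policy == 'never':
        return False
    if isinstance(data, os.PathLike):
        return True
    if not isinstance(data, str) or '\n' in data:
        return False
    try:
        exists = Path(data).is_file()
    except (OSError, ValueError):
        exists = False
    if path_policy == 'existing_file':
        return exists
    if path_policy == 'existing_file_with_extension':
        # Assumed rule: a bare name without a suffix is treated as text.
        return exists and Path(data).suffix != ''
    raise KeyError(path_policy)


def demo():
    """Compare the policies on a file with and without an extension."""
    with tempfile.TemporaryDirectory() as dpath:
        bare = Path(dpath) / 'items'
        named = Path(dpath) / 'items.yaml'
        bare.write_text('[1, 2]')
        named.write_text('[1, 2]')
        return {
            'bare/existing_file': looks_like_path(str(bare), 'existing_file'),
            'bare/default': looks_like_path(str(bare)),
            'named/default': looks_like_path(str(named)),
            'named/never': looks_like_path(str(named), 'never'),
            'text/default': looks_like_path('{a: b, c: d}'),
        }
```

Under these assumed rules, a string that happens to match an extensionless file name is parsed as text by default, which is the ambiguity the stricter default policy is meant to reduce.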
References
https://stackoverflow.com/questions/528281/how-can-i-include-a-yaml-file-inside-another
Example
>>> # xdoctest: +REQUIRES(module:pyyaml)
>>> # xdoctest: +REQUIRES(module:ruamel.yaml)
>>> from kwutil.util_yaml import Yaml
>>> import ubelt as ub
>>> text = ub.codeblock(
    '''
    - !!float nan
    - !!float inf
    - nan
    - inf
    # Seems to break older ruamel.yaml 0.17.21
    # - .nan
    # - .inf
    - null
    ''')
>>> Yaml.coerce(text, backend='pyyaml')
>>> Yaml.coerce(text, backend='ruamel')
Example
>>> # xdoctest: +REQUIRES(module:pyyaml)
>>> # xdoctest: +REQUIRES(module:ruamel.yaml)
>>> Yaml.coerce('"[1, 2, 3]"')
[1, 2, 3]
>>> fpath = ub.Path.appdir('cmd_queue/tests/util_yaml').ensuredir() / 'file.yaml'
>>> fpath.write_text(Yaml.dumps([4, 5, 6]))
>>> Yaml.coerce(fpath)
[4, 5, 6]
>>> Yaml.coerce(str(fpath))
[4, 5, 6]
>>> dict(Yaml.coerce('{a: b, c: d}'))
{'a': 'b', 'c': 'd'}
>>> Yaml.coerce(None)
None
Example
>>> # xdoctest: +REQUIRES(module:pyyaml)
>>> # xdoctest: +REQUIRES(module:ruamel.yaml)
>>> assert Yaml.coerce('') is None
Example
>>> # xdoctest: +REQUIRES(module:pyyaml)
>>> # xdoctest: +REQUIRES(module:ruamel.yaml)
>>> dpath = ub.Path.appdir('cmd_queue/tests/util_yaml').ensuredir()
>>> fpath = dpath / 'external.yaml'
>>> fpath.write_text(Yaml.dumps({'foo': 'bar'}))
>>> text = ub.codeblock(
>>>     f'''
>>>     items:
>>>         - !include {dpath}/external.yaml
>>>     ''')
>>> data = Yaml.coerce(text, backend='ruamel')
>>> print(Yaml.dumps(data, backend='ruamel'))
items:
- foo: bar
>>> text = ub.codeblock(
>>>     f'''
>>>     items:
>>>         !include [{dpath}/external.yaml, blah, 1, 2, 3]
>>>     ''')
>>> data = Yaml.coerce(text, backend='ruamel')
>>> print('data = {}'.format(ub.urepr(data, nl=1)))
>>> print(Yaml.dumps(data, backend='ruamel'))
- static dumps(data, backend='ruamel', version=None)[source]¶
Dump yaml to a string representation (and account for some of our use-cases)
- Parameters:
data (Any) – yaml representable data
backend (str) – either ruamel or pyyaml
version (str) – version of YAML spec to use. (e.g. ‘1.1’)
- Returns:
yaml text
- Return type:
Example
>>> # xdoctest: +REQUIRES(module:pyyaml)
>>> # xdoctest: +REQUIRES(module:ruamel.yaml)
>>> import ubelt as ub
>>> data = {
>>>     'a': 'hello world',
>>>     'b': ub.udict({'a': 3})
>>> }
>>> text2 = Yaml.dumps(data, backend='pyyaml')
>>> print(text2)
>>> text1 = Yaml.dumps(data, backend='ruamel')
>>> print(text1)
>>> assert text1 == text2
>>> print(Yaml.dumps({'key': 'on'}, backend='ruamel', version='1.1'))
%YAML 1.1
---
key: 'on'
- static load(file, backend='ruamel', version=None)[source]¶
Load yaml from a file
- Parameters:
file (io.TextIOBase | PathLike | str) – yaml file path or file object
backend (str) – either ruamel or pyyaml
- Returns:
object
Example
>>> # xdoctest: +REQUIRES(module:pyyaml)
>>> # xdoctest: +REQUIRES(module:ruamel.yaml)
>>> from kwutil.util_yaml import Yaml
>>> import ubelt as ub
>>> data = {
>>>     'a': 'hello world',
>>>     'b': ub.udict({'a': 3})
>>> }
>>> text1 = Yaml.dumps(data, backend='ruamel')
>>> import io
>>> # with ruamel
>>> file = io.StringIO(text1)
>>> data2 = Yaml.load(file)
>>> assert data2 == data
>>> # with pyyaml
>>> file = io.StringIO(text1)
>>> data2 = Yaml.load(file, backend='pyyaml')
>>> assert data2 == data
- static loads(text, backend='ruamel', version=None)[source]¶
Load yaml from a text
- Parameters:
text (str) – yaml text
backend (str) – either ruamel or pyyaml
- Returns:
object
Example
>>> # xdoctest: +REQUIRES(module:pyyaml)
>>> # xdoctest: +REQUIRES(module:ruamel.yaml)
>>> import ubelt as ub
>>> data = {
>>>     'a': 'hello world',
>>>     'b': ub.udict({'a': 3})
>>> }
>>> print('data = {}'.format(ub.urepr(data, nl=1)))
>>> print('---')
>>> text = Yaml.dumps(data)
>>> print(ub.highlight_code(text, 'yaml'))
>>> print('---')
>>> data2 = Yaml.loads(text)
>>> assert data == data2
>>> data3 = Yaml.loads(text, backend='pyyaml')
>>> print('data2 = {}'.format(ub.urepr(data2, nl=1)))
>>> print('data3 = {}'.format(ub.urepr(data3, nl=1)))
>>> assert data == data3
- kwutil.coerce_num_workers(num_workers='auto', minimum=0)[source]¶
Return some number of CPUs based on a chosen heuristic
- Parameters:
num_workers (int | str) – A special string code, or an exact number of cpus
minimum (int) – minimum workers we are allowed to return
- Returns:
number of available cpus based on request parameters
- Return type:
CommandLine
xdoctest -m kwutil.util_parallel coerce_num_workers
Example
>>> # xdoctest: +REQUIRES(module:psutil)
>>> from kwutil.util_parallel import *  # NOQA
>>> print(coerce_num_workers('all'))
>>> print(coerce_num_workers('avail'))
>>> print(coerce_num_workers('auto'))
>>> print(coerce_num_workers('all-2'))
>>> print(coerce_num_workers('avail-2'))
>>> print(coerce_num_workers('all/2'))
>>> print(coerce_num_workers('min(all,2)'))
>>> #print(coerce_num_workers('[max(all,2)][0]'))
>>> import pytest
>>> with pytest.raises(Exception):
>>>     print(coerce_num_workers('all + 1' + (' + 1' * 100)))
>>> total_cpus = coerce_num_workers('all')
>>> assert coerce_num_workers('all-2') == max(total_cpus - 2, 0)
>>> assert coerce_num_workers('all-100') == max(total_cpus - 100, 0)
>>> assert coerce_num_workers('avail') <= coerce_num_workers('all')
>>> assert coerce_num_workers(3) == max(3, 0)
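The string codes in the example above behave like small arithmetic expressions over a CPU count, clamped to the minimum. The sketch below mimics that with the standard library only. It is an illustration under assumptions, not the kwutil implementation: `resolve_workers` and its `total` parameter are hypothetical, only the name `all` is supported, and codes such as `avail`, `auto`, or `min(...)` are left out.

```python
import ast
import operator
import os

# Binary operators this sketch is willing to evaluate.
_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.FloorDiv: operator.floordiv,
}


def _eval(node, names):
    """Evaluate a tiny arithmetic AST; reject everything else."""
    if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
        return node.value
    if isinstance(node, ast.Name) and node.id in names:
        return names[node.id]
    if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
        left = _eval(node.left, names)
        right = _eval(node.right, names)
        return _OPS[type(node.op)](left, right)
    raise ValueError(f'unsupported expression part: {type(node).__name__}')


def resolve_workers(spec, minimum=0, total=None):
    """Hypothetical stand-in: turn an int or an 'all'-based code into a count."""
    if total is None:
        total = os.cpu_count() or 1
    if isinstance(spec, int):
        value = spec
    else:
        tree = ast.parse(spec, mode='eval')
        value = int(_eval(tree.body, {'all': total}))
    return max(value, minimum)
```

For instance, with `total=8` the code `'all-2'` resolves to 6 and `'all-100'` is clamped to the minimum of 0, mirroring the assertions in the example above.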
- kwutil.ensure_rng(rng=None, api='python')[source]¶
Coerces input into a random number generator.
This function is useful for ensuring that your code uses a controlled internal random state that is independent of other modules.
If the input is None, then a global random state is returned.
If the input is a numeric value, then that is used as a seed to construct a random state.
If the input is a random number generator, then a random number generator with the same state is returned. Depending on the api, this random state is either returned as-is, or used to construct an equivalent random state with the requested api.
- Parameters:
rng (int | float | None | numpy.random.RandomState | random.Random) – if None, then defaults to the global rng. Otherwise this can be a numeric seed or an existing random number generator.
api (str) – specify the type of random number generator to use. This can either be ‘numpy’ for a numpy.random.RandomState object or ‘python’ for a random.Random object. Defaults to ‘python’.
- Returns:
rng - either a numpy or python random number generator, depending on the setting of api.
- Return type:
Example
>>> # xdoctest: +REQUIRES(module:numpy)
>>> from kwutil.util_random import *  # NOQA
>>> from kwutil.util_random import ensure_rng
>>> rng = ensure_rng(None)
>>> ensure_rng(0, 'python').randint(0, 1000)
864
>>> # xdoctest: +REQUIRES(module:numpy)
>>> import numpy as np
>>> ensure_rng(np.random.RandomState(1)).randint(0, 1000)
427
Example
>>> from kwutil.util_random import *  # NOQA
>>> from kwutil.util_random import ensure_rng
>>> num = 4
>>> print('--- Python as PYTHON ---')
>>> py_rng = random.Random(0)
>>> pp_nums = [py_rng.random() for _ in range(num)]
>>> print(pp_nums)
>>> print('--- Numpy as PYTHON ---')
>>> # xdoctest: +REQUIRES(module:numpy)
>>> import numpy as np
>>> np_rng = ensure_rng(random.Random(0), api='numpy')
>>> np_nums = [np_rng.rand() for _ in range(num)]
>>> print(np_nums)
>>> print('--- Numpy as NUMPY---')
>>> np_rng = np.random.RandomState(seed=0)
>>> nn_nums = [np_rng.rand() for _ in range(num)]
>>> print(nn_nums)
>>> print('--- Python as NUMPY---')
>>> py_rng = ensure_rng(np.random.RandomState(seed=0), api='python')
>>> pn_nums = [py_rng.random() for _ in range(num)]
>>> print(pn_nums)
>>> assert np_nums == pp_nums
>>> assert pn_nums == nn_nums
Example
>>> # Test that random modules can be coerced
>>> # xdoctest: +REQUIRES(module:numpy)
>>> from kwutil.util_random import *  # NOQA
>>> import random
>>> import numpy as np
>>> ensure_rng(random, api='python')
>>> ensure_rng(random, api='numpy')
>>> ensure_rng(np.random, api='python')
>>> ensure_rng(np.random, api='numpy')
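The None / seed / generator cases described for ensure_rng can be sketched with the standard library alone. The function `coerce_python_rng` and the shared `_SHARED_RNG` instance are hypothetical stand-ins for the ‘python’ api only; conversion between numpy and python states is not attempted here.

```python
import random

# Hypothetical stand-in for a process-wide random state.
_SHARED_RNG = random.Random()


def coerce_python_rng(rng=None):
    """Return a random.Random for None, a numeric seed, or an existing generator."""
    if rng is None:
        # No request: hand back the shared state.
        return _SHARED_RNG
    if isinstance(rng, random.Random):
        # Already a generator with the requested api: return it as-is.
        return rng
    if isinstance(rng, (int, float)):
        # Numeric input is used as a seed for a fresh, independent state.
        return random.Random(rng)
    raise TypeError(f'cannot coerce {type(rng).__name__} into a random.Random')
```

Seeding through a function like this keeps a caller's random stream reproducible and independent of whatever other modules do to the global state.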
- kwutil.envflag(key, default=None, environ=None)[source]¶
Determine if an environment variable is specified and truthy or falsy.
- Parameters:
key (str) – the environment variable name to check
default (Any) – the default value to return if the environment variable is not specified.
environ (None | Dict) – Uses this to get the environment variable. If unspecified, defaults to
os.environ.
- Returns:
True if the environment variable exists and matches a truthy pattern (e.g. true, on, yes, 1, or t). Otherwise returns False.
Note
This will return false on any setting of the environ that is not truthy. (e.g. YESPLEASE is not a registered TRUTHY_ENVIRON so it will return False).
Example
>>> from kwutil import util_environ
>>> environ = {
>>>     'foo': '1',
>>>     'bar': 'YES',
>>>     'baz': '0',
>>>     'biz': '1111',
>>> }
>>> assert util_environ.envflag('foo', 0, environ=environ)
>>> assert util_environ.envflag('bar', 0, environ=environ)
>>> assert not util_environ.envflag('baz', 0, environ=environ)
>>> assert not util_environ.envflag('biz', 0, environ=environ)
>>> assert not util_environ.envflag('buzz', 0, environ=environ)
>>> assert util_environ.envflag('buzz', 1, environ=environ)
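The matching rule can be mirrored in a few lines of plain Python. This is a sketch under assumptions: `flag` and `TRUTHY` are hypothetical names, the set contains only the patterns listed above, and case-insensitive matching is inferred from 'YES' being accepted in the example.

```python
import os

# Assumed truthy patterns, taken from the list in the description above.
TRUTHY = {'true', 'on', 'yes', '1', 't'}


def flag(key, default=None, environ=None):
    """Hypothetical stand-in: True only if the variable is set to a truthy pattern."""
    if environ is None:
        environ = os.environ
    value = environ.get(key)
    if value is None:
        # Unset variable: fall back to the caller's default.
        return default
    # Any value outside the registered patterns counts as False.
    return value.strip().lower() in TRUTHY
```

Passing an explicit `environ` mapping, as the example above does, keeps such checks testable without touching the real process environment.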
- kwutil.safeeval(expression, context={}, safenodes=None, addnodes=None, funcs=None, attrs=None)[source]¶
C-style simplified wrapper, eval() replacement.
- Parameters:
expression (str) – the expression to evaluate
context (dict) – Optional dictionary of variables to make available during evaluation.
safenodes (List[str] | None) – Specify the name of allowed AST nodes, if unspecified a default list is used.
addnodes (List[str] | None) – List of additional AST node names to allow in addition to safenodes.
funcs (List[str]) – list of allowed function names.
attrs (List[str]) – list of allowed attribute names.
- Returns:
the result of the expression
- Return type:
Any
- Raises:
ExecutionException - if the expression fails to execute
CompilationException - if the expression fails to parse
ValidationException - if the expression fails safety checks
Example
>>> from kwutil.util_eval import safeeval
>>> safeeval('3 + 2')
5
>>> safeeval('max(3, 2)', addnodes=['Call'], funcs=['max'])
3
>>> safeeval('x * 2', context={'x': 5})
10
>>> safeeval('len(lst)', context={'lst': [1, 2, 3]}, addnodes=['Call'], funcs=['len'])
3
>>> safeeval('obj.value', context={'obj': type("O", (), {"value": 42})()}, addnodes=['Attribute'], attrs=['value'])
42
>>> import pytest
>>> with pytest.raises(ValidationException):
...     safeeval('exec("import os")')
>>> with pytest.raises(ValidationException):
...     safeeval('os.system("ls")', context={'os': __import__('os')}, addnodes=['Call', 'Attribute'], funcs=[])
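The safenodes / addnodes idea is an allow-list over AST node names: the expression is parsed, every node type is checked, and only then is the code evaluated. The sketch below shows that general technique with the standard `ast` module. `checked_eval` and `SAFE_NODES` are hypothetical, the default node list is an assumption, and the per-name checks for funcs and attrs are omitted, so this is not a substitute for safeeval.

```python
import ast

# Assumed default allow-list; the real default list may differ.
SAFE_NODES = {
    'Expression', 'BinOp', 'UnaryOp', 'Add', 'Sub', 'Mult', 'Div',
    'USub', 'Constant', 'Name', 'Load',
}


def checked_eval(expression, context=None, addnodes=()):
    """Evaluate ``expression`` only if every AST node is on the allow-list."""
    allowed = SAFE_NODES | set(addnodes)
    tree = ast.parse(expression, mode='eval')
    for node in ast.walk(tree):
        name = type(node).__name__
        if name not in allowed:
            raise ValueError(f'disallowed AST node: {name}')
    code = compile(tree, '<checked_eval>', 'eval')
    # Builtins are emptied so that only names from ``context`` resolve.
    return eval(code, {'__builtins__': {}}, dict(context or {}))
```

With this sketch, `checked_eval('x * 2', {'x': 5})` evaluates normally, while `checked_eval('exec("import os")')` is rejected during validation because `Call` is not on the allow-list.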