# Source code for kwutil.util_dotdict

"""
Utilities for dictionaries where dots in keys represent nestings

Related Work:
    https://pypi.org/project/python-benedict/

    https://pypi.org/project/jmespath/

    https://pypi.org/project/jmespath/

    https://pypi.org/project/dotmap/

    https://pypi.org/project/glom/

    https://pypi.org/project/dpath/

    https://pypi.org/project/python-box/

    https://pypi.org/project/omegaconf/
"""
import ubelt as ub
import weakref
from functools import cached_property


class DotDict(ub.UDict):
    """
    A flat dictionary representation of a nested structure.

    Keys are strings in which each "." separates one nesting level from the
    next. This provides some similar functionality to benedict.

    Example:
        >>> from kwutil.util_dotdict import *  # NOQA
        >>> self = DotDict({
        >>>     'proc1.param1': 1,
        >>>     'proc1.param2': 2,
        >>>     'proc2.param1': 3,
        >>>     'proc2.param2': 4,
        >>>     'proc3.param1': 5,
        >>>     'proc3.param2': 6,
        >>>     'proc4.part1.param1': 7,
        >>>     'proc4.part1.param2': 8,
        >>>     'proc4.part2.param2': 9,
        >>> })
        >>> self.prefix_subdict('proc4', strip=True)
        {'part1.param1': 7, 'part1.param2': 8, 'part2.param2': 9}
        >>> nested = self.to_nested()
        >>> recon = DotDict.from_nested(nested)
        >>> assert nested != self
        >>> assert recon == self
    """

    # def __init__(self, /, *args, **kwargs):
    #     super().__init__(*args, **kwargs)

    @cached_property
    def _trie_cache(self):
        """
        Lazily constructed :class:`_TrieCache` bound to this dictionary.
        """
        # The prefix tree backend allows us to efficiently query items as if
        # the flat data structure was nested.
        # FIXME: we do not invalidate the cache on mutations yet.
        # NOTE: using the trie backend is almost always slower than a loop,
        # theoretically it should be faster, but we need a better
        # implementation
        return _TrieCache(self)

    @classmethod
    def from_nested(cls, data, prefix=None):
        """
        Construct this flat representation from a nested one

        Only dictionaries are descended into. Any other value (including a
        list) is stored as a leaf. A nested dictionary that is empty
        contributes no keys to the result.

        Args:
            data (Dict): nested data

            prefix (str | None):
                if given (and non-empty), it is prepended, followed by a
                ".", to every generated key.

        Returns:
            DotDict

        Example:
            >>> data = {
            >>>     'type': 'process',
            >>>     'properties': {
            >>>         'machine': {
            >>>             'os_name': 'Linux',
            >>>             'arch': 'x86_64',
            >>>             'py_impl': 'CPython',
            >>>         }
            >>>     }
            >>> }
            >>> from kwutil.util_dotdict import DotDict
            >>> flat = DotDict.from_nested(data)
            >>> print(f'flat = {ub.urepr(flat, nl=2)}')
            flat = {
                'type': 'process',
                'properties.machine.os_name': 'Linux',
                'properties.machine.arch': 'x86_64',
                'properties.machine.py_impl': 'CPython',
            }

        Example:
            >>> # By default lists are not flattened, only contiguous nested
            >>> # dicts
            >>> data = {
            >>>     'type': 'measure',
            >>>     'results': [
            >>>         {'score': 1},
            >>>         {'score': 2},
            >>>         {'score': 3},
            >>>     ]}
            >>> from kwutil.util_dotdict import DotDict
            >>> flat = DotDict.from_nested(data, prefix='my')
            >>> print(f'flat = {ub.urepr(flat, nl=2, sort=1)}')
            flat = {
                'my.results': [
                    {'score': 1},
                    {'score': 2},
                    {'score': 3},
                ],
                'my.type': 'measure',
            }
        """
        # flat = cls()
        # walker = ub.IndexableWalker(data, list_cls=tuple())
        # for path, value in walker:
        #     if not isinstance(value, dict):
        #         spath = list(map(str, path))
        #         key = '.'.join(spath)
        #         flat[key] = value
        ###
        # Slightly faster than the above implementation
        flat = cls()
        # Depth-first traversal with an explicit stack of (dotted path, node).
        stack = [(prefix if prefix else "", data)]
        while stack:
            path, node = stack.pop()
            if isinstance(node, dict):
                # Push children reversed so they pop in insertion order.
                for k, v in reversed(node.items()):
                    key = f"{path}.{k}" if path else str(k)
                    stack.append((key, v))
            else:
                flat[path] = node
        return flat

    def to_nested(self):
        """
        Converts this flat DotDict into a nested representation. I.e. keys are
        broken using the "." separator, with each separator becoming a new
        nesting level.

        Returns:
            dict: the nested dictionary

        Example:
            >>> from kwutil.util_dotdict import *  # NOQA
            >>> self = DotDict(**{
            >>>     'foo.bar.baz': 1,
            >>>     'foo.bar.biz': 1,
            >>>     'foo.spam': 1,
            >>>     'eggs.spam': 1,
            >>> })
            >>> nested = self.to_nested()
            >>> print(f'nested = {ub.urepr(nested, nl=2)}')
            nested = {
                'foo': {
                    'bar': {'baz': 1, 'biz': 1},
                    'spam': 1,
                },
                'eggs': {
                    'spam': 1,
                },
            }
        """
        auto = ub.AutoDict()
        # Note: this could be much more efficient if we build the tree
        # incrementally by splitting all keys, and then grouping based on
        # prefix, and then popping them off as we descend down the tree. This
        # would prevent repeated walking down the tree with IndexableWalker's
        # setitem.
        walker = ub.IndexableWalker(auto)
        for k, v in self.items():
            path = k.split('.')
            walker[path] = v
        return auto.to_dict()

    def suffix_subdict(self, suffix, backend='loop'):
        """
        Retrieve all key-value pairs whose keys end with a given dot-suffix.

        A key matches when it is exactly equal to a suffix or ends with
        ``'.' + suffix``.

        Args:
            suffix (str | List[str]):
                one or more dot-separated suffix strings

            backend (str): 'trie' or 'loop'

        Returns:
            DotDict: subdictionary containing the suffix

        Raises:
            KeyError: if ``backend`` is not 'loop' or 'trie'

        Example:
            >>> from kwutil.util_dotdict import *  # NOQA
            >>> self = DotDict({
            >>>     'a.b.c': 1,
            >>>     'x.b.c': 2,
            >>>     'z.y': 3,
            >>>     'z.c': 4,
            >>> })
            >>> self.suffix_subdict('b.c')
            {'a.b.c': 1, 'x.b.c': 2}
            >>> self.suffix_subdict('c')
            {'a.b.c': 1, 'x.b.c': 2, 'z.c': 4}

        Example:
            >>> from kwutil.util_dotdict import *  # NOQA
            >>> self = DotDict({
            >>>     'proc1.param1': 1,
            >>>     'proc2.param1': 2,
            >>>     'proc3.param2': 3,
            >>>     'proc4.part1.param1': 4,
            >>>     'proc4.part2.param2': 5,
            >>> })
            >>> new = self.suffix_subdict(['param1', 'part2.param2'])
            >>> print(f'new = {ub.urepr(new, nl=1, sort=1)}')
            new = {
                'proc1.param1': 1,
                'proc2.param1': 2,
                'proc4.part1.param1': 4,
                'proc4.part2.param2': 5,
            }
            >>> alt1 = self.suffix_subdict(['param1', 'part2.param2'], backend='loop')
            >>> # xdoctest: +REQUIRES(module:pygtrie)
            >>> alt2 = self.suffix_subdict(['param1', 'part2.param2'], backend='trie')
            >>> assert alt1 == alt2
        """
        if backend == 'loop':
            cls = self.__class__
            if isinstance(suffix, (list, tuple, set)):
                exact = set(suffix)
                # str.endswith accepts a tuple, so all suffixes are checked
                # in a single call.
                dot_suffixes = tuple('.' + s for s in suffix)
                matches = cls({
                    k: v for k, v in self.items()
                    if k.endswith(dot_suffixes) or k in exact
                })
            else:
                dot_suffix = '.' + suffix
                matches = cls({
                    k: v for k, v in self.items()
                    if k.endswith(dot_suffix) or k == suffix
                })
        elif backend == 'trie':
            matches = self._trie_cache.subdict_suffix(suffix)
        else:
            raise KeyError(f'Unknown backend={backend}')
        return matches

    @staticmethod
    def _prefix_stripper(prefix):
        """
        Build a function that removes a matched dot-prefix from a key.

        Args:
            prefix (str | List[str]): the prefix / prefixes being selected

        Returns:
            Callable[[str], str]: maps a selected key to its stripped form
        """
        if isinstance(prefix, (list, tuple, set)):
            exact = set(prefix)
            # Longest first so the most specific prefix is the one stripped.
            dot_prefixes = sorted(
                (p + '.' for p in prefix), key=len, reverse=True)

            def strip_key(key):
                if key in exact:
                    return ''
                for dot_prefix in dot_prefixes:
                    if key.startswith(dot_prefix):
                        return key[len(dot_prefix):]
                return key
        else:
            dot_prefix = prefix + '.'
            n = len(dot_prefix)

            def strip_key(key):
                return key[n:] if key.startswith(dot_prefix) else key
        return strip_key

    def prefix_subdict(self, prefix, strip=False, backend='loop'):
        """
        Retrieve all key-value pairs whose keys start with a given dot-prefix.

        A key matches when it is exactly equal to a prefix or starts with
        ``prefix + '.'``.

        Args:
            prefix (str | List[str]):
                select a subdictionary with this prefix / prefixes.

            strip (bool):
                if True, strip the prefix. If multiple prefixes are given,
                the longest matching prefix is stripped and results may be
                inconsistent if new keys have duplicates. This is honored
                by both backends.

            backend (str): 'trie' or 'loop'

        Returns:
            DotDict: subdictionary containing the prefix

        Raises:
            KeyError: if ``backend`` is not 'loop' or 'trie'

        Note:
            With ``strip=True`` a key that is exactly equal to the prefix is
            left unchanged when a single prefix string is given, but becomes
            the empty string when a list of prefixes is given.

        Example:
            >>> from kwutil.util_dotdict import *  # NOQA
            >>> self = DotDict(**{
            >>>     'foo.bar.baz': 1,
            >>>     'foo.bar.biz': 1,
            >>>     'foo.spam': 1,
            >>>     'eggs.spam': 1,
            >>> })
            >>> self.prefix_subdict('foo', strip=True)
            {'bar.baz': 1, 'bar.biz': 1, 'spam': 1}
            >>> self.prefix_subdict('foo', strip=False)
            {'foo.bar.baz': 1, 'foo.bar.biz': 1, 'foo.spam': 1}

        Example:
            >>> from kwutil.util_dotdict import *  # NOQA
            >>> self = DotDict({
            >>>     'proc1.param1': 1,
            >>>     'proc1.param2': 2,
            >>>     'proc2.param1': 3,
            >>>     'proc3.param2': 4,
            >>>     'proc4.part1.param1': 5,
            >>>     'proc4.part2.param2': 6,
            >>> })
            >>> new = self.prefix_subdict(['proc1', 'proc4.part1'])
            >>> print(f'new = {ub.urepr(new, nl=1, sort=1)}')
            new = {
                'proc1.param1': 1,
                'proc1.param2': 2,
                'proc4.part1.param1': 5,
            }
            >>> alt1 = self.prefix_subdict(['proc1', 'proc4.part1'], backend='loop')
            >>> # xdoctest: +REQUIRES(module:pygtrie)
            >>> alt2 = self.prefix_subdict(['proc1', 'proc4.part1'], backend='trie')
            >>> assert alt1 == alt2
            >>> alt3 = self.prefix_subdict('proc4', strip=True, backend='loop')
            >>> alt4 = self.prefix_subdict('proc4', strip=True, backend='trie')
            >>> assert alt3 == alt4
        """
        if backend == 'loop':
            if isinstance(prefix, (list, tuple, set)):
                exact = set(prefix)
                # str.startswith accepts a tuple, so all prefixes are
                # checked in a single call.
                dot_prefixes = tuple(p + '.' for p in prefix)
                items = [
                    (k, v) for k, v in self.items()
                    if k in exact or k.startswith(dot_prefixes)
                ]
            else:
                dot_prefix = prefix + '.'
                items = [
                    (k, v) for k, v in self.items()
                    if k.startswith(dot_prefix) or k == prefix
                ]
        elif backend == 'trie':
            matches = self._trie_cache.subdict_prefix(prefix)
            if not strip:
                return matches
            items = matches.items()
        else:
            raise KeyError(f'Unknown backend={backend}')

        cls = self.__class__
        if strip:
            # Stripping is shared by both backends so they agree.
            strip_key = self._prefix_stripper(prefix)
            return cls((strip_key(k), v) for k, v in items)
        return cls(items)

    def insert_prefix(self, prefix, index=0):
        """
        Adds a prefix to all items

        Args:
            prefix (str): prefix to insert

            index (int): the depth to insert the new param

        Returns:
            DotDict: a new dictionary with the modified keys

        Example:
            >>> from kwutil.util_dotdict import *  # NOQA
            >>> self = DotDict({
            >>>     'proc1.param1': 1,
            >>>     'proc2.param1': 3,
            >>>     'proc4.part2.param2': 10,
            >>> })
            >>> new = self.insert_prefix('foo', index=0)
            >>> print('new = {}'.format(ub.urepr(new, nl=1)))
            new = {
                'foo.proc1.param1': 1,
                'foo.proc2.param1': 3,
                'foo.proc4.part2.param2': 10,
            }
        """
        if index == 0:
            # Faster special case
            new = self.__class__(
                (f'{prefix}.{k}', v) for k, v in self.items())
        else:
            def _generate_new_items():
                sep = '.'
                for k, v in self.items():
                    path = k.split(sep)
                    path.insert(index, prefix)
                    k2 = sep.join(path)
                    yield k2, v
            new = self.__class__(_generate_new_items())
        return new

    def query_keys(self, col):
        """
        Finds columns where one level has this key

        Args:
            col (str): a single path component to search for

        Yields:
            str: each key that has ``col`` as one of its dot-separated parts

        Example:
            >>> from kwutil.util_dotdict import *  # NOQA
            >>> self = DotDict({
            >>>     'proc1.param1': 1,
            >>>     'proc1.param2': 2,
            >>>     'proc2.param1': 3,
            >>>     'proc4.part1.param2': 8,
            >>>     'proc4.part2.param2': 10,
            >>> })
            >>> list(self.query_keys('param1'))
            ['proc1.param1', 'proc2.param1']

        Ignore:
            could use _trie_iteritems
            trie = self._prefix_trie
        """
        for key in self.keys():
            if col in key.split('.'):
                yield key
# Disabled DotDict methods, kept for reference.
#
# def __contains__(self, key):
#     if super().__contains__(key):
#         return True
#     else:
#         subkeys = []
#         subkeys.extend(self._prefix_trie.values(key))
#         return bool(subkeys)

# def get(self, key, default=ub.NoParam):
#     if default is ub.NoParam:
#         return self[key]
#     else:
#         try:
#             return self[key]
#         except KeyError:
#             return default

# def __getitem__(self, key):
#     try:
#         return super().__getitem__(key)
#     except KeyError:
#         subkeys = []
#         subkeys.extend(self._prefix_trie.values(key))
#         return self.__class__([(k, self[k]) for k in subkeys])


class _TrieCache:
    """
    Helper to maintain fast nested lookup caches with prefix trees.

    Benchmarks seem to indicate that this isn't helpful.

    The tries are built lazily from the parent's keys the first time they
    are requested and are only rebuilt after :func:`clear` is called.
    """

    def __init__(self, parent):
        # A weak proxy, so the cache does not hold a strong reference to
        # the dictionary it was built for.
        self.parent = weakref.proxy(parent)
        self._prefix_tree = None
        self._suffix_tree = None

    def clear(self):
        """
        Drop both tries so they are rebuilt on next access.
        """
        self._prefix_tree = None
        self._suffix_tree = None

    @property
    def prefix_tree(self):
        """
        Trie mapping each parent key to itself (built on first access).
        """
        if self._prefix_tree is None:
            import pygtrie
            self._prefix_tree = pygtrie.StringTrie(
                ub.dzip(self.parent.keys(), self.parent.keys()),
                separator='.')
        return self._prefix_tree

    @property
    def suffix_tree(self):
        """
        Trie mapping each component-reversed parent key to the original key
        (built on first access).
        """
        if self._suffix_tree is None:
            import pygtrie
            # Reversing the components turns a suffix query into a prefix
            # query on the reversed key.
            reversed_keys = {
                '.'.join(reversed(k.split('.'))): k
                for k in self.parent.keys()
            }
            self._suffix_tree = pygtrie.StringTrie(
                reversed_keys, separator='.')
        return self._suffix_tree

    def prefix_keys(self, prefix):
        """
        Return keys that match this prefix
        """
        try:
            return self.prefix_tree.values(prefix)
        except KeyError:
            # The lookup raises KeyError when nothing matches.
            return []

    def suffix_keys(self, suffix):
        """
        Return keys that match this suffix
        """
        reversed_suffix = '.'.join(reversed(suffix.split('.')))
        try:
            return self.suffix_tree.values(reversed_suffix)
        except KeyError:
            # The lookup raises KeyError when nothing matches.
            return []

    def subdict_prefix(self, prefix, default=ub.NoParam):
        """
        Return the parent's subdictionary of keys matching the prefix(es).

        Args:
            prefix (str | List[str]): one or more dot-separated prefixes
            default: forwarded to the parent's ``subdict`` method
        """
        if isinstance(prefix, (list, tuple, set)):
            keys = []
            for p in prefix:
                keys.extend(self.prefix_keys(p))
        else:
            keys = self.prefix_keys(prefix)
        return self.parent.subdict(keys, default=default)

    def subdict_suffix(self, suffix, default=ub.NoParam):
        """
        Return the parent's subdictionary of keys matching the suffix(es).

        Args:
            suffix (str | List[str]): one or more dot-separated suffixes
            default: forwarded to the parent's ``subdict`` method
        """
        if isinstance(suffix, (list, tuple, set)):
            keys = []
            for s in suffix:
                keys.extend(self.suffix_keys(s))
        else:
            keys = self.suffix_keys(suffix)
        return self.parent.subdict(keys, default=default)


def _run_benchmark():
    """
    Developer benchmark comparing the 'loop' and 'trie' backends, and the
    geowatch implementation, on a randomly generated DotDict.

    Requires the optional ``timerit``, ``pygtrie`` and ``geowatch`` packages.
    """
    import string
    import random
    size = 300
    depth = 100

    def random_dotkey(depth, chars=string.ascii_lowercase, seglen=1):
        return '.'.join(
            ''.join(random.choices(chars, k=seglen))
            for _ in range(depth)
        )

    def generate_random_dotdict(size, depth):
        """
        Generate a DotDict with random dot-path keys. Each key is composed of
        `depth` segments.

        Args:
            size (int): number of key-value pairs
            depth (int): number of dot-separated segments in keys

        Returns:
            DotDict
        """
        keys = set()
        while len(keys) < size:
            keys.add(random_dotkey(depth))
        data = {k: random.random() for k in keys}
        self = DotDict(data)
        return self

    self = generate_random_dotdict(size, depth)

    from geowatch.utils.util_dotdict import DotDict as DotDictOrig
    orig = DotDictOrig(self)

    import timerit
    ti = timerit.Timerit(1000, bestof=10, verbose=2)
    # DotDict has no ``suffix_get`` method; the suffix query is
    # ``suffix_subdict``.
    for timer in ti.reset('suffix-get-loop'):
        with timer:
            result1 = self.suffix_subdict('a', backend='loop')

    for timer in ti.reset('suffix-get-trie'):
        with timer:
            result2 = self.suffix_subdict('a', backend='trie')

    # NOTE(review): ``suffix_get`` is kept for the geowatch class because its
    # API is not visible here -- confirm it still exists.
    for timer in ti.reset('geowatch-suffix-get-trie'):
        with timer:
            result3 = orig.suffix_get('a', backend='trie')

    assert result2 == result1
    assert result3 == result1

    ti = timerit.Timerit(10000, bestof=10, verbose=2)
    for timer in ti.reset('prefix-get-loop'):
        with timer:
            result1 = self.prefix_subdict('a', backend='loop')

    for timer in ti.reset('prefix-get-trie'):
        with timer:
            result2 = self.prefix_subdict('a', backend='trie')

    for timer in ti.reset('geowatch-prefix-get-trie'):
        with timer:
            result3 = orig.prefix_subdict('a')