"""
Defines the :class:`ProcessContext` object, which is what mlops expects jobs to
be wrapped in.
TODO:
- [ ] Make "most" telemetry opt-in
"""
import sys
import os
import ubelt as ub
from kwutil import util_environ
PROCESS_CONTEXT_DISABLE_ALL_TELEMETRY = util_environ.envflag('PROCESS_CONTEXT_DISABLE_ALL_TELEMETRY', default=False)
PROCESS_CONTEXT_DISABLE_MOST_TELEMETRY = util_environ.envflag('PROCESS_CONTEXT_DISABLE_MOST_TELEMETRY', default=False)
[docs]
class ProcessContext:
"""
Context manager to track the context under which a result was computed.
This tracks things like start / end time. The command line that can
reproduce the process (assuming an appropriate environment. The
configuration the process was run with. The machine details the process was
run on. The power usage / carbon emissions the process used, and other
information.
Args:
args (str | List[str]):
This should be the sys.argv or the command line string that can be
used to rerun the process
config (Dict):
This should be a configuration dictionary (likely based on
sys.argv)
name (str): the name of this process
type (str): The type of this process
(usually keep the default of process)
request_all_telemetry (bool):
if False, telemetry is disabled. This is forced to False if
PROCESS_CONTEXT_DISABLE_MOST_TELEMETRY is in the environment.
request_most_telemetry (bool):
if False, telemetry is disabled. This is forced to False if
PROCESS_CONTEXT_DISABLE_ALL_TELEMETRY is in the environment.
Note:
This module provides telemetry, which records user-identifiable
information. While useful, it does raise ethical concerns about user
privacy, and the people running this code have a right to know about it
and opt out. Notably, this module simply records the information, but
does not send it anywhere. As such, a default opt-in is reasonable, but
any future work that sends this information anywhere must be opt-out by
default.
Note:
There are two levels of telemetry.
Environment telemetry. These are things like the machine the code was
run on. Use PROCESS_CONTEXT_DISABLE_MOST_TELEMETRY=0 to opt-out.
The start / stop / sys.argv / config objects are necessary for mlops to
do anything. But these can leak information by containing system paths.
Emissions is also in this category. Use
PROCESS_CONTEXT_DISABLE_ALL_TELEMETRY to opt out.
CommandLine:
kernprof -lvp kwutil -m xdoctest -m kwutil.process_context ProcessContext:0
Example:
>>> # xdoctest: +REQUIRES(module:psutil)
>>> from kwutil.process_context import *
>>> import rich
>>> # Adding things like disk info an tracking emission usage
>>> self = ProcessContext(track_emissions='offline')
>>> obj1 = self.start().stop()
>>> self.add_disk_info('.')
>>> #
>>> # Telemetry can be mostly disabled
>>> self = ProcessContext(track_emissions='offline', request_most_telemetry=False)
>>> obj2 = self.start().stop()
>>> self.add_disk_info('.')
>>> # Telemetry can be completely disabled
>>> self = ProcessContext(track_emissions='offline', request_all_telemetry=False)
>>> obj3 = self.start().stop()
>>> self.add_disk_info('.')
>>> rich.print('full_telemetry = {}'.format(ub.urepr(obj1, nl=3)))
>>> rich.print('some_telemetry = {}'.format(ub.urepr(obj2, nl=3)))
>>> rich.print('no_telemetry = {}'.format(ub.urepr(obj3, nl=3)))
Example:
>>> # xdoctest: +REQUIRES(module:psutil)
>>> # xdoctest: +REQUIRES(module:codecarbon)
>>> from kwutil.process_context import *
>>> # flush can measure intermediate progress
>>> self = ProcessContext(track_emissions='offline')
>>> self.add_disk_info('.')
>>> obj1 = self.start().flush()
>>> obj1_orig = obj1.copy()
>>> obj2 = self.stop()
"""
def __init__(self,
name=None,
type='process',
args=None,
config=None,
extra=None,
track_emissions=False,
request_all_telemetry=True,
request_most_telemetry=True,
output_dpath=None,
output_fpath=None,
):
import uuid
if args is None:
args = sys.argv
else:
import warnings
warnings.warn(ub.paragraph(
'''
It is better to leave args unspecified so sys.argv is captured.
Be sure to specify ``config`` as the resolved config.
In the future we may add an extra param for unresolved configs.
'''))
self.properties = {
"name": name,
"args": args,
"config": config,
"machine": None,
"start_timestamp": None,
"stop_timestamp": None,
"duration": None,
"uuid": str(uuid.uuid4()),
"extra": extra,
}
self.obj = {
"type": type,
"properties": self.properties
}
self.track_emissions = track_emissions
self.emissions_tracker = None
self._emission_backend = 'auto'
self._started = False
self._running = False
if PROCESS_CONTEXT_DISABLE_ALL_TELEMETRY:
request_all_telemetry = 0
else:
self.enable_all_telemetry = request_all_telemetry
if PROCESS_CONTEXT_DISABLE_MOST_TELEMETRY:
request_most_telemetry = 0
else:
self.enable_most_telemetry = request_most_telemetry
if not self.enable_all_telemetry:
self.enable_most_telemetry = 0
self.properties.pop('config')
self.properties.pop('args')
self.output_dpath = output_dpath
self.output_fpath = output_fpath
[docs]
def _infer_static_properties(self, func):
props = self.properties
if props['name'] is None:
try:
modname = func.__module__
except AttributeError:
try:
modname = func.__class__.__module__
except AttributeError:
modname = None
if modname is not None:
try:
namespace = ub.modpath_to_modname(sys.modules[modname].__file__)
except AttributeError:
namespace = sys.modules[modname].__name__
if modname == '__main__':
namespace = f'{namespace}.__main__'
props['name'] = f'{namespace}.{func.__name__}'
else:
props['name'] = func.__name__
if self.output_dpath is None:
self.output_dpath = ub.Path('.')
if self.output_fpath is None:
self.output_fpath = ub.Path(self.output_dpath) / '{name}-{uuid}.context'.format(**props)
[docs]
def _infer_dynamic_properties(self, func, args, kwargs):
import kwutil
props = self.properties
jsonified = kwutil.Json.ensure_serializable(kwargs)
props['config'] = jsonified
self.add_disk_info('.')
try:
import torch
device = torch.device(0) if torch.cuda.is_available() else torch.device('cpu')
self.add_device_info(device)
except Exception:
...
@property
def is_running(self):
"""
Has the context object started and not yet been stopped?
"""
return self._running
@property
def is_started(self):
"""
Has the context object ever started? This can still return True if it
has stopped.
"""
return self._started
[docs]
def dump(self):
import json
text = json.dumps(self.obj)
self.output_fpath.parent.ensuredir()
self.output_fpath.write_text(text)
[docs]
def __call__(self, func):
"""
Experimental use as a decorator.
CommandLine:
kernprof -lvp -p kwutil -m xdoctest -m kwutil.process_context ProcessContext.__call__
Example:
>>> # xdoctest: +REQUIRES(module:psutil)
>>> import ubelt as ub
>>> dpath = ub.Path.appdir('kwutil/test/process-context')
>>> #
>>> import kwutil
>>> self = kwutil.ProcessContext(output_dpath=dpath)
>>> def func():
>>> ...
>>> _wrapper = self(func)
>>> _wrapper.context
>>> _wrapper()
Example:
>>> # xdoctest: +REQUIRES(module:psutil)
>>> import kwutil
>>> import ubelt as ub
>>> dpath = ub.Path.appdir('kwutil/test/process-context')
>>> @kwutil.ProcessContext(output_dpath=dpath)
>>> def myfunc():
>>> ...
>>> myfunc()
>>> print(f'myfunc.context.obj = {ub.urepr(myfunc.context.obj, nl=3)}')
"""
import functools
self._infer_static_properties(func)
@functools.wraps(func)
def _wrapper(*args, **kwargs):
self._infer_dynamic_properties(func, args, kwargs)
self.start()
try:
result = func(*args, **kwargs)
finally:
self.stop()
try:
self.dump()
except Exception as ex:
print('Failed to write context')
print(f'ex={ex}')
return result
_wrapper.context = self
return _wrapper
[docs]
def write_invocation(self, invocation_fpath):
"""
Write a helper file that contains a locally reproducible invocation of
this process.
"""
import shlex
command = ' '.join(list(map(shlex.quote, self.properties['args'])))
invocation_fpath = ub.Path(invocation_fpath)
invocation_fpath.write_text(ub.codeblock(
f'''
#!/bin/bash
{command}
'''))
[docs]
def _timestamp(self):
timestamp = ub.timestamp()
return timestamp
[docs]
def _hostinfo(self):
import socket
if not self.enable_most_telemetry:
return {}
return {
"host": socket.gethostname(),
"user": ub.Path.home().name,
'cwd': os.fspath(ub.Path.cwd()),
"userhome": os.fspath(ub.Path.home()),
}
[docs]
def _osinfo(self):
import platform
if not self.enable_most_telemetry:
return {}
(
uname_system,
_,
uname_release,
uname_version,
_,
uname_processor,
) = platform.uname()
return {
"os_name": uname_system,
"os_release": uname_release,
"os_version": uname_version,
"arch": uname_processor,
}
[docs]
def _pyinfo(self):
import platform
if not self.enable_most_telemetry:
return {}
return {
"py_impl": platform.python_implementation(),
"py_version": sys.version.replace("\n", ""),
}
[docs]
def _meminfo(self):
if not self.enable_most_telemetry:
return {}
import psutil
# TODO: could collect memory info at start and stop and intermediate
# stages. Here we just want info that is static wrt to the machine.
# For now, just get the total available.
svmem_info = psutil.virtual_memory()
return {
"mem_total": svmem_info.total,
}
[docs]
def _cpuinfo(self):
if not self.enable_most_telemetry:
return {}
try:
# Calling `get_cpu_info` is very slow because it starts a new
# python process, so to avoid some overhead we cache the result
# It would be nice if Cacher had the expires property.
import ubelt as ub
cacher = ub.Cacher('cpuinfo_cache', appname='kwutil/cache',
verbose=0)
_cpu_info = cacher.tryload()
if _cpu_info is None:
import cpuinfo
_cpu_info = cpuinfo.get_cpu_info()
cacher.save(_cpu_info)
cpu_info = {
"cpu_brand": _cpu_info["brand_raw"],
"cpu_count": _cpu_info["count"],
}
except ImportError as ex:
cpu_info = {
'error': f'ImportError: {ex}',
"cpu_brand": None,
"cpu_count": None,
}
return cpu_info
[docs]
def _gpuinfo(self):
try:
import torch
num_devices = torch.cuda.device_count()
device_infos = []
for device_num in num_devices:
device = torch.device(device_num)
info = self._device_info(device)
device_infos.append(info)
# except ImportError as ex:
except ImportError:
device_infos = []
# f'ImportError: {ex}'
return device_infos
[docs]
def _machine(self):
if not self.enable_most_telemetry:
return {'telemetry_disabled': True}
return ub.dict_union(
self._hostinfo(),
self._meminfo(),
self._cpuinfo(),
self._osinfo(),
self._pyinfo(),
)
[docs]
def start(self):
self._started = True
if not self.enable_all_telemetry:
return self
self._running = True
self.properties.update({
"machine": self._machine(),
"start_timestamp": self._timestamp(),
"stop_timestamp": None,
})
if self.track_emissions:
self._start_emissions_tracker()
return self
[docs]
def flush(self):
if not self._started:
raise Exception("Must start before you flush")
if self.enable_all_telemetry:
self.properties["stop_timestamp"] = self._timestamp()
start_time = ub.timeparse(self.properties["start_timestamp"])
stop_time = ub.timeparse(self.properties["stop_timestamp"])
self.properties["duration"] = str(stop_time - start_time)
if self.emissions_tracker is not None:
try:
self._flush_emissions_tracker()
except Exception as ex:
print(f'warning: issue with emissions ex={ex}')
return self.obj
[docs]
def stop(self):
if not self._started:
raise Exception("Must start before you stop")
if self.enable_all_telemetry:
self.properties["stop_timestamp"] = self._timestamp()
start_time = ub.timeparse(self.properties["start_timestamp"])
stop_time = ub.timeparse(self.properties["stop_timestamp"])
self.properties["duration"] = str(stop_time - start_time)
if self.emissions_tracker is not None:
try:
self._stop_emissions_tracker()
except Exception as ex:
print(f'warning: issue with emissions ex={ex}')
self._running = False
return self.obj
def __enter__(self):
return self.start()
def __exit__(self, a, b, c):
self.stop()
[docs]
def _start_emissions_tracker(self):
if not self.enable_all_telemetry:
return
emissions_tracker = None
if isinstance(self.track_emissions, str):
backend = self.track_emissions
elif self.track_emissions:
backend = 'auto'
if backend == 'auto':
backend = 'online'
if backend == 'online':
try:
from codecarbon import EmissionsTracker
"""
# emissions_tracker = EmissionsTracker(log_level='info')
"""
try:
emissions_tracker = EmissionsTracker(log_level='error', allow_multiple_runs=True)
except Exception:
emissions_tracker = EmissionsTracker(log_level='error')
emissions_tracker.start()
except Exception as ex:
print('ex = {}'.format(ub.urepr(ex, nl=1)))
print('Online emissions tracker is not available. Trying offline')
if self._emission_backend == 'auto':
backend = 'offline'
if backend == 'offline':
try:
# TODO: allow configuration
from codecarbon import OfflineEmissionsTracker
try:
emissions_tracker = OfflineEmissionsTracker(
country_iso_code='USA',
log_level='error',
# region='Virginia',
# cloud_provider='aws',
# cloud_region='us-east-1',
# country_2letter_iso_code='us'
allow_multiple_runs=True
)
except Exception:
emissions_tracker = OfflineEmissionsTracker(
country_iso_code='USA',
log_level='error',
# region='Virginia',
# cloud_provider='aws',
# cloud_region='us-east-1',
# country_2letter_iso_code='us'
)
emissions_tracker.start()
except Exception as ex:
print('Non-Critical Warning: Unable to track carbon emissions ex = {!r}'.format(ex))
self.emissions_tracker = emissions_tracker
[docs]
def _flush_emissions_tracker(self):
if self.emissions_tracker is None:
self.properties['emissions'] = None
return
self.emissions_tracker._measure_power_and_energy()
summary = emissions_data = self.emissions_tracker._prepare_emissions_data()
self.emissions_tracker._persist_data(emissions_data)
co2_kg = summary.emissions
total_kWH = summary.energy_consumed
# summary.cloud_provider
# summary.cloud_region
# summary.duration
# summary.emissions_rate
# summary.cpu_power
# summary.gpu_power
# summary.ram_power
# summary.cpu_energy
# summary.gpu_energy
# summary.ram_energy
emissions = {
'co2_kg': co2_kg,
'total_kWH': total_kWH,
'run_id': str(self.emissions_tracker.run_id),
}
try:
import pint
except Exception as ex:
print('Error stopping emissions tracker: ex = {!r}'.format(ex))
else:
reg = pint.UnitRegistry()
if co2_kg is None:
co2_kg = float('nan')
co2_ton = (co2_kg * reg.kg).to(reg.metric_ton)
dollar_per_ton = 15 / reg.metric_ton # cotap rate
emissions['co2_ton'] = co2_ton.m
emissions['est_dollar_to_offset'] = (co2_ton * dollar_per_ton).m
self.properties['emissions'] = emissions
[docs]
def _stop_emissions_tracker(self):
if self.emissions_tracker is None:
self.properties['emissions'] = None
return
self.emissions_tracker.stop()
summary = self.emissions_tracker.final_emissions_data
co2_kg = summary.emissions
total_kWH = summary.energy_consumed
# summary.cloud_provider
# summary.cloud_region
# summary.duration
# summary.emissions_rate
# summary.cpu_power
# summary.gpu_power
# summary.ram_power
# summary.cpu_energy
# summary.gpu_energy
# summary.ram_energy
emissions = {
'co2_kg': co2_kg,
'total_kWH': total_kWH,
'run_id': str(self.emissions_tracker.run_id),
}
try:
import pint
except Exception as ex:
print('Error stopping emissions tracker: ex = {!r}'.format(ex))
else:
reg = pint.UnitRegistry()
if co2_kg is None:
co2_kg = float('nan')
co2_ton = (co2_kg * reg.kg).to(reg.metric_ton)
dollar_per_ton = 15 / reg.metric_ton # cotap rate
emissions['co2_ton'] = co2_ton.m
emissions['est_dollar_to_offset'] = (co2_ton * dollar_per_ton).m
self.properties['emissions'] = emissions
[docs]
def _device_info(self, device):
import torch
try:
device_info = {
'device_index': device.index,
'device_type': device.type,
}
try:
device_props = torch.cuda.get_device_properties(device)
capabilities = (device_props.multi_processor_count, device_props.minor)
device_info.update({
'device_name': device_props.name,
'total_vram': device_props.total_memory,
'reserved_vram': torch.cuda.memory_reserved(device),
'allocated_vram': torch.cuda.memory_allocated(device),
'device_capabilities': capabilities,
'device_multi_processor_count': device_props.multi_processor_count,
})
except Exception:
pass
except Exception as ex:
print('Error adding device info: ex = {!r}'.format(ex))
device_info = str(ex)
return device_info
[docs]
def add_device_info(self, device):
"""
Add information about a torch device that was used in this process.
Does nothing if telemetry is disabled.
Args:
device (torch.device): torch device to add info about
Example:
>>> # xdoctest: +REQUIRES(module:torch)
>>> from kwutil.process_context import *
>>> import torch
>>> import rich
>>> device = torch.device(0) if torch.cuda.is_available() else torch.device('cpu')
>>> # Adding things like disk info an tracking emission usage
>>> self = ProcessContext(track_emissions='offline')
>>> obj1 = self.start().stop()
>>> self.add_disk_info('.')
>>> self.add_device_info(device)
>>> #
>>> # Telemetry can be mostly disabled
>>> self = ProcessContext(track_emissions='offline', request_most_telemetry=False)
>>> obj2 = self.start().stop()
>>> self.add_disk_info('.')
>>> self.add_device_info(device)
>>> # Telemetry can be completely disabled
>>> self = ProcessContext(track_emissions='offline', request_all_telemetry=False)
>>> obj3 = self.start().stop()
>>> self.add_disk_info('.')
>>> self.add_device_info(device)
>>> rich.print('full_telemetry = {}'.format(ub.urepr(obj1, nl=3)))
>>> rich.print('some_telemetry = {}'.format(ub.urepr(obj2, nl=3)))
>>> rich.print('no_telemetry = {}'.format(ub.urepr(obj3, nl=3)))
"""
if not self.enable_most_telemetry:
return
self.properties['device_info'] = self._device_info(device)
[docs]
def add_disk_info(self, path):
"""
Add information about a storage disk that was used in this process
Does nothing if telemetry is disabled.
"""
if not self.enable_most_telemetry:
return
try:
from kwutil import util_hardware
# Get information about disk used in this process
disk_info = util_hardware.disk_info_of_path(path)
except Exception as ex:
print('ex = {!r}'.format(ex))
print('ex = {!r}'.format(ex))
disk_info = str(ex)
self.properties['disk_info'] = disk_info
# def _test_offline():
# """
# xdoctest -m kwutil.process_context ProcessContext
# """
# from codecarbon import OfflineEmissionsTracker
# emissions_tracker = OfflineEmissionsTracker(
# country_iso_code='USA',
# # region='Virginia',
# region='virginia',
# cloud_provider='aws',
# cloud_region='us-east-1',
# log_level='info',
# # country_2letter_iso_code='us'
# )
# emissions_tracker.start()
# emissions_tracker.stop()
# from codecarbon import EmissionsTracker
# emissions_tracker = EmissionsTracker(log_level='debug')
# emissions_tracker.start()
# emissions_tracker.stop()
# from codecarbon.external.geography import CloudMetadata, GeoMetadata
# geo = GeoMetadata(
# country_iso_code="USA", country_name="United States", region="Illinois"
# )
# self = ProcessContext(track_emissions=True)
# self.start()
# self.stop()
# _ = self.emissions_tracker._data_source.get_country_emissions_data('usa')
[docs]
def jsonify_config(config):
"""
Converts an object to a jsonifiable config as best as possible
"""
from kwutil.util_json import Json
if hasattr(config, 'asdict'):
config = config.asdict()
jsonified_config = Json.ensure_serializable(config)
walker = ub.IndexableWalker(jsonified_config)
for problem in Json.find_unserializable(jsonified_config):
bad_data = problem['data']
walker[problem['loc']] = str(bad_data)
return jsonified_config
[docs]
class Reconstruction:
# TODO
...
[docs]
def main():
"""
Simple CLI to get hardware measurements that process context would provide.
"""
# Adding things like disk info an tracking emission usage
self = ProcessContext(track_emissions=False)
obj = self.start().stop()
self.add_disk_info('.')
try:
import torch
if torch.cuda.is_available():
device = torch.device(0)
self.add_device_info(device)
except ImportError:
...
self.stop()
print('obj = {}'.format(ub.urepr(obj, nl=3)))
# def _codecarbon_mwe():
# from codecarbon import OfflineEmissionsTracker
# self = OfflineEmissionsTracker(
# country_iso_code='USA',
# # cloud_provider='gcp',
# # region='us-east-1',
# # country_2letter_iso_code='us'
# )
# self.start()
# self.flush()
# emissions_data = self._prepare_emissions_data()
# cloud = self._get_cloud_metadata()
# df = self._data_source.get_cloud_emissions_data()
if __name__ == '__main__':
"""
CommandLine:
python -m kwutil.process_context
"""
main()