"""
Helpers to get information about the available hardware. Request info about
things like cpus, gpus, memory, and disk.
Backed by external modules:
* :mod:`cpuinfo`.
* :mod:`psutil`.
* and various Linux CLI calls via :func:`ubelt.cmd`.
"""
import ubelt as ub
# Used in: ./process_context.py
[docs]
def get_cpu_mem_info():
    """
    Gather CPU and memory information into a single dictionary.

    Returns:
        dict: with key ``cpu_info`` holding the result of
            :func:`get_cpu_info` and key ``mem_info`` holding the result of
            :func:`get_mem_info` (called with its default arguments).
    """
    # The CPU query runs first, then the memory query.
    return {
        'cpu_info': get_cpu_info(),
        'mem_info': get_mem_info(),
    }
[docs]
def get_cpu_info():
    """
    Query CPU details using the external :mod:`cpuinfo` module.

    Returns:
        The value returned by ``cpuinfo.get_cpu_info()``, unmodified.
    """
    # Imported lazily and aliased so it does not shadow this function's name.
    from cpuinfo import get_cpu_info as _query_cpuinfo
    return _query_cpuinfo()
[docs]
def get_mem_info(with_units=False):
    """
    Query system-wide memory statistics via ``psutil.virtual_memory``.

    Memory info is returned in bytes.

    Args:
        with_units (bool):
            If truthy, the byte-valued fields that are present in the result
            are replaced by unit-bearing quantities converted to gigabytes,
            using the registry from ``kwutil.util_units.unit_registry()``.
            Defaults to False.

    Returns:
        dict: maps each field name of the psutil result to its value.

    TODO:
        - [ ] Should we use pint to give these numbers units?

    References:
        https://psutil.readthedocs.io/en/latest/#psutil.virtual_memory

    Example:
        >>> # xdoctest: +REQUIRES(module:psutil)
        >>> from kwutil import util_hardware
        >>> import ubelt as ub
        >>> mem_info = util_hardware.get_mem_info(with_units=1)
        >>> print(f'mem_info = {ub.urepr(mem_info, nl=1)}')

    Ignore:
        # Scratch notes; these are not executed as part of the example.
        total = mem_info['used']
        tmp = (
            # mem_info['cached'] +
            # mem_info['buffers'] +
            # mem_info['inactive'] +
            mem_info['slab'])
        green = total - tmp
        print(f'green={green}')
        percent = ((green / mem_info['total']) * 100).m
        print(f'percent={percent}')

        # Memory used by this process and its children
        this_process = psutil.Process(os.getpid())
        this_total_pcnt = this_process.memory_percent()
        for child in this_process.children():
            this_total_pcnt += child.memory_percent()

        # Memory explicitly used by userland processes
        total_vms = 0
        total_pcnt = 0
        for proc in psutil.process_iter():
            total_pcnt += proc.memory_percent()
            total_vms += proc.memory_info().vms
    """
    import psutil
    svmem_info = psutil.virtual_memory()
    # The psutil result is a named tuple; pair each field name with its value.
    mem_info = dict(zip(svmem_info._fields, svmem_info))

    if with_units:
        from kwutil import util_units
        ureg = util_units.unit_registry()
        # Fields that hold a byte count. Which of them exist varies, so only
        # the ones actually present are converted.
        bytes_keys = [
            'total', 'available', 'used', 'free', 'active', 'inactive',
            'buffers', 'cached', 'shared', 'slab', 'wired',
        ]
        for key in bytes_keys:
            if key in mem_info:
                mem_info[key] = (mem_info[key] * ureg.bytes).to('gigabytes')
    return mem_info
[docs]
def disk_info_of_path(path):
    """
    Get disk info wrt where a file lives

    WIP - needs more work

    Args:
        path (str | PathLike): a path on the filesystem of interest. It is
            resolved before being passed to ``df``.

    CommandLine:
        xdoctest -m kwutil.util_hardware disk_info_of_path

    Returns:
        dict - dictionary of information. Always contains ``path``,
            ``source`` and ``filesystem``. ``hwtype`` and ``names`` are added
            on a best-effort basis when they can be determined.

    Raises:
        Exception: if the ``df`` call fails (it is run with ``check=True``)
            or its output does not have the expected two columns.

    Example:
        >>> from kwutil.util_hardware import *  # NOQA
        >>> path = '.'
        >>> x = disk_info_of_path(path)
        >>> import ubelt as ub
        >>> print(ub.urepr(x))

    TODO:
        - [ ] Handle btrfs
        - [ ] Handle whatever AWS uses
        - [ ] Use udisksctl or udevadm

    Ignore:
        lsblk /dev/nvme1n1
        lsblk -afs /dev/mapper/vgubuntu-root
        lsblk -nasd /dev/mapper/vgubuntu-root
        df $HOME --output=source,fstype
        df $HOME/data/dvc-repos/smart_watch_dvc --output=source,fstype
        df $HOME/data/dvc-repos/smart_watch_dvc-hdd --output=source,fstype
        df . --output=source,fstype,itotal,iused,iavail,ipcent,size,used,avail,pcent,file,target

    References:
        https://askubuntu.com/questions/609708/how-to-find-hard-drive-brand-name-or-model
        https://stackoverflow.com/questions/38615464/how-to-get-device-name-on-which-a-file-is-located-from-its-path-in-c
    """
    import json
    import os
    import ubelt as ub
    path = ub.Path(path).resolve()
    path_str = os.fspath(path)

    # Returns the lvm name: e.g.
    # Filesystem                Type
    # data                      zfs
    # /dev/sde1                 ext4
    # /dev/md0                  ext4
    # /dev/mapper/vgubuntu-root ext4
    # /dev/nvme1n1              f2fs
    #
    # The command is given as an argv list so that a path containing
    # whitespace is passed to df as a single argument.
    info = ub.cmd(['df', path_str, '--output=source,fstype'], check=True)
    # Line 0 is the header; line 1 describes the filesystem of the path.
    row = info['out'].split('\n')[1]
    # The filesystem type is the last whitespace-delimited token; everything
    # before it is the source.
    source, filesystem = row.rsplit(None, 1)

    hwinfo = {
        'path': path_str,
        'source': source,
        'filesystem': filesystem,
    }

    if filesystem == 'zfs':
        # Use ZFS to get more information. (`zpool list` / `zpool iostat`
        # with -H are other candidates for future work.)
        try:
            # df reports the dataset ("pool/child/..."), but zpool commands
            # operate on the pool, which is the first component.
            pool = source.split('/', 1)[0]
            zfs_status = _zfs_status(pool)
            hwinfo['hwtype'] = zfs_status['coarse_type']
        except Exception as ex:
            print('error in zfs stuff: ex = {}'.format(ub.urepr(ex, nl=1)))
    elif filesystem == 'overlay':
        # This is the case on AWS. lsblk isnt able to provide us with more info
        # I'm not sure how to determine more info.
        # References:
        #     https://docs.kernel.org/filesystems/overlayfs.html
        ...
    else:
        try:
            hwinfo['hwtype'] = 'hdd' if _device_is_hdd(source) else 'ssd'
        except Exception as ex:
            print('warning: unable to infer disk info: ex = {}'.format(ub.urepr(ex, nl=1)))

        try:
            info = ub.cmd(['lsblk', '-as', source, '--json'])
            lsblk_info = json.loads(info['out'])
            # Collect the name of every device entry in the (possibly nested)
            # lsblk report.
            names = [
                item['name']
                for _walk_path, item in ub.IndexableWalker(lsblk_info)
                if isinstance(item, dict) and 'name' in item
            ]
            hwinfo['names'] = names
        except Exception:
            # Best effort: the device names are optional extra information.
            pass
    return hwinfo
[docs]
def _device_is_hdd(path):
    """
    Check if a block device (or any device beneath it) is rotational.

    Args:
        path (str | PathLike): device passed to ``lsblk``.

    Returns:
        bool: True if any entry in the ``lsblk -as`` report for the device
            has a truthy ``rota`` flag.

    Raises:
        Exception: if ``lsblk`` exits with a non-zero return code or its
            output is not valid JSON.
    """
    import json
    import ubelt as ub
    # Use an argv list so a device path with whitespace stays one argument.
    info = ub.cmd(['lsblk', '-as', str(path), '--json', '-o', 'name,rota'])
    info.check_returncode()
    lsblk_info = json.loads(info['out'])

    rotas = [
        item['rota']
        for _walk_path, item in ub.IndexableWalker(lsblk_info)
        if isinstance(item, dict) and 'rota' in item
    ]

    def _is_set(flag):
        # The flag may arrive as a JSON boolean, a number, or a string such
        # as "0" / "1". A plain truthiness test would treat the string "0" as
        # True, so normalize before testing.
        return str(flag).strip().lower() in {'1', 'true'}

    return any(_is_set(flag) for flag in rotas)
[docs]
def _zfs_status(pool, verbose=0):
    """
    Semi-parsable zfs status output. This is a proof-of-concept and needs some
    work to handle the nested pool structure.

    Args:
        pool (str): name of the pool passed to ``zpool status``.
        verbose (int): forwarded to :func:`ubelt.cmd`.

    Returns:
        dict: with keys

            * ``coarse_type``: ``'hdd'`` if every ``/dev`` member of the pool
              is rotational, ``'ssd'`` if none are, ``'mixed'`` otherwise,
              and ``'unknown'`` if no ``/dev`` members were found.
            * ``poc_config``: the parsed rows of the ``config:`` table, keyed
              by the name of each row at the first indentation level. Rows at
              deeper levels are stored flat in the ``children`` list of the
              most recent first-level row.

    Raises:
        KeyError: if ``pool`` does not appear in the parsed table.
        Exception: if ``zpool`` exits with a non-zero return code.
    """
    import re
    import ubelt as ub
    # -P requests full device paths. Flags are placed before the operand and
    # the command is an argv list so the pool name is never re-split.
    info = ub.cmd(['zpool', 'status', '-P', str(pool)], verbose=verbose)
    info.check_returncode()

    splitter = re.compile(r'\s+')
    config = ub.ddict(list)
    context = None   # the most recent first-level row
    state = None     # None -> 'CONFIG' -> 'NAME' -> None
    header = None    # column names of the config table
    # stack = []  todo

    for line in info.stdout.split('\n'):
        stripped = line.strip()
        if not stripped:
            continue
        indent_size = len(line) - len(line.lstrip())

        if state is None:
            # Skip everything until the config section begins.
            if stripped == 'config:':
                state = 'CONFIG'
        elif state == 'CONFIG':
            # Look for the table header line.
            parts = splitter.split(stripped)
            if parts[0] == 'NAME':
                state = 'NAME'
                header = parts
        elif state == 'NAME':
            if indent_size == 0:
                # An unindented line ends the table.
                state = None
                continue
            parts = splitter.split(stripped)
            # Pair columns positionally and tolerate rows with fewer or more
            # columns than the header, instead of failing on a length
            # mismatch.
            row = dict(zip(header, parts))
            if indent_size == 1:
                row['children'] = []
                context = config[parts[0]] = row
            elif context is not None:
                context['children'].append(row)

    if pool not in config:
        raise KeyError(
            'pool {!r} was not found in the zpool status output'.format(pool))

    # hack to get the data we currently need
    dev_paths = [
        row['NAME'] for row in config[pool]['children']
        if row['NAME'].startswith('/dev')
    ]
    is_part_hdd = [_device_is_hdd(dev_path) for dev_path in dev_paths]

    if not is_part_hdd:
        # all([]) is True, so without this guard an empty member list would
        # be reported as 'hdd'.
        coarse_type = 'unknown'
    elif all(is_part_hdd):
        coarse_type = 'hdd'
    elif not any(is_part_hdd):
        coarse_type = 'ssd'
    else:
        coarse_type = 'mixed'

    output = {
        'coarse_type': coarse_type,
        'poc_config': config,
    }
    return output
[docs]
def _torch_device_info(device):
    """
    Summarize a torch device as a plain dictionary.

    Args:
        device (torch.device): the device to describe.

    Returns:
        dict | str: a dictionary that always has ``device_index`` and
            ``device_type``. When the CUDA properties of the device can be
            queried it also has ``device_name``, ``total_vram``,
            ``reserved_vram``, ``allocated_vram``, ``device_capabilities``
            (the ``(major, minor)`` pair) and
            ``device_multi_processor_count``. If even the basic attributes
            cannot be read, the error is printed and its string form is
            returned instead of a dictionary.
    """
    import torch
    try:
        device_info = {
            'device_index': device.index,
            'device_type': device.type,
        }
        try:
            device_props = torch.cuda.get_device_properties(device)
            # The capability is the (major, minor) version pair; the
            # multiprocessor count is reported separately below.
            capabilities = (device_props.major, device_props.minor)
            device_info.update({
                'device_name': device_props.name,
                'total_vram': device_props.total_memory,
                'reserved_vram': torch.cuda.memory_reserved(device),
                'allocated_vram': torch.cuda.memory_allocated(device),
                'device_capabilities': capabilities,
                'device_multi_processor_count': device_props.multi_processor_count,
            })
        except Exception:
            # Best effort: if the CUDA queries fail, keep only the basic
            # index / type information.
            pass
    except Exception as ex:
        print('Error adding device info: ex = {!r}'.format(ex))
        device_info = str(ex)
    return device_info
# The following classes are experimental and we are avoiding any signature
# definition that would limit the use of these names in the future in order to
# iterate towards a good design. The initial constructor will be called
# _discover_v1
[docs]
class _HardwareComponent(ub.NiceRepr):
    """
    Base class for the hardware component containers in this module.

    Subclasses are expected to implement :func:`summary`.
    """

    def __nice__(self):
        """
        Render the summary of this component as YAML text.
        """
        import kwutil
        summary = self.summary()
        return kwutil.Yaml.dumps(summary)

    def summary(self):
        """
        Concise summary of most important information.

        Raises:
            NotImplementedError: always; this method is abstract.
        """
        raise NotImplementedError('abstract method')
[docs]
class CPUs(_HardwareComponent):
    """
    Storage container for info about system CPUs.
    """

    def __init__(self):
        # Filled in by _discover_v1 with the result of get_cpu_info.
        self._cpu_info = None

    def summary(self):
        """
        Concise summary of the CPU information.

        Returns:
            dict: ``brand`` and ``num_cores``, read from the ``brand_raw``
                and ``count`` entries of the stored CPU info.
        """
        info = self._cpu_info
        brand = info['brand_raw']
        num_cores = info['count']
        return {'brand': brand, 'num_cores': num_cores}

    @classmethod
    def _discover_v1(cls):
        """
        Create an instance populated via :func:`get_cpu_info`.
        """
        discovered = cls()
        discovered._cpu_info = get_cpu_info()
        return discovered
[docs]
class Memory(_HardwareComponent):
    """
    Storage container for info about system RAM.
    """

    def __init__(self):
        # Filled in by _discover_v1 with the result of get_mem_info.
        self._memory_info = None

    def summary(self):
        """
        Concise summary of the memory information.

        Returns:
            dict: ``total_gb`` is the magnitude of the stored ``total`` value
                after converting it from bytes to the ``gibibyte`` unit. The
                key name is kept as-is for backward compatibility.
        """
        # Obtain the registry the same way get_mem_info does, so both code
        # paths rely on the same accessor and the submodule is explicitly
        # imported.
        from kwutil import util_units
        ureg = util_units.unit_registry()
        total = (self._memory_info['total'] * ureg.bytes).to('gibibyte')
        return {
            'total_gb': total.m,
        }

    @classmethod
    def _discover_v1(cls):
        """
        Create an instance populated via :func:`get_mem_info`.
        """
        self = cls()
        self._memory_info = get_mem_info()
        return self
[docs]
class GPUs(_HardwareComponent):
    """
    Storage container for info about system GPUS.
    """

    def __init__(self):
        # All attributes read by summary are initialized here so that an
        # instance that has not gone through discovery is still well formed.
        self._num_gpus = None
        self._device_infos = None
        self._gpu_model_freq = None

    def summary(self):
        """
        Concise summary of the GPU information.

        Returns:
            dict: ``count`` is the number of devices and ``models`` is a
                histogram of device names.
        """
        return {
            'count': self._num_gpus,
            'models': self._gpu_model_freq,
        }

    @classmethod
    def _discover_v1(cls):
        """
        Create an instance populated by querying torch for CUDA devices.
        """
        import torch
        self = cls()
        num_gpus = torch.cuda.device_count()
        device_infos = [
            _torch_device_info(torch.device('cuda', idx))
            for idx in range(num_gpus)
        ]
        # _torch_device_info can return a dict without a device name, or an
        # error string, so fall back to a placeholder rather than failing.
        model_names = [
            info.get('device_name', 'unknown')
            if isinstance(info, dict) else 'unknown'
            for info in device_infos
        ]
        self._gpu_model_freq = ub.dict_hist(model_names)
        self._num_gpus = num_gpus
        self._device_infos = device_infos
        return self
[docs]
class Motherboard(_HardwareComponent):
    """
    Storage container for info about system motherboard.

    References:
        https://askubuntu.com/questions/179958/how-do-i-find-out-my-motherboard-model
    """

    def __init__(self):
        # Filled in by _discover_v1.
        self._dmi_info = None

    def summary(self):
        """
        Concise summary of the motherboard information.

        Returns:
            dict: ``name`` is the discovered ``board_name`` entry, or None
                if no such entry was found.
        """
        return {
            'name': self._dmi_info.get('board_name'),
        }

    @classmethod
    def _discover_v1(cls):
        """
        Create an instance populated from files readable without admin
        rights.
        """
        self = cls()
        # TODO: dmidecode variant, but requires sudo
        self._dmi_info = cls._discover_dmi_info_linux_non_admin()
        return self

    @staticmethod
    def _discover_dmi_info_linux_non_admin():
        """
        Try to discover board info without admin rights.

        Reads every ``board_*`` file in ``/sys/devices/virtual/dmi/id/``.

        Returns:
            dict: maps each file name to its text content with surrounding
                whitespace removed, or to the exception raised while trying
                to read it. Empty if the directory has no matching files.
        """
        import ubelt as ub
        dpath = ub.Path('/sys/devices/virtual/dmi/id/')
        dmi_info = {}
        for board_info_fpath in sorted(dpath.glob('board_*')):
            try:
                text = board_info_fpath.read_text()
            except OSError as ex:
                # Some entries may not be readable by the current user;
                # record the error instead of aborting the discovery.
                dmi_info[board_info_fpath.name] = ex
            else:
                dmi_info[board_info_fpath.name] = text.strip()
        print(f'info = {ub.urepr(dmi_info, nl=1)}')
        return dmi_info
[docs]
class Disks(_HardwareComponent):
    """
    Storage container for info about system storage disks.

    TODO:
        Find all mounted filesystems that correspond with physical disks
        via ``cat /proc/mounts`` or ``mount -l``

    References:
        https://unix.stackexchange.com/questions/24182/how-to-get-the-complete-and-exact-list-of-mounted-filesystems-in-linux
    """

    def __init__(self):
        # All attributes read by summary are initialized here so that an
        # instance that has not gone through discovery is still well formed.
        self._hardware_diskinfo = None
        self._total_disks = None

    def summary(self):
        """
        Concise summary of the disk information.

        Returns:
            dict: ``num_disks`` is the discovered logical disk count.
        """
        return {
            'num_disks': self._total_disks,
        }

    @classmethod
    def _discover_v1(cls):
        """
        Create an instance populated via ``hardware.diskinfo.detect``.
        """
        import hardware.diskinfo
        self = cls()
        detected = hardware.diskinfo.detect()
        # Keep only the "disk" subtree of the nested report.
        self._hardware_diskinfo = _tup_to_nested(detected)['disk']
        self._total_disks = self._hardware_diskinfo['logical']['count']
        return self
[docs]
class Networking(_HardwareComponent):
    """
    Storage container for info about system networking devices.
    """

    def __init__(self):
        # All attributes read by summary are initialized here so that an
        # instance that has not gone through discovery is still well formed.
        self._hardware_network_info = None
        self._num_network_devices = None

    def summary(self):
        """
        Concise summary of the networking information.

        Returns:
            dict: ``num_network_devices`` is the number of entries in the
                discovered ``network`` section.
        """
        return {
            'num_network_devices': self._num_network_devices,
        }

    @classmethod
    def _discover_v1(cls):
        """
        Create an instance populated via
        :func:`_hardware_nested_system_info`.
        """
        self = cls()
        nested_info = _hardware_nested_system_info()
        self._hardware_network_info = nested_info['network']
        self._num_network_devices = len(self._hardware_network_info)
        return self
[docs]
class Peripherals(_HardwareComponent):
    """
    Storage container for info about system peripherals.

    Not implemented: constructing an instance raises NotImplementedError.
    """

    def __init__(self):
        raise NotImplementedError()

    @classmethod
    def _discover_v1(cls):
        """
        Construct an instance (which currently raises NotImplementedError).
        """
        instance = cls()
        return instance
[docs]
def _hardware_nested_system_info():
    """
    Run ``hardware.system.detect`` and restructure its result with
    :func:`_tup_to_nested`.

    Returns:
        dict: the nested form of the detected system information.
    """
    import hardware.system
    detected = hardware.system.detect()
    return _tup_to_nested(detected)
[docs]
def _tup_to_nested(tuples):
    """
    Convert flat ``(key1, key2, ..., value)`` records into nested dicts.

    Args:
        tuples (Iterable[tuple]): each item is a sequence whose last element
            is a value and whose preceding elements are the keys leading to
            it.

    Returns:
        dict: nested dictionaries such that following the keys of a record
            reaches its value. A later record with the same keys overwrites
            an earlier one.
    """
    root = {}
    for *keys, value in tuples:
        node = root
        # Descend through every key except the last, creating intermediate
        # dictionaries as needed.
        for key in keys[:-1]:
            if key not in node:
                node[key] = {}
            node = node[key]
        leaf = keys[-1]
        node[leaf] = value
    return root
[docs]
class Hardware:
    """
    Namespace of static helpers that discover hardware components.

    TODO: class level namespace

    References:
        https://pypi.org/project/hardware/

    Example:
        >>> # xdoctest: +SKIP
        >>> import kwutil
        >>> kwutil.Hardware.report()
    """

    @staticmethod
    def report():
        """
        Build a high level hardware report

        Discovers each component in turn and prints the collection.
        """
        # Dict literal values are evaluated top to bottom, so discovery runs
        # in the listed order.
        components = {
            'cpus': Hardware.cpus(),
            'memory': Hardware.memory(),
            'gpus': Hardware.gpus(),
            'disks': Hardware.disks(),
            'motherboard': Hardware.motherboard(),
            'networking': Hardware.networking(),
        }
        print(f'components = {ub.urepr(components, nl=2)}')

    @staticmethod
    def cpus():
        """Discover CPU information; returns a :class:`CPUs` instance."""
        discovered = CPUs._discover_v1()
        return discovered

    @staticmethod
    def memory():
        """Discover memory information; returns a :class:`Memory` instance."""
        discovered = Memory._discover_v1()
        return discovered

    @staticmethod
    def gpus():
        """Discover GPU information; returns a :class:`GPUs` instance."""
        discovered = GPUs._discover_v1()
        return discovered

    @staticmethod
    def disks():
        """Discover disk information; returns a :class:`Disks` instance."""
        discovered = Disks._discover_v1()
        return discovered

    @staticmethod
    def motherboard():
        """Discover motherboard information; returns a :class:`Motherboard` instance."""
        discovered = Motherboard._discover_v1()
        return discovered

    @staticmethod
    def networking():
        """Discover networking information; returns a :class:`Networking` instance."""
        discovered = Networking._discover_v1()
        return discovered

    @staticmethod
    def peripherals():
        """Discover peripheral information via :class:`Peripherals`."""
        discovered = Peripherals._discover_v1()
        return discovered