"""
Filesystem Operation Managers
This module defines:
* :class:`CopyManager`
* :class:`MoveManager`
* :class:`DeleteManager`
which can perform their respective operations using multiple threads / workers.
TODO:
- [ ] In CopyManager and MoveManager add option to check that no files are
written to the same logical location (ignore symlink physical
location problems, which would require a more expensive check).
"""
import ubelt as ub
[docs]
class _FilesystemOperationManager:
    """
    Abstract class for shared components of Copy / Move / Delete manager.

    Each of these managers will have a queue of unsubmitted jobs that haven't
    started to be worked on. They will also have a pool where they can submit
    jobs. Lastly, they will all have an eager or non-eager mode where in eager
    mode they will start a job as soon as they get it (i.e. there will be no
    unsubmitted jobs), but in non-eager mode, the developer will have to
    explicitly call run.

    Subclasses set ``_worker_func`` (called once per task with the task
    dictionary expanded as keyword arguments) and ``_operation_name`` (used
    in log messages and as the default progress description). Subclasses
    also provide their own ``submit`` method.
    """
    _worker_func = NotImplemented
    _operation_name = NotImplemented

    def __init__(self, workers=0, mode='thread', eager=False):
        """
        Args:
            workers (int): passed to :class:`ubelt.JobPool` as ``max_workers``
            mode (str): passed to :class:`ubelt.JobPool` as ``mode``
            eager (bool):
                stored on the instance; subclasses consult it in ``submit``
                to decide whether to queue a task or hand it to the pool.
        """
        self._pool = ub.JobPool(mode=mode, max_workers=workers)
        self.eager = eager
        self._unsubmitted = []

    def __enter__(self):
        self._pool.__enter__()
        return self

    def __len__(self) -> int:
        # Queued tasks plus whatever the pool already holds.
        return len(self._unsubmitted) + len(self._pool)

    def __exit__(self, ex_type, ex_value, ex_traceback):
        """
        Args:
            ex_type (Type[BaseException] | None):
            ex_value (BaseException | None):
            ex_traceback (TracebackType | None):

        Returns:
            bool | None
        """
        return self._pool.__exit__(ex_type, ex_value, ex_traceback)

    def run(self, desc=None, verbose=1, pman=None):
        """
        Submit every queued task to the pool and wait for all jobs.

        Args:
            desc (str | None): description for progress bars
            verbose (int): verbosity level
            pman (ProgressManager | None):
                an existing progress manager to report to. If None, a new
                one is created, entered, and exited by this call.

        Raises:
            Exception: whatever a job raised, re-raised by ``job.result()``.
        """
        _our_pman = None
        if pman is None:
            # Only needed when we have to build our own manager.
            from kwutil import util_progress
            pman = _our_pman = util_progress.ProgressManager(
                backend='progiter')
            _our_pman.__enter__()
        if verbose:
            print(f'Run {self.__class__._operation_name}')
        # Access through the class so a plain function is not bound to self.
        _worker_func = self.__class__._worker_func
        try:
            for task in self._unsubmitted:
                self._pool.submit(_worker_func, **task)
            self._unsubmitted.clear()
            job_iter = self._pool.as_completed()
            desc = desc or self.__class__._operation_name
            prog = pman.progiter(
                job_iter, desc=desc, total=len(self._pool), verbose=verbose)
            for job in prog:
                job.result()
        except BaseException as ex:
            # Exit our own manager exactly once, using the standard
            # (type, value, traceback) argument order.
            if _our_pman is not None:
                _our_pman.__exit__(type(ex), ex, ex.__traceback__)
            raise
        else:
            if _our_pman is not None:
                _our_pman.__exit__(None, None, None)
[docs]
def _copy_worker(src, dst, skip_existing, overwrite, follow_file_symlinks, follow_dir_symlinks, meta):
    """
    Copy one source path to one destination path.

    Args:
        src (PathLike | str): path to copy from
        dst (PathLike | str): path to copy to
        skip_existing (bool): if True and ``dst`` exists, nothing is done
        overwrite (bool): forwarded to :func:`ubelt.Path.copy`
        follow_file_symlinks (bool): forwarded to :func:`ubelt.Path.copy`
        follow_dir_symlinks (bool): forwarded to :func:`ubelt.Path.copy`
        meta (str | None): forwarded to :func:`ubelt.Path.copy`
    """
    source = ub.Path(src)
    target = ub.Path(dst)
    if skip_existing and target.exists():
        # Destination is already there and the caller asked to leave it.
        return
    target.parent.ensuredir()
    copy_options = {
        'overwrite': overwrite,
        'follow_file_symlinks': follow_file_symlinks,
        'follow_dir_symlinks': follow_dir_symlinks,
        'meta': meta,
    }
    source.copy(target, **copy_options)
[docs]
def _move_worker(src, dst, follow_file_symlinks, follow_dir_symlinks, meta):
    """
    Move one source path to one destination path.

    The parent directory of the destination is created first.

    Args:
        src (PathLike | str): path to move from
        dst (PathLike | str): path to move to
        follow_file_symlinks (bool): forwarded to :func:`ubelt.Path.move`
        follow_dir_symlinks (bool): forwarded to :func:`ubelt.Path.move`
        meta (str | None): forwarded to :func:`ubelt.Path.move`
    """
    source = ub.Path(src)
    target = ub.Path(dst)
    target.parent.ensuredir()
    move_options = {
        'follow_file_symlinks': follow_file_symlinks,
        'follow_dir_symlinks': follow_dir_symlinks,
        'meta': meta,
    }
    source.move(target, **move_options)
[docs]
class DeleteManager(_FilesystemOperationManager):
    """
    Helper to execute multiple delete operations on a local filesystem.

    Note:
        The topic of deleting a lot of files quickly is interesting.
        See: https://unix.stackexchange.com/questions/37329/efficiently-delete-large-directory-containing-thousands-of-files

    Example:
        >>> import ubelt as ub
        >>> from kwutil.fsops_managers import DeleteManager
        >>> dpath = ub.Path.appdir('kwutil', 'tests', 'delete_manager')
        >>> src_dpath = (dpath / 'src').ensuredir()
        >>> src_fpaths = [src_dpath / 'file{}.txt'.format(i) for i in range(10)]
        >>> for fpath in src_fpaths:
        >>>     fpath.touch()
        >>> deleteman = DeleteManager(workers=0, eager=False)
        >>> for fpath in src_fpaths:
        >>>     deleteman.submit(fpath)
        >>> assert len(src_dpath.ls()) == 10
        >>> deleteman.run()
        >>> assert len(src_dpath.ls()) == 0
    """
    _worker_func = ub.delete
    _operation_name = 'delete'

    def __init__(self, workers=0, mode='thread', eager=False,
                 overwrite=False, skip_existing=False):
        """
        Args:
            workers (int): number of parallel workers to use

            mode (str): thread, process, or serial

            eager (bool):
                if True starts deleting as soon as a job is submitted,
                otherwise it waits until run is called.

            overwrite (bool):
                stored on the instance; nothing in this class reads it.

            skip_existing (bool):
                stored on the instance; nothing in this class reads it.
        """
        super().__init__(workers=workers, mode=mode, eager=eager)
        self.skip_existing = skip_existing
        self.overwrite = overwrite

    def submit_many(self, paths):
        """
        Submit each path in an iterable, one ``submit`` call per item.

        Args:
            paths (Iterable[str | PathLike]): paths to delete
        """
        # Do we want to just do a type check on the submit method?
        for item in paths:
            self.submit(item)

    def submit(self, path):
        """
        Queue a path for deletion, or start deleting it in eager mode.

        Args:
            path (str | PathLike): path to delete
        """
        task = dict(path=path)
        if not self.eager:
            self._unsubmitted.append(task)
            return
        self._pool.submit(ub.delete, **task)
[docs]
class CopyManager(_FilesystemOperationManager):
    """
    Helper to execute multiple copy operations on a local filesystem.

    Notes:
        It would be nice for this to support an rsync backend that could sync
        at the src/dst pair level. Not sure if this works.

    References:
        https://unix.stackexchange.com/questions/133995/rsyncing-multiple-src-dest-pairs
        https://serverfault.com/questions/163859/using-rsync-as-a-queue
        https://unix.stackexchange.com/questions/602606/rsync-source-list-to-destination-list

    TODO:
        - [ ] Add optional check that all src paths exist
        - [ ] Add optional check that all dst paths do not exist (unless overwrite=True or skip_existing=True)
        - [ ] Add optional check that no dst path is or is inside of a src
              dpath (would make things ambiguous), the operation graph should be
              bipartite.
        - [ ] Add backend that uses a fast protocol like rsync (or write one in Rust)

    Example:
        >>> import ubelt as ub
        >>> from kwutil.fsops_managers import CopyManager
        >>> dpath = ub.Path.appdir('kwutil', 'tests', 'copy_manager')
        >>> src_dpath = (dpath / 'src').ensuredir()
        >>> dst_dpath = (dpath / 'dst').delete()
        >>> src_fpaths = [src_dpath / 'file{}.txt'.format(i) for i in range(10)]
        >>> for fpath in src_fpaths:
        >>>     fpath.touch()
        >>> # To use a copy manager, iterate through your source and
        >>> # destination paths and submit them.
        >>> copyman = CopyManager(workers=0)
        >>> # by default it will do nothing
        >>> # unless you specify eager=True or explicitly call run.
        >>> for fpath in src_fpaths:
        >>>     dst = fpath.augment(dpath=dst_dpath)
        >>>     copyman.submit(fpath, dst)
        >>> report = copyman.report()
        >>> print(f'report = {ub.urepr(report, nl=1)}')
        >>> copyman.run()

    Example:
        >>> import ubelt as ub
        >>> from kwutil.fsops_managers import CopyManager
        >>> dpath = ub.Path.appdir('kwutil', 'tests', 'copy_manager')
        >>> src_dpath = (dpath / 'src').ensuredir()
        >>> dst_dpath = (dpath / 'dst').delete()
        >>> src_fpaths = [src_dpath / 'file{}.txt'.format(i) for i in range(10)]
        >>> for fpath in src_fpaths:
        >>>     fpath.touch()
        >>> copyman = CopyManager(workers=0)
        >>> for fpath in src_fpaths:
        >>>     dst = fpath.augment(dpath=dst_dpath)
        >>>     copyman.submit(fpath, dst)
        >>> copyman.run()
        >>> assert len(dst_dpath.ls()) == len(src_dpath.ls())
        >>> copyman = CopyManager(workers=0)
        >>> for fpath in src_fpaths:
        >>>     dst = fpath.augment(dpath=dst_dpath)
        >>>     copyman.submit(fpath, dst)
        >>> import pytest
        >>> with pytest.raises(FileExistsError):
        >>>     copyman.run()
        >>> copyman = CopyManager(workers=0)
        >>> for fpath in src_fpaths:
        >>>     dst = fpath.augment(dpath=dst_dpath)
        >>>     copyman.submit(fpath, dst, skip_existing=True)
        >>> copyman.run()
    """
    _worker_func = _copy_worker
    _operation_name = 'copy'

    def __init__(self, workers=0, mode='thread', eager=False,
                 overwrite=False, skip_existing=False):
        """
        Args:
            workers (int): number of parallel workers to use

            mode (str): thread, process, or serial

            eager (bool):
                if True starts copying as soon as a job is submitted,
                otherwise it waits until run is called.

            overwrite (bool):
                if True will overwrite the file if it exists, otherwise it
                will error unless skip_existing is True. Defaults to False.

            skip_existing (bool):
                if jobs where the destination already exists should be
                skipped by default. Default=False
        """
        super().__init__(workers=workers, mode=mode, eager=eager)
        self.skip_existing = skip_existing
        self.overwrite = overwrite

    def submit(self, src, dst, skip_existing=None, overwrite=None,
               follow_file_symlinks=False, follow_dir_symlinks=False,
               meta='stats'):
        """
        Queue a copy job, or start it immediately in eager mode.

        Args:
            src (str | PathLike): source file or directory

            dst (str | PathLike): destination file or directory

            skip_existing (bool | None):
                if jobs where the destination already exists should be
                skipped by default. If None, then uses the class default.
                Default=None

            overwrite (bool | None):
                if True will overwrite the file if it exists, otherwise it
                will error unless skip_existing is True. If None, then uses
                the class default. Default=None.

            follow_file_symlinks (bool):
                If True and src is a link, the link will be resolved before
                it is copied (i.e. the data is duplicated), otherwise just
                the link itself will be copied.

            follow_dir_symlinks (bool):
                if True when src is a directory and contains symlinks to
                other directories, the contents of the linked data are
                copied, otherwise when False only the link itself is
                copied.

            meta (str | None):
                Indicates what metadata bits to copy. This can be 'stats'
                which tries to copy all metadata (i.e. like
                :py:func:`shutil.copy2`), 'mode' which copies just the
                permission bits (i.e. like :py:func:`shutil.copy`), or None,
                which ignores all metadata (i.e. like
                :py:func:`shutil.copyfile`).
        """
        # None means "not specified": fall back to the instance defaults
        # given to the constructor.
        if skip_existing is None:
            skip_existing = self.skip_existing
        if overwrite is None:
            overwrite = self.overwrite
        task = {
            'src': src,
            'dst': dst,
            'skip_existing': skip_existing,
            'overwrite': overwrite,
            'follow_file_symlinks': follow_file_symlinks,
            'follow_dir_symlinks': follow_dir_symlinks,
            'meta': meta,
        }
        if self.eager:
            self._pool.submit(_copy_worker, **task)
        else:
            self._unsubmitted.append(task)

    def report(self):
        """
        Summarize the jobs that are queued but not yet submitted.

        Returns:
            dict: see :func:`_unsubmitted_report`
        """
        report = self._unsubmitted_report()
        return report

    def _unsubmitted_report(self):
        """
        Build a report on the unsubmitted jobs.

        Returns:
            dict: with keys ``common_src_prefix``, ``common_dst_prefix``,
                ``skips``, ``overwrites``, ``num_tasks``,
                ``duplicate_src_paths`` and ``duplicate_dst_paths``.
                ``skips`` and ``overwrites`` are currently always 0.
        """
        from collections import Counter
        from os.path import commonprefix
        stats = Counter({
            'skips': 0,
            'overwrites': 0,
            'num_tasks': len(self._unsubmitted),
        })
        src_paths = [task['src'] for task in self._unsubmitted]
        dst_paths = [task['dst'] for task in self._unsubmitted]
        meta = {}
        duplicate_src_paths = ub.find_duplicates(src_paths)
        duplicate_dst_paths = ub.find_duplicates(dst_paths)
        # Note: commonprefix compares character by character, so the result
        # is not necessarily a whole path component.
        meta['common_src_prefix'] = commonprefix(src_paths)
        meta['common_dst_prefix'] = commonprefix(dst_paths)
        stats['duplicate_src_paths'] = len(duplicate_src_paths)
        stats['duplicate_dst_paths'] = len(duplicate_dst_paths)
        # TODO: optionally stat every src / dst path to count how many jobs
        # would be skipped or would overwrite an existing destination.
        report = {**meta, **stats}
        return report
[docs]
class MoveManager(_FilesystemOperationManager):
    """
    Helper to execute multiple move operations on a local filesystem.

    TODO:
        - [ ] Add optional check that all src paths exist
        - [ ] Add optional check that all dst paths do not exist
        - [ ] Add optional check that no dst path is or is inside of a src
              dpath (would make things ambiguous), the operation graph should be
              bipartite.

    Example:
        >>> import ubelt as ub
        >>> from kwutil.fsops_managers import MoveManager
        >>> dpath = ub.Path.appdir('kwutil', 'tests', 'move_manager')
        >>> src_dpath = (dpath / 'src').ensuredir()
        >>> dst_dpath = (dpath / 'dst').delete()
        >>> src_fpaths = [src_dpath / 'file{}.txt'.format(i) for i in range(10)]
        >>> for fpath in src_fpaths:
        >>>     fpath.touch()
        >>> moveman = MoveManager(workers=0)
        >>> for src_fpath in src_fpaths:
        >>>     dst_fpath = src_fpath.augment(dpath=dst_dpath)
        >>>     moveman.submit(src_fpath, dst_fpath)
        >>> moveman.run()
        >>> assert len(dst_dpath.ls()) == len(src_fpaths)
        >>> assert len(src_dpath.ls()) == 0
    """
    _worker_func = _move_worker
    _operation_name = 'move'

    def __init__(self, workers=0, mode='thread', eager=False):
        """
        Args:
            workers (int): number of parallel workers to use

            mode (str): thread, process, or serial

            eager (bool):
                if True starts moving as soon as a job is submitted,
                otherwise it waits until run is called.
        """
        super().__init__(workers=workers, mode=mode, eager=eager)

    def submit(self, src, dst, skip_existing=False, follow_file_symlinks=False,
               follow_dir_symlinks=False, meta='stats'):
        """
        Queue a move job, or start it immediately in eager mode.

        Args:
            src (str | PathLike): source file or directory

            dst (str | PathLike): destination file or directory

            skip_existing (bool):
                accepted for signature compatibility but currently unused;
                it is not forwarded to the move worker.

            follow_file_symlinks (bool):
                If True and src is a link, the link will be resolved before
                it is moved, otherwise just the link itself will be moved.

            follow_dir_symlinks (bool):
                if True when src is a directory and contains symlinks to
                other directories, the contents of the linked data are
                moved, otherwise when False only the link itself is moved.

            meta (str | None):
                Indicates what metadata bits to copy. This can be 'stats'
                which tries to copy all metadata (i.e. like
                :py:func:`shutil.copy2`), 'mode' which copies just the
                permission bits (i.e. like :py:func:`shutil.copy`), or None,
                which ignores all metadata (i.e. like
                :py:func:`shutil.copyfile`).
        """
        task = {
            'src': src,
            'dst': dst,
            'follow_file_symlinks': follow_file_symlinks,
            'follow_dir_symlinks': follow_dir_symlinks,
            'meta': meta,
        }
        if self.eager:
            self._pool.submit(_move_worker, **task)
        else:
            self._unsubmitted.append(task)

    def _check(self):
        """
        Validate that the set of move tasks looks sane.

        Exact logic of this is currently in flux.

        Raises:
            AssertionError: if two tasks share a destination
            Exception: if a src and its dst both exist, if any dst already
                exists, or if any src is missing.
        """
        tasks = self._unsubmitted
        # Tasks may hold plain strings; coerce so ``.exists()`` is available.
        src_paths = [ub.Path(task['src']) for task in tasks]
        dst_paths = [ub.Path(task['dst']) for task in tasks]
        duplicate_destinations = ub.find_duplicates(dst_paths)
        print('dupliate_destinations = {}'.format(ub.urepr(duplicate_destinations, nl=1)))
        if duplicate_destinations:
            # Raised explicitly so the check also runs under ``python -O``.
            raise AssertionError('Multiple tasks move to the same destination')
        src_exist_flags = [p.exists() for p in src_paths]
        dst_exist_flags = [p.exists() for p in dst_paths]
        both_exist_flags = [f1 and f2 for f1, f2 in zip(src_exist_flags, dst_exist_flags)]
        total = len(tasks)
        num_both_exist = sum(both_exist_flags)
        num_src_exist = sum(src_exist_flags)
        num_dst_exist = sum(dst_exist_flags)
        num_src_missing = total - num_src_exist
        print(f'num_src_exist={num_src_exist}')
        print(f'num_dst_exist={num_dst_exist}')
        print(f'num_src_missing={num_src_missing}')
        print(f'num_both_exist={num_both_exist}')
        print(f'total={total}')
        # TODO:
        # Check if any tasks are moving directories, and if so, ensure they
        if num_both_exist:
            raise Exception(f'A src and dst path both exist! this error cannot be bypassed: {num_both_exist} / {total}')
        errors = []
        if num_dst_exist > 0:
            if num_dst_exist == total:
                errors.append('Script seems like it already ran')
            else:
                errors.append(f'Script seems like it partially ran: {num_dst_exist} / {total}')
        if num_src_missing > 0:
            errors.append(f'Missing source {num_src_missing} / {total} data')
        if errors:
            raise Exception('\n'.join(errors))
        _unsubmitted_info(src_paths)
[docs]
def remove_empty_dirs(dpath):
    """
    Remove any directories that are empty or only contain (recursively) other
    empty directories.

    In bash this is similar to

    .. code:: bash

        # with POSIX find
        find . -type d -empty -print
        find . -type d -empty -delete

        # or with fd-find
        fd -u --type empty --type directory
        fd -u --type empty --type directory -x rmdir

    Args:
        dpath (str | PathLike):
            directory to remove other empty directories in. If the input
            directory is empty it is also removed.

    References:
        .. [UnixSE46322] https://unix.stackexchange.com/questions/46322/how-can-i-recursively-delete-empty-directories-in-my-home-directory

    Example:
        >>> import ubelt as ub
        >>> from kwutil.fsops_managers import remove_empty_dirs
        >>> test_dpath = ub.Path.appdir('kwutil', 'tests', 'remove_empty_dirs')
        >>> (test_dpath / 'dir1' / 'dir2' / 'dir3').ensuredir()
        >>> dpath = (test_dpath / 'dir1')
        >>> assert dpath.exists()
        >>> remove_empty_dirs(dpath)
        >>> assert not dpath.exists()
    """
    import os
    dpath = os.path.abspath(dpath)
    # Walk bottom-up so every child is handled before its parent; a single
    # pass is then enough to collapse arbitrarily deep empty chains.
    for root, _dnames, _fnames in os.walk(dpath, topdown=False):
        # The names yielded by os.walk were listed before the children were
        # pruned, so re-check what is actually left on disk.
        if not os.listdir(root):
            os.rmdir(root)
[docs]
def _unsubmitted_info(paths):
    """
    Print total sizes and a type histogram for a collection of paths.

    Sizes are grouped by the stat key and the path's type label and printed
    in human readable form; the histogram counts paths per type label.

    Args:
        paths (Iterable[PathLike]): paths to inspect with ``_pathinfo``
    """
    from kwutil.util_units import byte_str
    records = [{**_pathinfo(p), 'path': p} for p in paths]

    size_totals = ub.ddict(int)
    for rec in records:
        label = rec['typelabel']
        for key, value in rec['stats'].items():
            if not key.endswith('.size'):
                continue
            size_totals[f'{key}.{label}'] += value

    human_sizes = ub.udict(size_totals).map_values(byte_str)
    print(f'human_sizes={human_sizes}')
    type_hist = ub.dict_hist(rec['typelabel'] for rec in records)
    print(f'type_hist = {ub.urepr(type_hist, nl=1)}')
[docs]
def _pathinfo(path, with_stats=True, assume_exists=False):
    """
    Collect basic filesystem facts about a single path.

    Args:
        path (str | PathLike): the path to inspect
        with_stats (bool):
            if True, add a ``stats`` dictionary whose keys are prefixed
            with the path's extension (e.g. ``txt.size``).
        assume_exists (bool):
            if True, skip the existence check and omit the ``exists`` key.

    Returns:
        dict: always contains ``islink``, ``isfile``, ``isdir`` and
            ``typelabel`` (a string built from the letters L, B, F, D).
            Links also get ``broken`` and ``target``; files also get
            ``X_ok``.
    """
    # From xdev DirectoryWalker
    import os
    import pathlib
    if not isinstance(path, pathlib.PurePath):
        # Managers accept plain strings, so coerce before using Path methods.
        path = pathlib.Path(path)
    node_data = {}
    if not assume_exists:
        node_data['exists'] = path.exists()
    islink = os.path.islink(path)
    isfile = os.path.isfile(path)
    isdir = os.path.isdir(path)
    isbroken = False
    if islink:
        target = os.readlink(path)
        # isfile / isdir follow the link, so neither being true means the
        # link does not resolve to anything.
        isbroken = not isdir and not isfile
        node_data['broken'] = isbroken
        node_data['target'] = target
    if isfile:
        node_data['X_ok'] = os.access(path, os.X_OK)
    types = []
    if islink:
        types.append('L')
        if isbroken:
            types.append('B')
    if isfile:
        types.append('F')
    if isdir:
        types.append('D')
    typelabel = ''.join(types)
    node_data['islink'] = islink
    node_data['isfile'] = isfile
    node_data['isdir'] = isdir
    node_data['typelabel'] = typelabel
    if with_stats:
        ext = path.suffix
        prefix = ext.lstrip('.') + '.'
        stats = {}
        try:
            # lstat: describe the link itself rather than its target.
            stat_obj = path.lstat()
        except FileNotFoundError:
            stats['broken_link'] = True
            stats['size'] = 0
        else:
            stats['size'] = stat_obj.st_size
        stats['files'] = 1
        stats = {prefix + k: v for k, v in stats.items()}
        node_data['stats'] = stats
    return node_data