kwutil.fsops_managers module

Filesystem Operation Managers

This module defines CopyManager, MoveManager, and DeleteManager, which can perform their respective operations using multiple threads / workers.

Todo

  • [ ] In CopyManager and MoveManager add option to check that no files are written to the same logical location (ignore symlink physical location problems, which would require a more expensive check).
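The check described in this todo item is not implemented in the module. As a rough illustration of what a "same logical location" test could look like, the sketch below normalizes the path text and counts repeats. The helper name find_duplicate_destinations is hypothetical, and symlinks are deliberately not resolved, in line with the item above.

```python
import os
from collections import Counter


def find_duplicate_destinations(dst_paths):
    """Return the destinations that more than one job would write to.

    Hypothetical helper, not part of kwutil.
    """
    # abspath / normpath only clean up the path text ('.', '..', doubled
    # separators); they never touch the filesystem or resolve symlinks.
    logical = [os.path.normpath(os.path.abspath(os.fspath(p)))
               for p in dst_paths]
    counts = Counter(logical)
    return sorted(path for path, num in counts.items() if num > 1)


duplicates = find_duplicate_destinations(['a/b.txt', 'a/./b.txt', 'a/c.txt'])
```

Here 'a/b.txt' and 'a/./b.txt' name the same logical location, so that location is the only entry reported.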

class kwutil.fsops_managers._FilesystemOperationManager(workers=0, mode='thread', eager=False)[source]

Bases: object

Abstract class for shared components of Copy / Move / Delete manager.

Each of these managers has a queue of unsubmitted jobs that have not yet started, and a pool to which jobs can be submitted. Each also has an eager and a non-eager mode: in eager mode a job starts as soon as it is received (i.e. there are never any unsubmitted jobs), whereas in non-eager mode the developer must explicitly call run.
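The queue / pool / eager design can be illustrated with a minimal sketch built on concurrent.futures. This is not the kwutil implementation: the class TinyOperationManager and its attributes are hypothetical names chosen for the illustration, and only a thread pool is shown.

```python
from concurrent.futures import ThreadPoolExecutor


class TinyOperationManager:
    """Hypothetical stand-in for the shared manager logic (not kwutil code)."""

    def __init__(self, worker_func, workers=2, eager=False):
        self.worker_func = worker_func
        self.eager = eager
        self._pool = ThreadPoolExecutor(max_workers=max(workers, 1))
        self._unsubmitted = []  # jobs queued but not yet given to the pool
        self._futures = []      # jobs the pool has accepted

    def submit(self, *args):
        if self.eager:
            # Eager mode: the job goes straight to the pool.
            self._futures.append(self._pool.submit(self.worker_func, *args))
        else:
            # Non-eager mode: remember the job until run() is called.
            self._unsubmitted.append(args)

    def run(self):
        # Hand any queued jobs to the pool, then wait for every result.
        while self._unsubmitted:
            args = self._unsubmitted.pop(0)
            self._futures.append(self._pool.submit(self.worker_func, *args))
        results = [future.result() for future in self._futures]
        self._futures.clear()
        return results


lazy = TinyOperationManager(lambda x: x * 2, eager=False)
for value in range(3):
    lazy.submit(value)
num_waiting = len(lazy._unsubmitted)         # all three jobs are still queued
lazy_results = lazy.run()

eager = TinyOperationManager(lambda x: x * 2, eager=True)
for value in range(3):
    eager.submit(value)
num_waiting_eager = len(eager._unsubmitted)  # nothing is queued in eager mode
eager_results = eager.run()
```

In both modes run() is what collects the results; the difference is only when the work is allowed to start.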

_worker_func = NotImplemented
_operation_name = NotImplemented
run(desc=None, verbose=1, pman=None)[source]
Parameters:
  • desc (str | None) – description for progress bars

  • verbose (int) – verbosity level

kwutil.fsops_managers._copy_worker(src, dst, skip_existing, overwrite, follow_file_symlinks, follow_dir_symlinks, meta)[source]
Parameters:
  • src (PathLike | str)

  • dst (PathLike | str)

  • overwrite (bool)

  • skip_existing (bool)

kwutil.fsops_managers._move_worker(src, dst, follow_file_symlinks, follow_dir_symlinks, meta)[source]
Parameters:
  • src (PathLike | str)

  • dst (PathLike | str)

class kwutil.fsops_managers.DeleteManager(workers=0, mode='thread', eager=False, overwrite=False, skip_existing=False)[source]

Bases: _FilesystemOperationManager

Helper to execute multiple delete operations on a local filesystem.

Example

>>> import ubelt as ub
>>> from kwutil.fsops_managers import DeleteManager
>>> dpath = ub.Path.appdir('kwutil', 'tests', 'delete_manager')
>>> src_dpath = (dpath / 'src').ensuredir()
>>> src_fpaths = [src_dpath / 'file{}.txt'.format(i) for i in range(10)]
>>> for fpath in src_fpaths:
>>>     fpath.touch()
>>> deleteman = DeleteManager(workers=0, eager=False)
>>> for fpath in src_fpaths:
>>>     deleteman.submit(fpath)
>>> assert len(src_dpath.ls()) == 10
>>> deleteman.run()
>>> assert len(src_dpath.ls()) == 0
Parameters:
  • workers (int) – number of parallel workers to use

  • mode (str) – thread, process, or serial

  • eager (bool) – if True, starts working on a job as soon as it is submitted; otherwise it waits until run is called.

  • overwrite (bool) – if True will overwrite the file if it exists, otherwise it will error unless skip_existing is True. Defaults to False.

  • skip_existing (bool) – if True, jobs where the destination already exists are skipped by default. Defaults to False.

_worker_func(path, verbose=False)

Removes a file or recursively removes a directory. If a path does not exist, then this does nothing.

Parameters:
  • path (str | PathLike) – file or directory to remove

  • verbose (bool) – if True prints what is being done

SeeAlso:
send2trash - A cross-platform Python package for sending files to the trash instead of irreversibly deleting them.

ubelt.util_path.Path.delete()

Notes

This can call os.unlink(), os.rmdir(), or shutil.rmtree(), depending on what path references on the filesystem. (On Windows it may also call a custom ubelt._win32_links._win32_rmtree().)
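The dispatch described in the note can be sketched with the standard library alone. This is a simplified illustration, not the ubelt code: delete_sketch is a hypothetical name, only the os.unlink() and shutil.rmtree() branches are shown, and the os.rmdir() and Windows-specific handling are left out.

```python
import os
import shutil
import tempfile


def delete_sketch(path):
    """Remove a file, link, or directory tree; do nothing if it is missing."""
    if os.path.islink(path) or os.path.isfile(path):
        os.unlink(path)       # regular files and symlinks (even broken ones)
    elif os.path.isdir(path):
        shutil.rmtree(path)   # real directories, removed recursively
    # Otherwise the path does not exist and there is nothing to do.


root = tempfile.mkdtemp()
nested = os.path.join(root, 'dir', 'subdir')
os.makedirs(nested)
fpath = os.path.join(nested, 'to_remove.txt')
open(fpath, 'w').close()

delete_sketch(fpath)                          # removes a single file
file_gone = not os.path.exists(fpath)
delete_sketch(os.path.join(root, 'dir'))      # removes the directory tree
tree_gone = not os.path.exists(nested)
delete_sketch(os.path.join(root, 'missing'))  # missing path: silently ignored
shutil.rmtree(root)
```

The link test comes first so that a symlink pointing at a directory is unlinked rather than passed to shutil.rmtree(), which refuses to operate on symlinks.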

Example

>>> import ubelt as ub
>>> from os.path import exists, join
>>> base = ub.Path.appdir('ubelt', 'delete_test').ensuredir()
>>> dpath1 = ub.ensuredir(join(base, 'dir'))
>>> ub.ensuredir(join(base, 'dir', 'subdir'))
>>> ub.touch(join(base, 'dir', 'to_remove1.txt'))
>>> fpath1 = join(base, 'dir', 'subdir', 'to_remove3.txt')
>>> fpath2 = join(base, 'dir', 'subdir', 'to_remove2.txt')
>>> ub.touch(fpath1)
>>> ub.touch(fpath2)
>>> assert all(map(exists, (dpath1, fpath1, fpath2)))
>>> ub.delete(fpath1)
>>> assert all(map(exists, (dpath1, fpath2)))
>>> assert not exists(fpath1)
>>> ub.delete(dpath1)
>>> assert not any(map(exists, (dpath1, fpath1, fpath2)))

Example

>>> import ubelt as ub
>>> from os.path import exists, join
>>> dpath = ub.Path.appdir('ubelt', 'delete_test2').ensuredir()
>>> dpath1 = ub.ensuredir(join(dpath, 'dir'))
>>> fpath1 = ub.touch(join(dpath1, 'to_remove.txt'))
>>> assert exists(fpath1)
>>> ub.delete(dpath)
>>> assert not exists(fpath1)
_operation_name = 'delete'
submit_many(paths)[source]
submit(path)[source]
Parameters:

path (str | PathLike) – path to delete

class kwutil.fsops_managers.CopyManager(workers=0, mode='thread', eager=False, overwrite=False, skip_existing=False)[source]

Bases: _FilesystemOperationManager

Helper to execute multiple copy operations on a local filesystem.

Notes

It would be nice for this to support an rsync backend that could sync at the src/dst pair level. Not sure if this works.

References

https://unix.stackexchange.com/questions/133995/rsyncing-multiple-src-dest-pairs

https://serverfault.com/questions/163859/using-rsync-as-a-queue

https://unix.stackexchange.com/questions/602606/rsync-source-list-to-destination-list

Todo

  • [ ] Add optional check that all src paths exist

  • [ ] Add optional check that all dst paths do not exist (unless overwrite=True or skip_existing=True)

  • [ ] Add optional check that no dst path is or is inside of a src dpath (would make things ambiguous), the operation graph should be bipartite.

  • [ ] Add backend that uses a fast protocol like rsync (or write one in Rust)
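None of the checks in this todo list exist yet. As a sketch of how the "no dst path is or is inside of a src dpath" test might be expressed, the hypothetical helper below compares paths lexically with pathlib. It assumes POSIX-style paths, does not resolve symlinks, and compares every dst against every src, so it is quadratic in the number of jobs.

```python
from pathlib import PurePosixPath


def find_nested_destinations(src_paths, dst_paths):
    """Return (dst, src) pairs where dst equals src or lies inside it.

    Hypothetical helper, not part of kwutil.
    """
    conflicts = []
    for dst in map(PurePosixPath, dst_paths):
        for src in map(PurePosixPath, src_paths):
            # ``parents`` lists every ancestor directory of dst.
            if dst == src or src in dst.parents:
                conflicts.append((str(dst), str(src)))
    return conflicts


conflicts = find_nested_destinations(
    src_paths=['/data/src'],
    dst_paths=['/data/src/out', '/data/dst'],
)
```

Only '/data/src/out' is flagged, because it sits inside the source directory '/data/src'.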

Example

>>> import ubelt as ub
>>> from kwutil.fsops_managers import CopyManager
>>> dpath = ub.Path.appdir('kwutil', 'tests', 'copy_manager')
>>> src_dpath = (dpath / 'src').ensuredir()
>>> dst_dpath = (dpath / 'dst').delete()
>>> src_fpaths = [src_dpath / 'file{}.txt'.format(i) for i in range(10)]
>>> for fpath in src_fpaths:
>>>     fpath.touch()
>>> # To use a copy manager, iterate through your source and
>>> # destination paths and submit them.
>>> copyman = CopyManager(workers=0)
>>> # by default it will do nothing
>>> # unless you specify eager=True or explicitly call run.
>>> for fpath in src_fpaths:
>>>     dst = fpath.augment(dpath=dst_dpath)
>>>     copyman.submit(fpath, dst)
>>> report = copyman.report()
>>> print(f'report = {ub.urepr(report, nl=1)}')
>>> copyman.run()

Example

>>> import ubelt as ub
>>> from kwutil.fsops_managers import CopyManager
>>> dpath = ub.Path.appdir('kwutil', 'tests', 'copy_manager')
>>> src_dpath = (dpath / 'src').ensuredir()
>>> dst_dpath = (dpath / 'dst').delete()
>>> src_fpaths = [src_dpath / 'file{}.txt'.format(i) for i in range(10)]
>>> for fpath in src_fpaths:
>>>     fpath.touch()
>>> copyman = CopyManager(workers=0)
>>> for fpath in src_fpaths:
>>>     dst = fpath.augment(dpath=dst_dpath)
>>>     copyman.submit(fpath, dst)
>>> copyman.run()
>>> assert len(dst_dpath.ls()) == len(src_dpath.ls())
>>> copyman = CopyManager(workers=0)
>>> for fpath in src_fpaths:
>>>     dst = fpath.augment(dpath=dst_dpath)
>>>     copyman.submit(fpath, dst)
>>> import pytest
>>> with pytest.raises(FileExistsError):
>>>     copyman.run()
>>> copyman = CopyManager(workers=0)
>>> for fpath in src_fpaths:
>>>     dst = fpath.augment(dpath=dst_dpath)
>>>     copyman.submit(fpath, dst, skip_existing=True)
>>> copyman.run()
Parameters:
  • workers (int) – number of parallel workers to use

  • mode (str) – thread, process, or serial

  • eager (bool) – if True, starts copying as soon as a job is submitted; otherwise it waits until run is called.

  • overwrite (bool) – if True will overwrite the file if it exists, otherwise it will error unless skip_existing is True. Defaults to False.

  • skip_existing (bool) – if True, jobs where the destination already exists are skipped by default. Defaults to False.
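The interaction between overwrite and skip_existing can be summarized in a small standalone sketch. The helper copy_file_sketch is hypothetical and handles single files only. It checks skip_existing first; what the real worker does when both flags are set is not stated above, so that ordering is an assumption.

```python
import os
import shutil
import tempfile


def copy_file_sketch(src, dst, overwrite=False, skip_existing=False):
    """Hypothetical helper; returns 'copied' or 'skipped'."""
    if os.path.exists(dst):
        if skip_existing:
            return 'skipped'            # leave the existing file untouched
        if not overwrite:
            raise FileExistsError(dst)  # neither flag permits proceeding
    shutil.copy2(src, dst)              # new destination, or overwrite=True
    return 'copied'


tmp = tempfile.mkdtemp()
src = os.path.join(tmp, 'src.txt')
dst = os.path.join(tmp, 'dst.txt')
with open(src, 'w') as file:
    file.write('data')

outcomes = [copy_file_sketch(src, dst)]  # destination is new
try:
    copy_file_sketch(src, dst)           # destination now exists
except FileExistsError:
    outcomes.append('error')
outcomes.append(copy_file_sketch(src, dst, skip_existing=True))
outcomes.append(copy_file_sketch(src, dst, overwrite=True))
shutil.rmtree(tmp)
```

The four calls exercise, in order: a fresh copy, the default error on an existing destination, a skip, and an overwrite.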

_worker_func(src, dst, skip_existing, overwrite, follow_file_symlinks, follow_dir_symlinks, meta)
Parameters:
  • src (PathLike | str)

  • dst (PathLike | str)

  • overwrite (bool)

  • skip_existing (bool)

_operation_name = 'copy'
submit(src, dst, skip_existing=False, overwrite=None, follow_file_symlinks=False, follow_dir_symlinks=False, meta='stats')[source]
Parameters:
  • src (str | PathLike) – source file or directory

  • dst (str | PathLike) – destination file or directory

  • skip_existing (bool | None) – if True, jobs where the destination already exists are skipped. If None, then uses the class default. Default=None

  • overwrite (bool | None) – if True will overwrite the file if it exists, otherwise it will error unless skip_existing is True. If None, then uses the class default. Default=None.

  • follow_file_symlinks (bool) – If True and src is a link, the link will be resolved before it is copied (i.e. the data is duplicated), otherwise just the link itself will be copied.

  • follow_dir_symlinks (bool) – if True when src is a directory and contains symlinks to other directories, the contents of the linked data are copied, otherwise when False only the link itself is copied.

  • meta (str | None) – Indicates what metadata bits to copy. This can be ‘stats’ which tries to copy all metadata (i.e. like shutil.copy2()), ‘mode’ which copies just the permission bits (i.e. like shutil.copy()), or None, which ignores all metadata (i.e. like shutil.copyfile()).
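The correspondence between meta values and the shutil functions named above can be sketched as a lookup table. This is an illustration for single files only, and copy_one_file is a hypothetical name. It forwards follow_file_symlinks to the follow_symlinks keyword that all three shutil functions accept, which is an assumption about how the option could be realized rather than a statement about the actual worker.

```python
import os
import shutil
import tempfile

# Map each ``meta`` value to the shutil function the docs compare it to.
COPY_FUNCS = {
    'stats': shutil.copy2,   # data, permission bits, timestamps, ...
    'mode': shutil.copy,     # data and permission bits
    None: shutil.copyfile,   # data only
}


def copy_one_file(src, dst, meta='stats', follow_file_symlinks=False):
    """Copy one file with the requested amount of metadata (sketch)."""
    copy_func = COPY_FUNCS[meta]
    # With follow_symlinks=False a symlink src is recreated as a symlink.
    return copy_func(src, dst, follow_symlinks=follow_file_symlinks)


tmp = tempfile.mkdtemp()
src = os.path.join(tmp, 'src.txt')
with open(src, 'w') as file:
    file.write('data')
os.utime(src, (1_000_000_000, 1_000_000_000))  # give src an old timestamp

with_stats = copy_one_file(src, os.path.join(tmp, 'stats.txt'), meta='stats')
data_only = copy_one_file(src, os.path.join(tmp, 'plain.txt'), meta=None)
stats_mtime = int(os.stat(with_stats).st_mtime)  # timestamp carried over
plain_mtime = int(os.stat(data_only).st_mtime)   # timestamp of the copy itself
shutil.rmtree(tmp)
```

With meta='stats' the copy keeps the source's modification time, while meta=None leaves the copy with the time at which it was written.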

report()[source]
_unsubmitted_report()[source]

Build a report on the unsubmitted jobs.

class kwutil.fsops_managers.MoveManager(workers=0, mode='thread', eager=False)[source]

Bases: _FilesystemOperationManager

Helper to execute multiple move operations on a local filesystem.

Todo

  • [ ] Add optional check that all src paths exist

  • [ ] Add optional check that all dst paths do not exist

  • [ ] Add optional check that no dst path is or is inside of a src dpath (would make things ambiguous), the operation graph should be bipartite.

Example

>>> import ubelt as ub
>>> from kwutil.fsops_managers import MoveManager
>>> dpath = ub.Path.appdir('kwutil', 'tests', 'move_manager')
>>> src_dpath = (dpath / 'src').ensuredir()
>>> dst_dpath = (dpath / 'dst').delete()
>>> src_fpaths = [src_dpath / 'file{}.txt'.format(i) for i in range(10)]
>>> for fpath in src_fpaths:
>>>     fpath.touch()
>>> moveman = MoveManager(workers=0)
>>> for src_fpath in src_fpaths:
>>>     dst_fpath = src_fpath.augment(dpath=dst_dpath)
>>>     moveman.submit(src_fpath, dst_fpath)
>>> moveman.run()
>>> assert len(dst_dpath.ls()) == len(src_fpaths)
>>> assert len(src_dpath.ls()) == 0
Parameters:
  • workers (int) – number of parallel workers to use

  • mode (str) – thread, process, or serial

  • eager (bool) – if True, starts working on a job as soon as it is submitted; otherwise it waits until run is called.

_worker_func(src, dst, follow_file_symlinks, follow_dir_symlinks, meta)
Parameters:
  • src (PathLike | str)

  • dst (PathLike | str)

_operation_name = 'move'
submit(src, dst, skip_existing=False, follow_file_symlinks=False, follow_dir_symlinks=False, meta='stats')[source]
Parameters:
  • src (str | PathLike) – source file or directory

  • dst (str | PathLike) – destination file or directory

  • follow_file_symlinks (bool) – If True and src is a link, the link will be resolved before it is copied (i.e. the data is duplicated), otherwise just the link itself will be copied.

  • follow_dir_symlinks (bool) – if True when src is a directory and contains symlinks to other directories, the contents of the linked data are copied, otherwise when False only the link itself is copied.

  • meta (str | None) – Indicates what metadata bits to copy. This can be ‘stats’ which tries to copy all metadata (i.e. like shutil.copy2()), ‘mode’ which copies just the permission bits (i.e. like shutil.copy()), or None, which ignores all metadata (i.e. like shutil.copyfile()).

_check()[source]

Validate that the set of move tasks looks sane.

Exact logic of this is currently in flux.

kwutil.fsops_managers.remove_empty_dirs(dpath)[source]

Remove any directories that are empty or only contain (recursively) other empty directories.

In bash this is similar to

Parameters:

dpath (str | PathLike) – directory to remove other empty directories in. If the input directory is empty it is also removed.
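The behavior described above can be approximated with a bottom-up os.walk(). This is a minimal sketch under that assumption, not necessarily how remove_empty_dirs is implemented; the name remove_empty_dirs_sketch is hypothetical.

```python
import os
import shutil
import tempfile


def remove_empty_dirs_sketch(dpath):
    """Bottom-up removal of directories that end up with nothing inside."""
    for root, dirnames, filenames in os.walk(dpath, topdown=False):
        # Children are visited before their parent, so list the directory
        # again to see whether anything is still left inside it.
        if not os.listdir(root):
            os.rmdir(root)


tmp = tempfile.mkdtemp()
empty_root = os.path.join(tmp, 'dir1')
os.makedirs(os.path.join(empty_root, 'dir2', 'dir3'))
kept_root = os.path.join(tmp, 'keep')
os.makedirs(os.path.join(kept_root, 'empty_child'))
open(os.path.join(kept_root, 'file.txt'), 'w').close()

remove_empty_dirs_sketch(empty_root)  # the whole chain is empty: all removed
remove_empty_dirs_sketch(kept_root)   # only the empty child is removed
remaining = sorted(os.listdir(tmp))
kept_children = sorted(os.listdir(kept_root))
shutil.rmtree(tmp)
```

Walking with topdown=False matters here: a parent that only contained empty directories becomes empty itself by the time it is visited, so it is removed as well.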

Example

>>> import ubelt as ub
>>> from kwutil.fsops_managers import remove_empty_dirs
>>> test_dpath = ub.Path.appdir('kwutil', 'tests', 'remove_empty_dirs')
>>> (test_dpath / 'dir1' / 'dir2' / 'dir3').ensuredir()
>>> dpath = (test_dpath / 'dir1')
>>> assert dpath.exists()
>>> remove_empty_dirs(dpath)
>>> assert not dpath.exists()
kwutil.fsops_managers._unsubmitted_info(paths)[source]
kwutil.fsops_managers._pathinfo(path, with_stats=True, assume_exists=False)[source]