datamol.utils¶
Parallelization helpers¶
parallelized(fn, inputs_list, scheduler='processes', n_jobs=-1, batch_size=None, progress=False, arg_type='arg', total=None, tqdm_kwargs=None, **job_kwargs)¶
Run a function in parallel.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn | Callable | The function to run in parallel. | required |
inputs_list | Iterable[Any] | List of inputs to pass to `fn`. | required |
scheduler | str | Choose between ["processes", "threads"]. Use None to fall back to the default joblib "loky" scheduler. | 'processes' |
n_jobs | Optional[int] | Number of processes. Use 0 or None to force sequential. Use -1 to use all the available processors. For details see https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation | -1 |
batch_size | int | Whether to automatically batch `inputs_list`. You should only use it when the length of `inputs_list` is very large (>100k elements). The length of `inputs_list` must also be defined. | None |
progress | bool | Display a progress bar. Defaults to False. | False |
arg_type | str | One of ["arg", "args", "kwargs"]: "arg": the input is passed as an argument: `fn(arg)` (default); "args": the input is passed as a list: `fn(*args)`; "kwargs": the input is passed as a map: `fn(**kwargs)`. | 'arg' |
total | int | The number of elements in the iterator. Only used when `progress` is True. | None |
tqdm_kwargs | dict | Any additional arguments supported by the `tqdm` progress bar. | None |
job_kwargs | | Any additional arguments supported by `joblib.Parallel`. | {} |
Returns:
Type | Description |
---|---|
Sequence[Optional[Any]] | The results of the execution as a list. |
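For illustration, a minimal usage sketch of the "arg" (default) and "kwargs" modes; the result values shown in the comments are what the calls are expected to return:
```python
import datamol as dm

def square(x):
    return x**2

# Default arg_type="arg": each element of inputs_list is passed as a single argument.
results = dm.parallelized(square, [1, 2, 3, 4], n_jobs=2)
# results == [1, 4, 9, 16]

def power(base, exponent):
    return base**exponent

# arg_type="kwargs": each element is a dict unpacked as keyword arguments.
results = dm.parallelized(
    power,
    [{"base": 2, "exponent": 3}, {"base": 3, "exponent": 2}],
    arg_type="kwargs",
    scheduler="threads",
)
# results == [8, 9]
```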
Source code in datamol/utils/jobs.py
def parallelized(
fn: Callable,
inputs_list: Iterable[Any],
scheduler: str = "processes",
n_jobs: Optional[int] = -1,
batch_size: int = None,
progress: bool = False,
arg_type: str = "arg",
total: int = None,
tqdm_kwargs: dict = None,
**job_kwargs,
) -> Sequence[Optional[Any]]:
"""Run a function in parallel.
Args:
fn: The function to run in parallel.
inputs_list: List of inputs to pass to `fn`.
scheduler: Choose between ["processes", "threads"]. Use None to fall back
to the default joblib "loky" scheduler.
n_jobs: Number of processes. Use 0 or None to force sequential.
Use -1 to use all the available processors. For details see
https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
batch_size: Whether to automatically batch `inputs_list`. You should only use it when the length
of `inputs_list` is very large (>100k elements). The length of `inputs_list` must also be
defined.
progress: Display a progress bar. Defaults to False.
arg_type: One of ["arg", "args", "kwargs"]:
- "arg": the input is passed as an argument: `fn(arg)` (default).
- "args": the input is passed as a list: `fn(*args)`.
- "kwargs": the input is passed as a map: `fn(**kwargs)`.
total: The number of elements in the iterator. Only used when `progress` is True.
tqdm_kwargs: Any additional arguments supported by the `tqdm` progress bar.
job_kwargs: Any additional arguments supported by `joblib.Parallel`.
Returns:
The results of the execution as a list.
"""
runner = JobRunner(
n_jobs=n_jobs,
batch_size=batch_size,
progress=progress,
prefer=scheduler,
total=total,
tqdm_kwargs=tqdm_kwargs,
**job_kwargs,
)
return runner(fn, inputs_list, arg_type=arg_type)
JobRunner¶
Source code in datamol/utils/jobs.py
class JobRunner:
def __init__(
self,
n_jobs: Optional[int] = -1,
batch_size: int = None,
prefer: str = None,
progress: bool = False,
total: int = None,
tqdm_kwargs: dict = None,
**job_kwargs,
):
"""
JobRunner with sequential/parallel regimes. The multiprocessing backend uses joblib, which
allows taking advantage of its features, while the progress bar uses tqdm.
Args:
n_jobs: Number of processes. Use 0 or None to force sequential.
Use -1 to use all the available processors. For details see
https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
batch_size: Whether to batch `inputs_list`. You can specify batch_size when the length
of `inputs_list` is very large (>100k elements). By default, the auto batching of joblib is used.
prefer: Choose from ['processes', 'threads'] or None. Default to None.
Soft hint to choose the default backend if no specific backend
was selected with the parallel_backend context manager. The
default process-based backend is 'loky' and the default
thread-based backend is 'threading'. Ignored if the ``backend``
parameter is specified.
progress: whether to display progress bar
total: The number of elements in the iterator. Only used when `progress` is True.
tqdm_kwargs: Any additional arguments supported by the `tqdm` progress bar.
job_kwargs: Any additional arguments supported by `joblib.Parallel`.
Example:
```python
import datamol as dm
runner = dm.JobRunner(n_jobs=4, progress=True, prefer="threads")
results = runner(lambda x: x**2, [1, 2, 3, 4])
```
"""
self.n_jobs = n_jobs
self.batch_size = batch_size or "auto"
self.prefer = prefer
self.job_kwargs = job_kwargs
self.job_kwargs.update(n_jobs=self.n_jobs, prefer=self.prefer, batch_size=self.batch_size)
self.no_progress = not progress
self.total = total
self.tqdm_kwargs = tqdm_kwargs or {}
@property
def is_sequential(self):
"""Check whether the job is sequential or parallel"""
return (self.n_jobs is None) or (self.n_jobs in [0, 1])
@staticmethod
def wrap_fn(fn: Callable, arg_type: Optional[str] = None, **fn_kwargs):
"""Small wrapper around a callable to properly format it's argument"""
def _run(args: Any):
if arg_type == "kwargs":
fn_kwargs.update(**args)
return fn(**fn_kwargs)
elif arg_type == "args":
return fn(*args, **fn_kwargs)
return fn(args, **fn_kwargs)
return _run
def sequential(
self,
callable_fn: Callable,
data: Iterable[Any],
arg_type: Optional[str] = None,
**fn_kwargs,
):
"""
Run the job sequentially.
Args:
callable_fn (callable): function to call
data (iterable): input data
arg_type (str, optional): function argument type ('arg'/None or 'args' or 'kwargs')
fn_kwargs (dict, optional): optional keyword arguments to pass to the callable function
"""
total_length = JobRunner.get_iterator_length(data)
if self.total is not None:
self.tqdm_kwargs["total"] = self.total
elif "total" not in self.tqdm_kwargs:
self.tqdm_kwargs["total"] = total_length
if "disable" not in self.tqdm_kwargs:
self.tqdm_kwargs["disable"] = self.no_progress
results = [
JobRunner.wrap_fn(callable_fn, arg_type, **fn_kwargs)(dt)
for dt in tqdm(data, **self.tqdm_kwargs)
]
return results
def parallel(
self,
callable_fn: Callable,
data: Iterable[Any],
arg_type: Optional[str] = None,
**fn_kwargs,
):
"""
Run job in parallel
Args:
callable_fn (callable): function to call
data (iterable): input data
arg_type (str, optional): function argument type ('arg'/None or 'args' or 'kwargs')
fn_kwargs (dict, optional): optional keyword arguments to pass to the callable function
"""
total_length = JobRunner.get_iterator_length(data)
if self.total is not None:
self.tqdm_kwargs["total"] = self.total
elif "total" not in self.tqdm_kwargs:
self.tqdm_kwargs["total"] = total_length
if "disable" not in self.tqdm_kwargs:
self.tqdm_kwargs["disable"] = self.no_progress
runner = JobRunner._parallel_helper(**self.job_kwargs)
results = runner(**self.tqdm_kwargs)(
delayed(JobRunner.wrap_fn(callable_fn, arg_type, **fn_kwargs))(dt) for dt in data
)
return results
def __call__(self, *args, **kwargs):
"""
Run job using the n_jobs attribute to determine regime
"""
if self.is_sequential:
return self.sequential(*args, **kwargs)
return self.parallel(*args, **kwargs)
@staticmethod
def _parallel_helper(**joblib_args):
r"""
Parallel helper function for joblib with tqdm support
"""
def run(**tq_args):
def tmp(op_iter):
with _tqdm_callback(tqdm(**tq_args)):
return Parallel(**joblib_args)(op_iter)
return tmp
return run
@staticmethod
def get_iterator_length(data):
"""Attempt to get the length of an iterator"""
total_length = None
try:
total_length = len(data)
except TypeError:
# most likely a generator, ignore
pass
return total_length
is_sequential property readonly¶
Check whether the job is sequential or parallel
__call__(self, *args, **kwargs) special¶
Run job using the n_jobs attribute to determine regime
Source code in datamol/utils/jobs.py
def __call__(self, *args, **kwargs):
"""
Run job using the n_jobs attribute to determine regime
"""
if self.is_sequential:
return self.sequential(*args, **kwargs)
return self.parallel(*args, **kwargs)
__init__(self, n_jobs=-1, batch_size=None, prefer=None, progress=False, total=None, tqdm_kwargs=None, **job_kwargs) special¶
JobRunner with sequential/parallel regimes. The multiprocessing backend uses joblib, which allows taking advantage of its features, while the progress bar uses tqdm.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
n_jobs | Optional[int] | Number of processes. Use 0 or None to force sequential. Use -1 to use all the available processors. For details see https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation | -1 |
batch_size | int | Whether to batch `inputs_list`. You can specify batch_size when the length of `inputs_list` is very large (>100k elements). By default, the auto batching of joblib is used. | None |
prefer | str | Choose from ['processes', 'threads'] or None. Defaults to None. Soft hint to choose the default backend if no specific backend was selected with the parallel_backend context manager. The default process-based backend is 'loky' and the default thread-based backend is 'threading'. Ignored if the `backend` parameter is specified. | None |
progress | bool | Whether to display a progress bar. | False |
total | int | The number of elements in the iterator. Only used when `progress` is True. | None |
tqdm_kwargs | dict | Any additional arguments supported by the `tqdm` progress bar. | None |
job_kwargs | | Any additional arguments supported by `joblib.Parallel`. | {} |
Examples:
import datamol as dm
runner = dm.JobRunner(n_jobs=4, progress=True, prefer="threads")
results = runner(lambda x: x**2, [1, 2, 3, 4])
Source code in datamol/utils/jobs.py
def __init__(
self,
n_jobs: Optional[int] = -1,
batch_size: int = None,
prefer: str = None,
progress: bool = False,
total: int = None,
tqdm_kwargs: dict = None,
**job_kwargs,
):
"""
JobRunner with sequential/parallel regimes. The multiprocessing backend uses joblib, which
allows taking advantage of its features, while the progress bar uses tqdm.
Args:
n_jobs: Number of processes. Use 0 or None to force sequential.
Use -1 to use all the available processors. For details see
https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
batch_size: Whether to batch `inputs_list`. You can specify batch_size when the length
of `inputs_list` is very large (>100k elements). By default, the auto batching of joblib is used.
prefer: Choose from ['processes', 'threads'] or None. Default to None.
Soft hint to choose the default backend if no specific backend
was selected with the parallel_backend context manager. The
default process-based backend is 'loky' and the default
thread-based backend is 'threading'. Ignored if the ``backend``
parameter is specified.
progress: whether to display progress bar
total: The number of elements in the iterator. Only used when `progress` is True.
tqdm_kwargs: Any additional arguments supported by the `tqdm` progress bar.
job_kwargs: Any additional arguments supported by `joblib.Parallel`.
Example:
```python
import datamol as dm
runner = dm.JobRunner(n_jobs=4, progress=True, prefer="threads")
results = runner(lambda x: x**2, [1, 2, 3, 4])
```
"""
self.n_jobs = n_jobs
self.batch_size = batch_size or "auto"
self.prefer = prefer
self.job_kwargs = job_kwargs
self.job_kwargs.update(n_jobs=self.n_jobs, prefer=self.prefer, batch_size=self.batch_size)
self.no_progress = not progress
self.total = total
self.tqdm_kwargs = tqdm_kwargs or {}
get_iterator_length(data) staticmethod¶
Attempt to get the length of an iterator
Source code in datamol/utils/jobs.py
@staticmethod
def get_iterator_length(data):
"""Attempt to get the length of an iterator"""
total_length = None
try:
total_length = len(data)
except TypeError:
# most likely a generator, ignore
pass
return total_length
parallel(self, callable_fn, data, arg_type=None, **fn_kwargs)¶
Run job in parallel
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callable_fn | callable | function to call | required |
data | iterable | input data | required |
arg_type | str | function argument type ('arg'/None or 'args' or 'kwargs') | None |
fn_kwargs | dict | optional keyword arguments to pass to the callable function | {} |
Source code in datamol/utils/jobs.py
def parallel(
self,
callable_fn: Callable,
data: Iterable[Any],
arg_type: Optional[str] = None,
**fn_kwargs,
):
"""
Run job in parallel
Args:
callable_fn (callable): function to call
data (iterable): input data
arg_type (str, optional): function argument type ('arg'/None or 'args' or 'kwargs')
fn_kwargs (dict, optional): optional keyword arguments to pass to the callable function
"""
total_length = JobRunner.get_iterator_length(data)
if self.total is not None:
self.tqdm_kwargs["total"] = self.total
elif "total" not in self.tqdm_kwargs:
self.tqdm_kwargs["total"] = total_length
if "disable" not in self.tqdm_kwargs:
self.tqdm_kwargs["disable"] = self.no_progress
runner = JobRunner._parallel_helper(**self.job_kwargs)
results = runner(**self.tqdm_kwargs)(
delayed(JobRunner.wrap_fn(callable_fn, arg_type, **fn_kwargs))(dt) for dt in data
)
return results
sequential(self, callable_fn, data, arg_type=None, **fn_kwargs)¶
Run the job sequentially.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callable_fn | callable | function to call | required |
data | iterable | input data | required |
arg_type | str | function argument type ('arg'/None or 'args' or 'kwargs') | None |
fn_kwargs | dict | optional keyword arguments to pass to the callable function | {} |
Source code in datamol/utils/jobs.py
def sequential(
self,
callable_fn: Callable,
data: Iterable[Any],
arg_type: Optional[str] = None,
**fn_kwargs,
):
"""
Run the job sequentially.
Args:
callable_fn (callable): function to call
data (iterable): input data
arg_type (str, optional): function argument type ('arg'/None or 'args' or 'kwargs')
fn_kwargs (dict, optional): optional keyword arguments to pass to the callable function
"""
total_length = JobRunner.get_iterator_length(data)
if self.total is not None:
self.tqdm_kwargs["total"] = self.total
elif "total" not in self.tqdm_kwargs:
self.tqdm_kwargs["total"] = total_length
if "disable" not in self.tqdm_kwargs:
self.tqdm_kwargs["disable"] = self.no_progress
results = [
JobRunner.wrap_fn(callable_fn, arg_type, **fn_kwargs)(dt)
for dt in tqdm(data, **self.tqdm_kwargs)
]
return results
wrap_fn(fn, arg_type=None, **fn_kwargs) staticmethod¶
Small wrapper around a callable to properly format its arguments
Source code in datamol/utils/jobs.py
@staticmethod
def wrap_fn(fn: Callable, arg_type: Optional[str] = None, **fn_kwargs):
"""Small wrapper around a callable to properly format it's argument"""
def _run(args: Any):
if arg_type == "kwargs":
fn_kwargs.update(**args)
return fn(**fn_kwargs)
elif arg_type == "args":
return fn(*args, **fn_kwargs)
return fn(args, **fn_kwargs)
return _run
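As an illustration of how `wrap_fn` formats its input, a short sketch based on the source above (`add` is a made-up helper):
```python
import datamol as dm

def add(a, b, offset=0):
    return a + b + offset

# arg_type="args": the input tuple is unpacked as positional arguments,
# while extra fn_kwargs are forwarded as keyword arguments.
wrapped = dm.JobRunner.wrap_fn(add, arg_type="args", offset=10)
wrapped((1, 2))  # -> 13

# arg_type="kwargs": the input dict is unpacked as keyword arguments.
wrapped = dm.JobRunner.wrap_fn(add, arg_type="kwargs")
wrapped({"a": 1, "b": 2})  # -> 3
```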
Filesystem related utility functions¶
Built on top of fsspec to manipulate and work with remote or local paths in a transparent manner.
The functions below are available under `datamol.utils.fs`.
get_extension(path)¶
Get the extension of a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[str, os.PathLike] | a path supported by `fsspec` such as local, s3, gcs, etc. | required |
Source code in datamol/utils/fs.py
def get_extension(path: Union[str, os.PathLike]):
"""Get the extension of a file.
Args:
path: a path supported by `fsspec` such as local, s3, gcs, etc.
"""
basename = get_basename(path)
return basename.split(".")[-1]
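A small usage sketch (the paths are hypothetical; remote protocols require the matching fsspec backend, e.g. s3fs for S3):
```python
import datamol as dm

dm.utils.fs.get_extension("/tmp/results/molecules.sdf")       # -> "sdf"
dm.utils.fs.get_extension("s3://my-bucket/data/dataset.csv")  # -> "csv"
```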
exists(path)¶
Check whether a file or a directory exists.
Important: File-like objects always exist.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase] | a path supported by `fsspec` such as local, s3, gcs, etc. | required |
Source code in datamol/utils/fs.py
def exists(path: Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase]):
"""Check whether a file or a directory exists.
Important: File-like object always exists.
Args:
path: a path supported by `fsspec` such as local, s3, gcs, etc.
"""
return is_file(path) or is_dir(path)
is_file(path)¶
Check whether a file exists.
Important: File-like objects always exist.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase] | a path supported by `fsspec` such as local, s3, gcs, etc. | required |
Source code in datamol/utils/fs.py
def is_file(path: Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase]):
"""Check whether a file exists.
Important: File-like object always exists.
Args:
path: a path supported by `fsspec` such as local, s3, gcs, etc.
"""
if isinstance(path, fsspec.core.OpenFile):
return path.fs.isfile(path.path)
elif isinstance(path, (str, pathlib.Path)):
mapper = get_mapper(str(path))
return mapper.fs.isfile(path)
else:
return True
is_dir(path)¶
Check whether a directory exists.
Important: File-like objects are never considered directories.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase] | a path supported by `fsspec` such as local, s3, gcs, etc. | required |
Source code in datamol/utils/fs.py
def is_dir(path: Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase]):
"""Check whether a file exists.
Important: File-like object always exists.
Args:
path: a path supported by `fsspec` such as local, s3, gcs, etc.
"""
if isinstance(path, fsspec.core.OpenFile):
return path.fs.isdir(path.path)
elif isinstance(path, (str, pathlib.Path)):
mapper = get_mapper(str(path))
return mapper.fs.isdir(path)
else:
return False
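A quick sketch of the three checks on local (hypothetical) paths:
```python
import datamol as dm

dm.utils.fs.is_dir("/tmp")                    # True on a typical Unix system
dm.utils.fs.is_file("/tmp/missing_file.csv")  # False if the file does not exist
dm.utils.fs.exists("/tmp")                    # True: the directory exists
```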
get_protocol(path, fs=None)¶
Return the name of the path protocol.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[str, os.PathLike] | a path supported by `fsspec` such as local, s3, gcs, etc. | required |
Source code in datamol/utils/fs.py
def get_protocol(path: Union[str, os.PathLike], fs: fsspec.AbstractFileSystem = None):
"""Return the name of the path protocol.
Args:
path: a path supported by `fsspec` such as local, s3, gcs, etc.
"""
if fs is None:
fs = get_mapper(path).fs
protocol = fs.protocol # type: ignore
if "s3" in protocol:
return "s3"
elif "gs" in protocol:
return "gs"
elif isinstance(protocol, (tuple, list)):
return protocol[0]
return protocol
is_local_path(path)¶
Check whether a path is local.
Source code in datamol/utils/fs.py
def is_local_path(path: Union[str, os.PathLike]):
"""Check whether a path is local."""
return get_protocol(str(path)) == "file"
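For illustration (the S3 path is hypothetical and requires the s3fs backend):
```python
import datamol as dm

dm.utils.fs.get_protocol("/tmp/data.csv")             # -> "file"
dm.utils.fs.get_protocol("s3://my-bucket/data.csv")   # -> "s3"
dm.utils.fs.is_local_path("/tmp/data.csv")            # -> True
dm.utils.fs.is_local_path("s3://my-bucket/data.csv")  # -> False
```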
join(*paths)¶
Join paths together. The first element determines the filesystem to use (and so the separator).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
paths | | a list of paths supported by `fsspec` such as local, s3, gcs, etc. | () |
Source code in datamol/utils/fs.py
def join(*paths):
"""Join paths together. The first element determine the
filesystem to use (and so the separator.
Args:
paths: a list of paths supported by `fsspec` such as local, s3, gcs, etc.
"""
paths = [str(path).rstrip("/") for path in paths]
source_path = paths[0]
fs = get_mapper(source_path).fs
full_path = fs.sep.join(paths)
return full_path
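A short sketch with hypothetical paths (the separator comes from the first path's filesystem):
```python
import datamol as dm

dm.utils.fs.join("s3://my-bucket/project", "models", "model.pkl")
# -> "s3://my-bucket/project/models/model.pkl"

dm.utils.fs.join("/tmp/project/", "results.csv")
# -> "/tmp/project/results.csv"
```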
get_size(file)¶
Get the size of a file given its path. Return None if the size can't be retrieved.
Source code in datamol/utils/fs.py
def get_size(file: Union[str, os.PathLike, io.IOBase, fsspec.core.OpenFile]) -> Optional[int]:
"""Get the size of a file given its path. Return None if the
size can't be retrieved.
"""
if isinstance(file, io.IOBase) and hasattr(file, "name"):
fs_local = fsspec.filesystem("file")
file_size = fs_local.size(getattr(file, "name"))
elif isinstance(file, (str, pathlib.Path)):
fs = get_mapper(str(file)).fs
file_size = fs.size(str(file))
elif isinstance(file, fsspec.core.OpenFile):
file_size = file.fs.size(file.path)
else:
file_size = None
return file_size
copy_file(source, destination, chunk_size=None, force=False, progress=False, leave_progress=True)¶
Copy one file to another location across different filesystems (local, S3, GCS, etc).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source | Union[str, pathlib.Path, io.IOBase, fsspec.core.OpenFile] | path or file-like object to copy from. | required |
destination | Union[str, pathlib.Path, io.IOBase, fsspec.core.OpenFile] | path or file-like object to copy to. | required |
chunk_size | int | the chunk size to use. If progress is enabled and the chunk size is `None`, it is set to 1MB (1024 * 1024). | None |
force | bool | whether to overwrite the destination file if it exists. | False |
progress | bool | whether to display a progress bar. | False |
leave_progress | bool | whether to hide the progress bar once the copy is done. | True |
Source code in datamol/utils/fs.py
def copy_file(
source: Union[str, pathlib.Path, io.IOBase, fsspec.core.OpenFile],
destination: Union[str, pathlib.Path, io.IOBase, fsspec.core.OpenFile],
chunk_size: int = None,
force: bool = False,
progress: bool = False,
leave_progress: bool = True,
):
"""Copy one file to another location across different filesystem (local, S3, GCS, etc).
Args:
source: path or file-like object to copy from.
destination: path or file-like object to copy to.
chunk_size: the chunk size to use. If progress is enabled and the chunk
size is `None`, it is set to 1MB (1024 * 1024).
force: whether to overwrite the destination file if it exists.
progress: whether to display a progress bar.
leave_progress: whether to hide the progress bar once the copy is done.
"""
if progress and chunk_size is None:
chunk_size = 1024 * 1024
if isinstance(source, (str, pathlib.Path)):
source_file = fsspec.open(str(source), "rb")
else:
source_file = source
if isinstance(destination, (str, pathlib.Path)):
# adapt the file mode of the destination depending on the source file.
destination_mode = "wb"
if hasattr(source_file, "mode"):
destination_mode = "wb" if "b" in getattr(source_file, "mode") else "w"
elif isinstance(source_file, io.BytesIO):
destination_mode = "wb"
elif isinstance(source_file, io.StringIO):
destination_mode = "w"
destination_file = fsspec.open(str(destination), destination_mode)
else:
destination_file = destination
if not is_file(source_file):
raise ValueError(f"The file being copied does not exist or is not a file: {source}")
if not force and is_file(destination_file):
raise ValueError(f"The destination file to copy already exists: {destination}")
with source_file as source_stream:
with destination_file as destination_stream:
if chunk_size is None:
# copy without chunks
destination_stream.write(source_stream.read())
else:
# copy with chunks
# determine the size of the source file
source_size = None
if progress:
source_size = get_size(source)
pbar = None
if progress:
tqdm = _import_tqdm()
if tqdm is None:
raise ImportError(
"If the progress bar is enabled, you must have `tqdm` "
"installed: `conda install tqdm`."
)
else:
# init progress bar
pbar = tqdm(
total=source_size,
leave=leave_progress,
disable=not progress,
unit="B",
unit_divisor=1024,
unit_scale=True,
)
# start the loop
while True:
data = source_stream.read(chunk_size)
if not data:
break
destination_stream.write(data)
if pbar is not None:
pbar.update(chunk_size)
if pbar is not None:
pbar.close()
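A usage sketch copying a local file to a hypothetical S3 bucket (requires the s3fs backend):
```python
import datamol as dm

dm.utils.fs.copy_file(
    source="/tmp/dataset.csv",
    destination="s3://my-bucket/datasets/dataset.csv",
    force=True,     # overwrite the destination if it already exists
    progress=True,  # show a progress bar (chunked copy, 1MB chunks by default)
)
```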
md5(filepath)¶
Return the md5 hash of a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filepath | Union[str, os.PathLike] | The path to the file to compute the MD5 hash on. | required |
Source code in datamol/utils/fs.py
def md5(filepath: Union[str, os.PathLike]):
"""Return the md5 hash of a file.
Args:
filepath: The path to the file to compute the MD5 hash on.
"""
with fsspec.open(filepath) as f:
file_hash = hashlib.md5()
file_hash.update(f.read())
file_hash = file_hash.hexdigest()
return file_hash
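A small sketch, e.g. to verify a downloaded file against a published checksum (the path and hash value below are placeholders):
```python
import datamol as dm

checksum = dm.utils.fs.md5("/tmp/dataset.csv")
expected = "0123456789abcdef0123456789abcdef"  # placeholder value
if checksum != expected:
    raise ValueError("Checksum mismatch: the file may be corrupted.")
```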
glob(path, detail=False, **kwargs)¶
Find files by glob-matching.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | A glob-style path. | required |
Source code in datamol/utils/fs.py
def glob(path: str, detail: bool = False, **kwargs) -> List[str]:
"""Find files by glob-matching.
Args:
path: A glob-style path.
"""
# Get the list of paths
fs = get_mapper(path).fs
paths = fs.glob(path, detail=detail, **kwargs)
paths = [fsspec.utils._unstrip_protocol(d, fs) for d in paths]
return paths
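For illustration (hypothetical paths; returned paths keep their protocol prefix):
```python
import datamol as dm

# All SDF files in a local folder.
dm.utils.fs.glob("/tmp/data/*.sdf")

# All CSV files under an S3 prefix (requires the s3fs backend).
dm.utils.fs.glob("s3://my-bucket/data/**/*.csv")
```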
get_cache_dir(app_name, suffix=None, create=True)¶
Get a local cache directory for a given application name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
app_name | str | The name of the application. | required |
suffix | str | A subdirectory appended to the cache dir. | None |
create | bool | Whether to create the directory and its parents if it does not already exist. | True |
Source code in datamol/utils/fs.py
def get_cache_dir(app_name: str, suffix: str = None, create: bool = True):
"""Get a local cache directory for a given application name.
Args:
app_name: The name of the application.
suffix: A subdirectory appended to the cache dir.
create: Whether to create the directory and its parents if it does not
already exist.
"""
appdirs = _import_appdirs()
if appdirs is None:
raise ImportError(
"To use `dm.utils.fs.get_cache_dir()`, you must have `appdirs` "
"installed: `conda install appdirs`."
)
cache_dir = pathlib.Path(appdirs.user_cache_dir(appname=app_name))
if suffix is not None:
cache_dir /= suffix
if create:
cache_dir.mkdir(exist_ok=True, parents=True)
return cache_dir
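A usage sketch with a made-up application name (requires `appdirs` to be installed):
```python
import datamol as dm

cache_dir = dm.utils.fs.get_cache_dir(app_name="my_app", suffix="downloads")
# e.g. ~/.cache/my_app/downloads on Linux; the exact location is platform-dependent.
```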
get_mapper(path)¶
Get the fsspec mapper.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[str, os.PathLike] | a path supported by `fsspec` such as local, s3, gcs, etc. | required |
Source code in datamol/utils/fs.py
def get_mapper(path: Union[str, os.PathLike]):
"""Get the fsspec mapper.
Args:
path: a path supported by `fsspec` such as local, s3, gcs, etc.
"""
return fsspec.get_mapper(str(path))
get_basename(path)¶
Get the basename of a file or a folder.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[str, os.PathLike] | a path supported by `fsspec` such as local, s3, gcs, etc. | required |
Source code in datamol/utils/fs.py
def get_basename(path: Union[str, os.PathLike]):
"""Get the basename of a file or a folder.
Args:
path: a path supported by `fsspec` such as local, s3, gcs, etc.
"""
path = str(path)
mapper = get_mapper(path)
clean_path = path.rstrip(mapper.fs.sep)
return str(clean_path).split(mapper.fs.sep)[-1]
copy_dir(source, destination, force=False, progress=False, leave_progress=True, file_progress=False, file_leave_progress=False, chunk_size=None)¶
Copy one directory to another location across different filesystems (local, S3, GCS, etc).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source | Union[str, pathlib.Path] | Path to the source directory. | required |
destination | Union[str, pathlib.Path] | Path to the destination directory. | required |
chunk_size | int | The chunk size to use. If progress is enabled and the chunk size is `None`, it is set to 2048. See `dm.utils.fs.copy_file`. | None |
force | bool | Whether to overwrite the destination directory if it exists. | False |
progress | bool | Whether to display a progress bar. | False |
leave_progress | bool | Whether to hide the progress bar once the copy is done. | True |
file_progress | bool | Whether to display a progress bar for each file. | False |
file_leave_progress | bool | Whether to hide the progress bar once a file copy is done. | False |
Source code in datamol/utils/fs.py
def copy_dir(
source: Union[str, pathlib.Path],
destination: Union[str, pathlib.Path],
force: bool = False,
progress: bool = False,
leave_progress: bool = True,
file_progress: bool = False,
file_leave_progress: bool = False,
chunk_size: int = None,
):
"""Copy one directory to another location across different filesystem (local, S3, GCS, etc).
Args:
source: Path to the source directory.
destination: Path to the destination directory.
chunk_size: the chunk size to use. If progress is enabled and the chunk
size is `None`, it is set to 2048.
force: whether to overwrite the destination directory if it exists.
progress: Whether to display a progress bar.
leave_progress: Whether to hide the progress bar once the copy is done.
file_progress: Whether to display a progress bar for each file.
file_leave_progress: Whether to hide the progress bar once a file copy is done.
chunk_size: See `dm.utils.fs.copy_file`.
"""
source = str(source)
destination = str(destination)
source_fs = get_mapper(source).fs
destination_fs = get_mapper(destination).fs
# Sanity check
if not is_dir(source):
raise ValueError(
f"The directory being copied does not exist or is not a directory: {source}"
)
if not force and is_dir(destination):
raise ValueError(f"The destination folder to copy already exists: {destination}")
# Get all input paths with details
# NOTE(hadim): we could have use `.glob(..., detail=True)` here but that API is inconsistent
# between the backends resulting in different object types being returned (dict, list, etc).
detailed_paths = source_fs.find(source, withdirs=True, detail=True)
detailed_paths = list(detailed_paths.values())
# Get list of input types
input_types = [d["type"] for d in detailed_paths]
# Get list of input path + add protocol if needed
input_paths = [d["name"] for d in detailed_paths]
input_paths = [fsspec.utils._unstrip_protocol(p, source_fs) for p in input_paths]
# Build all the output paths
output_paths: List[str] = fsspec.utils.other_paths(input_paths, destination) # type: ignore
def _copy_source_to_destination(input_path, input_type, output_path):
# A directory
if input_type == "directory":
destination_fs.mkdir(output_path)
# A file
else:
copy_file(
input_path,
output_path,
force=force,
progress=file_progress,
leave_progress=file_leave_progress,
chunk_size=chunk_size,
)
# Copy source files/directories to destination in parallel
parallelized(
_copy_source_to_destination,
inputs_list=list(zip(input_paths, input_types, output_paths)),
arg_type="args",
progress=progress,
tqdm_kwargs=dict(leave=leave_progress),
scheduler="threads",
)
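A usage sketch mirroring a local folder to a hypothetical GCS bucket (requires the gcsfs backend); files are copied in parallel using threads:
```python
import datamol as dm

dm.utils.fs.copy_dir(
    source="/tmp/experiment_outputs",
    destination="gs://my-bucket/experiment_outputs",
    force=True,     # overwrite the destination directory if it exists
    progress=True,  # one overall progress bar across all files
)
```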
mkdir(dir_path, exist_ok=False)¶
Create a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dir_path | Union[str, os.PathLike] | The path of the directory to create. | required |
exist_ok | bool | Whether to ignore the error if the directory already exists. | False |
Source code in datamol/utils/fs.py
def mkdir(dir_path: Union[str, os.PathLike], exist_ok: bool = False):
"""Create a directory.
Args:
dir_path: The path of the directory to create.
exist_ok: Whether to ignore the error if the directory
already exists.
"""
fs = get_mapper(str(dir_path)).fs
fs.mkdirs(str(dir_path), exist_ok=exist_ok)
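A minimal sketch on a local (hypothetical) path:
```python
import datamol as dm

# Create the directory (and its parents) and do not fail if it already exists.
dm.utils.fs.mkdir("/tmp/my_project/cache", exist_ok=True)
```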