datamol.utils¶
Parallelization helpers¶
parallelized(fn, inputs_list, scheduler='processes', n_jobs=-1, progress=False, arg_type='arg', tqdm_kwargs=None, **job_kwargs)¶
Run a function in parallel.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn | Callable | The function to run in parallel. | required |
inputs_list | Iterable[Any] | List of inputs to pass to fn. | required |
scheduler | str | Choose between ["processes", "threads"]. If set to None, the default joblib "loky" scheduler is used. | 'processes' |
n_jobs | Optional[int] | Number of processes. Use 0 or None to force sequential execution. Use -1 to use all the available processors. For details see https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation | -1 |
progress | bool | Display a progress bar. Defaults to False. | False |
arg_type | str | One of ["arg", "args", "kwargs"]: "arg" passes each input as a single argument, fn(arg) (default); "args" unpacks each input as positional arguments, fn(*args); "kwargs" unpacks each input as keyword arguments, fn(**kwargs). | 'arg' |
tqdm_kwargs | dict | Any additional arguments supported by the tqdm progress bar. | None |
job_kwargs | | Any additional arguments supported by joblib.Parallel. | {} |
Returns:
Type | Description |
---|---|
Optional[List[Any]] | The results of the execution as a list. |
Source code in datamol/utils/jobs.py
def parallelized(
    fn: Callable,
    inputs_list: Iterable[Any],
    scheduler: str = "processes",
    n_jobs: Optional[int] = -1,
    progress: bool = False,
    arg_type: str = "arg",
    tqdm_kwargs: dict = None,
    **job_kwargs,
) -> Optional[List[Any]]:
    """Run a function in parallel.

    Args:
        fn: The function to run in parallel.
        inputs_list: List of inputs to pass to `fn`.
        scheduler: Choose between ["processes", "threads"]. If set to None,
            the default joblib "loky" scheduler is used.
        n_jobs: Number of processes. Use 0 or None to force sequential.
            Use -1 to use all the available processors. For details see
            https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
        progress: Display a progress bar. Defaults to False.
        arg_type: One of ["arg", "args", "kwargs"]:
            - "arg": the input is passed as an argument: `fn(arg)` (default).
            - "args": the input is passed as a list: `fn(*args)`.
            - "kwargs": the input is passed as a map: `fn(**kwargs)`.
        tqdm_kwargs: Any additional arguments supported by the `tqdm` progress bar.
        job_kwargs: Any additional arguments supported by `joblib.Parallel`.

    Returns:
        The results of the execution as a list.
    """
    runner = JobRunner(
        n_jobs=n_jobs,
        progress=progress,
        prefer=scheduler,
        tqdm_kwargs=tqdm_kwargs,
        **job_kwargs,
    )
    return runner(fn, inputs_list, arg_type=arg_type)
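A minimal usage sketch of the three arg_type modes (the input values below are illustrative):

```python
import datamol as dm

def square(x):
    return x**2

def add(x, y):
    return x + y

# "arg" (default): each item is passed as a single argument, fn(item).
dm.parallelized(square, [1, 2, 3, 4], n_jobs=2)

# "args": each item is unpacked as positional arguments, fn(*item).
dm.parallelized(add, [(1, 2), (3, 4)], arg_type="args", n_jobs=2)

# "kwargs": each item is unpacked as keyword arguments, fn(**item).
dm.parallelized(add, [{"x": 1, "y": 2}, {"x": 3, "y": 4}], arg_type="kwargs", n_jobs=2)
```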
JobRunner¶
is_sequential property readonly¶
Check whether the job is sequential or parallel
__call__(self, *args, **kwargs) special¶
Run job using the n_jobs attribute to determine regime
Source code in datamol/utils/jobs.py
def __call__(self, *args, **kwargs):
    """
    Run job using the n_jobs attribute to determine regime
    """
    if self.is_sequential:
        return self.sequential(*args, **kwargs)
    return self.parallel(*args, **kwargs)
__init__(self, n_jobs=-1, prefer=None, progress=False, tqdm_kwargs=None, **job_kwargs) special¶
JobRunner with sequential/parallel regimes. The multiprocessing backend uses joblib, which allows taking advantage of its features, while the progress bar uses tqdm.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
n_jobs | Optional[int] | Number of processes. Use 0 or None to force sequential execution. Use -1 to use all the available processors. For details see https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation | -1 |
prefer | str | Choose from ['processes', 'threads'] or None. Defaults to None. Soft hint to choose the default backend if no specific backend was selected with the parallel_backend context manager. The default process-based backend is 'loky' and the default thread-based backend is 'threading'. Ignored if the backend parameter is specified. | None |
progress | bool | Whether to display a progress bar. | False |
tqdm_kwargs | dict | Any additional arguments supported by the tqdm progress bar. | None |
job_kwargs | | Any additional arguments supported by joblib.Parallel. | {} |
Examples:
import datamol as dm
runner = dm.JobRunner(n_jobs=4, progress=True, prefer="threads")
results = runner(lambda x: x**2, [1, 2, 3, 4])
Source code in datamol/utils/jobs.py
def __init__(
    self,
    n_jobs: Optional[int] = -1,
    prefer: str = None,
    progress: bool = False,
    tqdm_kwargs: dict = None,
    **job_kwargs,
):
    """
    JobRunner with sequential/parallel regimes. The multiprocessing backend uses joblib, which
    allows taking advantage of its features, while the progress bar uses tqdm.

    Args:
        n_jobs: Number of processes. Use 0 or None to force sequential.
            Use -1 to use all the available processors. For details see
            https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
        prefer: Choose from ['processes', 'threads'] or None. Defaults to None.
            Soft hint to choose the default backend if no specific backend
            was selected with the parallel_backend context manager. The
            default process-based backend is 'loky' and the default
            thread-based backend is 'threading'. Ignored if the ``backend``
            parameter is specified.
        progress: whether to display a progress bar.
        tqdm_kwargs: Any additional arguments supported by the `tqdm` progress bar.
        job_kwargs: Any additional arguments supported by `joblib.Parallel`.

    Example:

    ```python
    import datamol as dm
    runner = dm.JobRunner(n_jobs=4, progress=True, prefer="threads")
    results = runner(lambda x: x**2, [1, 2, 3, 4])
    ```
    """
    self.n_jobs = n_jobs
    self.prefer = prefer
    self.job_kwargs = job_kwargs
    self.job_kwargs.update(n_jobs=self.n_jobs, prefer=self.prefer)
    self.no_progress = not progress
    self.tqdm_kwargs = tqdm_kwargs or {}
get_iterator_length(data) staticmethod¶
Attempt to get the length of an iterator
Source code in datamol/utils/jobs.py
@staticmethod
def get_iterator_length(data):
    """Attempt to get the length of an iterator"""
    total_length = None
    try:
        total_length = len(data)
    except TypeError:
        # most likely a generator, ignore
        pass
    return total_length
parallel(self, callable_fn, data, arg_type=None, **fn_kwargs)¶
Run job in parallel
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callable_fn | Callable | Function to call. | required |
data | Iterable[Any] | Input data. | required |
arg_type | Optional[str] | Function argument type ('arg'/None, 'args', or 'kwargs'). | None |
fn_kwargs | dict | Optional keyword arguments to pass to the callable function. | {} |
Source code in datamol/utils/jobs.py
def parallel(
    self,
    callable_fn: Callable,
    data: Iterable[Any],
    arg_type: Optional[str] = None,
    **fn_kwargs,
):
    r"""
    Run job in parallel

    Args:
        callable_fn (callable): function to call
        data (iterable): input data
        arg_type (str, optional): function argument type ('arg'/None or 'args' or 'kwargs')
        fn_kwargs (dict, optional): optional keyword arguments to pass to the callable function
    """
    runner = JobRunner._parallel_helper(**self.job_kwargs)
    total_length = JobRunner.get_iterator_length(data)

    results = runner(total=total_length, disable=self.no_progress)(
        delayed(JobRunner.wrap_fn(callable_fn, arg_type, **fn_kwargs))(dt) for dt in data
    )
    return results
sequential(self, callable_fn, data, arg_type=None, **fn_kwargs)¶
Run job in sequential version
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callable_fn | Callable | Function to call. | required |
data | Iterable[Any] | Input data. | required |
arg_type | Optional[str] | Function argument type ('arg'/None, 'args', or 'kwargs'). | None |
fn_kwargs | dict | Optional keyword arguments to pass to the callable function. | {} |
Source code in datamol/utils/jobs.py
def sequential(
    self,
    callable_fn: Callable,
    data: Iterable[Any],
    arg_type: Optional[str] = None,
    **fn_kwargs,
):
    r"""
    Run job in sequential version

    Args:
        callable_fn (callable): function to call
        data (iterable): input data
        arg_type (str, optional): function argument type ('arg'/None or 'args' or 'kwargs')
        fn_kwargs (dict, optional): optional keyword arguments to pass to the callable function
    """
    total_length = JobRunner.get_iterator_length(data)

    res = [
        JobRunner.wrap_fn(callable_fn, arg_type, **fn_kwargs)(dt)
        for dt in tqdm(data, total=total_length, disable=self.no_progress, **self.tqdm_kwargs)
    ]
    return res
wrap_fn(fn, arg_type=None, **fn_kwargs) staticmethod¶
Small wrapper around a callable to properly format its arguments.
Source code in datamol/utils/jobs.py
@staticmethod
def wrap_fn(fn: Callable, arg_type: Optional[str] = None, **fn_kwargs):
    """Small wrapper around a callable to properly format its arguments"""

    # EN probably use something like (moms.utils.commons.is_callable) ?
    def _run(args: Any):
        if arg_type == "kwargs":
            fn_kwargs.update(**args)
            return fn(**fn_kwargs)
        elif arg_type == "args":
            return fn(*args, **fn_kwargs)
        return fn(args, **fn_kwargs)

    return _run
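As a short sketch of how the pieces fit together: a JobRunner can be reused directly, n_jobs=0 forces the sequential regime, and extra keyword arguments are forwarded to the callable through wrap_fn.

```python
import datamol as dm

def scale(x, factor=1):
    return x * factor

runner = dm.JobRunner(n_jobs=0, progress=False)  # n_jobs=0 forces the sequential regime

# __call__ dispatches to sequential() or parallel() depending on n_jobs;
# extra keyword arguments are passed through to the callable.
results = runner(scale, [1, 2, 3], factor=10)  # [10, 20, 30]
```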
Filesystem related utility functions¶
Built on top of fsspec to manipulate and work with remote or local paths in a transparent manner.
The below functions are available under datamol.utils.fs.
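A minimal sketch of this transparent local/remote handling (the S3 bucket is a hypothetical placeholder and requires the s3fs backend):

```python
from datamol.utils import fs

fs.get_protocol("data/molecules.csv")            # "file"
fs.exists("data/molecules.csv")                  # True if the local file or directory exists
fs.get_protocol("s3://my-bucket/molecules.csv")  # "s3" (hypothetical bucket)
```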
get_extension(path)¶
Get the extension of a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[str, os.PathLike] | A path supported by fsspec such as local, s3, gcs, etc. | required |
Source code in datamol/utils/fs.py
def get_extension(path: Union[str, os.PathLike]):
    """Get the extension of a file.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    basename = get_basename(path)
    return basename.split(".")[-1]
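For example (paths are illustrative); note that only the last suffix is returned:

```python
from datamol.utils import fs

fs.get_extension("data/molecules.sdf")     # "sdf"
fs.get_extension("data/molecules.csv.gz")  # "gz"
```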
exists(path)¶
Check whether a file or a directory exists.
Important: File-like object always exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase] | A path supported by fsspec such as local, s3, gcs, etc. | required |
Source code in datamol/utils/fs.py
def exists(path: Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase]):
    """Check whether a file or a directory exists.

    Important: File-like object always exists.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    return is_file(path) or is_dir(path)
is_file(path)¶
Check whether a file exists.
Important: File-like object always exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase] | A path supported by fsspec such as local, s3, gcs, etc. | required |
Source code in datamol/utils/fs.py
def is_file(path: Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase]):
    """Check whether a file exists.

    Important: File-like object always exists.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    if isinstance(path, fsspec.core.OpenFile):
        return path.fs.isfile(path.path)

    elif isinstance(path, (str, pathlib.Path)):
        mapper = get_mapper(str(path))
        return mapper.fs.isfile(path)

    else:
        return True
is_dir(path)¶
Check whether a directory exists.
Important: File-like object always exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase] | A path supported by fsspec such as local, s3, gcs, etc. | required |
Source code in datamol/utils/fs.py
def is_dir(path: Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase]):
    """Check whether a directory exists.

    Important: File-like object always exists.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    if isinstance(path, fsspec.core.OpenFile):
        return path.fs.isdir(path.path)

    elif isinstance(path, (str, pathlib.Path)):
        mapper = get_mapper(str(path))
        return mapper.fs.isdir(path)

    else:
        return False
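A quick sketch of the three checks on illustrative paths; a file-like object is always reported as an existing file:

```python
import io
from datamol.utils import fs

fs.is_file("data/molecules.csv")  # True if the path points to an existing file
fs.is_dir("data")                 # True if the path points to an existing directory
fs.exists("data/molecules.csv")   # is_file(...) or is_dir(...)
fs.is_file(io.BytesIO(b"abc"))    # True: file-like objects always "exist"
```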
get_protocol(path)¶
Return the name of the path protocol.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[str, os.PathLike] | A path supported by fsspec such as local, s3, gcs, etc. | required |
Source code in datamol/utils/fs.py
def get_protocol(path: Union[str, os.PathLike]):
    """Return the name of the path protocol.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    mapper = get_mapper(path)
    protocol = mapper.fs.protocol

    if "s3" in protocol:
        return "s3"
    elif "gs" in protocol:
        return "gs"
    else:
        return protocol
is_local_path(path)¶
Check whether a path is local.
Source code in datamol/utils/fs.py
def is_local_path(path: Union[str, os.PathLike]):
    """Check whether a path is local."""
    return get_protocol(str(path)) == "file"
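Sketch (the remote URL is a placeholder and needs the matching fsspec backend, e.g. gcsfs):

```python
from datamol.utils import fs

fs.is_local_path("data/molecules.csv")       # True
fs.is_local_path("gs://my-bucket/mols.csv")  # False (hypothetical bucket)
```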
join(*paths)¶
Join paths together. The first element determines the filesystem to use (and so the separator).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
paths | | A list of paths supported by fsspec such as local, s3, gcs, etc. | () |
Source code in datamol/utils/fs.py
def join(*paths):
    """Join paths together. The first element determines the
    filesystem to use (and so the separator).

    Args:
        paths: a list of paths supported by `fsspec` such as local, s3, gcs, etc.
    """
    paths = [str(path).rstrip("/") for path in paths]
    source_path = paths[0]
    fs = get_mapper(source_path).fs
    full_path = fs.sep.join(paths)
    return full_path
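Sketch; the separator comes from the filesystem of the first path (the bucket below is a placeholder):

```python
from datamol.utils import fs

fs.join("data", "processed", "mols.csv")           # "data/processed/mols.csv"
fs.join("s3://my-bucket", "datasets", "mols.csv")  # "s3://my-bucket/datasets/mols.csv"
```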
get_size(file)¶
Get the size of a file given its path. Return None if the size can't be retrieved.
Source code in datamol/utils/fs.py
def get_size(file: Union[str, os.PathLike, io.IOBase, fsspec.core.OpenFile]) -> Optional[int]:
    """Get the size of a file given its path. Return None if the
    size can't be retrieved.
    """
    if isinstance(file, io.IOBase) and hasattr(file, "name"):
        fs_local = fsspec.filesystem("file")
        file_size = fs_local.size(getattr(file, "name"))

    elif isinstance(file, (str, pathlib.Path)):
        fs = get_mapper(str(file)).fs
        file_size = fs.size(str(file))

    elif isinstance(file, fsspec.core.OpenFile):
        file_size = file.fs.size(file.path)

    else:
        file_size = None

    return file_size
copy_file(source, destination, chunk_size=None, force=False, progress=False, leave_progress=True)¶
Copy one file to another location across different filesystems (local, S3, GCS, etc).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source | Union[str, pathlib.Path, io.IOBase, fsspec.core.OpenFile] | Path or file-like object to copy from. | required |
destination | Union[str, pathlib.Path, io.IOBase, fsspec.core.OpenFile] | Path or file-like object to copy to. | required |
chunk_size | int | The chunk size to use. If progress is enabled and the chunk size is None, it is set to 2048. | None |
force | bool | Whether to overwrite the destination file if it exists. | False |
progress | bool | Whether to display a progress bar. | False |
leave_progress | bool | Whether to keep the progress bar displayed once the copy is done. | True |
Source code in datamol/utils/fs.py
def copy_file(
    source: Union[str, pathlib.Path, io.IOBase, fsspec.core.OpenFile],
    destination: Union[str, pathlib.Path, io.IOBase, fsspec.core.OpenFile],
    chunk_size: int = None,
    force: bool = False,
    progress: bool = False,
    leave_progress: bool = True,
):
    """Copy one file to another location across different filesystems (local, S3, GCS, etc).

    Args:
        source: path or file-like object to copy from.
        destination: path or file-like object to copy to.
        chunk_size: the chunk size to use. If progress is enabled and the chunk
            size is `None`, it is set to 2048.
        force: whether to overwrite the destination file if it exists.
        progress: whether to display a progress bar.
        leave_progress: whether to keep the progress bar displayed once the copy is done.
    """
    if progress and chunk_size is None:
        chunk_size = 2048

    if isinstance(source, (str, pathlib.Path)):
        source_file = fsspec.open(str(source), "rb")
    else:
        source_file = source

    if isinstance(destination, (str, pathlib.Path)):
        # adapt the file mode of the destination depending on the source file.
        destination_mode = "wb"
        if hasattr(source_file, "mode"):
            destination_mode = "wb" if "b" in getattr(source_file, "mode") else "w"
        elif isinstance(source_file, io.BytesIO):
            destination_mode = "wb"
        elif isinstance(source_file, io.StringIO):
            destination_mode = "w"

        destination_file = fsspec.open(str(destination), destination_mode)
    else:
        destination_file = destination

    if not is_file(source_file):
        raise ValueError(f"The file being copied does not exist: {source}")

    if not force and is_file(destination_file):
        raise ValueError(f"The destination file to copy already exists: {destination}")

    with source_file as source_stream:
        with destination_file as destination_stream:

            if chunk_size is None:
                # copy without chunks
                destination_stream.write(source_stream.read())

            else:
                # copy with chunks

                # determine the size of the source file
                source_size = None
                if progress:
                    source_size = get_size(source)

                pbar = None
                if progress:
                    tqdm = _import_tqdm()

                    if tqdm is None:
                        raise ImportError(
                            "If the progress bar is enabled, you must have `tqdm` "
                            "installed: `conda install tqdm`."
                        )
                    else:
                        # init progress bar
                        pbar = tqdm(
                            total=source_size,
                            leave=leave_progress,
                            disable=not progress,
                            unit="B",
                            unit_divisor=1024,
                            unit_scale=True,
                        )

                # start the loop
                while True:
                    data = source_stream.read(chunk_size)
                    if not data:
                        break
                    destination_stream.write(data)

                    if pbar is not None:
                        pbar.update(chunk_size)

                if pbar is not None:
                    pbar.close()
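A minimal sketch of a local copy and of pushing a file to a remote location with a progress bar (the bucket is a placeholder; progress requires tqdm, and remote paths require the matching fsspec backend):

```python
from datamol.utils import fs

# Local copy, overwriting the destination if it already exists.
fs.copy_file("data/mols.csv", "data/mols_backup.csv", force=True)

# Chunked copy to a remote location with a progress bar
# (chunk_size defaults to 2048 bytes when progress is enabled).
fs.copy_file("data/mols.csv", "s3://my-bucket/mols.csv", progress=True)
```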
md5(filepath)¶
Return the md5 hash of a file.
NOTE(hadim): Use fsspec caching here maybe.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filepath | Union[str, os.PathLike] | The path to the file to compute the MD5 hash on. | required |
Source code in datamol/utils/fs.py
def md5(filepath: Union[str, os.PathLike]):
    """Return the md5 hash of a file.

    NOTE(hadim): Use fsspec caching here maybe.

    Args:
        filepath: The path to the file to compute the MD5 hash on.
    """
    with fsspec.open(filepath) as f:
        file_hash = hashlib.md5()
        file_hash.update(f.read())
        file_hash = file_hash.hexdigest()
    return file_hash
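For example, to checksum a local or remote file (path is illustrative):

```python
from datamol.utils import fs

checksum = fs.md5("data/mols.csv")  # 32-character hexadecimal digest string
```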
glob(path, **kwargs)¶
Find files by glob-matching.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | str | A glob-style path. | required |
Source code in datamol/utils/fs.py
def glob(path: str, **kwargs) -> List[str]:
    """Find files by glob-matching.

    Args:
        path: A glob-style path.
    """

    # Get the list of paths
    fs = get_mapper(path).fs
    data_paths = fs.glob(path, **kwargs)

    # Append path prefix if needed
    if fs.protocol not in ["file", "https", "http"]:
        protocol = fs.protocol
        if isinstance(protocol, tuple):
            protocol = fs.protocol[0]
        data_paths = [f"{protocol}://{d}" for d in data_paths]

    return data_paths
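Sketch (patterns are illustrative; remote globbing requires the matching fsspec backend, and the protocol prefix is re-applied to the results):

```python
from datamol.utils import fs

fs.glob("data/*.sdf")                # e.g. ["data/a.sdf", "data/b.sdf"]
fs.glob("s3://my-bucket/raw/*.csv")  # e.g. ["s3://my-bucket/raw/x.csv"] (hypothetical bucket)
```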
get_cache_dir(app_name, suffix=None, create=True)¶
Get a local cache directory for a given application name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
app_name | str | The name of the application. | required |
suffix | str | A subdirectory appended to the cache dir. | None |
create | bool | Whether to create the directory and its parents if it does not already exist. | True |
Source code in datamol/utils/fs.py
def get_cache_dir(app_name: str, suffix: str = None, create: bool = True):
    """Get a local cache directory for a given application name.

    Args:
        app_name: The name of the application.
        suffix: A subdirectory appended to the cache dir.
        create: Whether to create the directory and its parents if it does not
            already exist.
    """
    appdirs = _import_appdirs()

    if appdirs is None:
        raise ImportError(
            "To use `dm.utils.fs.get_cache_dir()`, you must have `appdirs` "
            "installed: `conda install appdirs`."
        )

    cache_dir = pathlib.Path(appdirs.user_cache_dir(appname=app_name))

    if suffix is not None:
        cache_dir /= suffix

    if create:
        cache_dir.mkdir(exist_ok=True, parents=True)

    return cache_dir
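Sketch; requires the optional appdirs dependency (the application name is illustrative):

```python
from datamol.utils import fs

cache_dir = fs.get_cache_dir(app_name="my-app", suffix="downloads")
# e.g. ~/.cache/my-app/downloads on Linux; created if it does not already exist.
```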
get_mapper(path)¶
Get the fsspec mapper.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[str, os.PathLike] | A path supported by fsspec such as local, s3, gcs, etc. | required |
Source code in datamol/utils/fs.py
def get_mapper(path: Union[str, os.PathLike]):
    """Get the fsspec mapper.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    return fsspec.get_mapper(str(path))
get_basename(path)¶
Get the basename of a file or a folder.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[str, os.PathLike] | A path supported by fsspec such as local, s3, gcs, etc. | required |
Source code in datamol/utils/fs.py
def get_basename(path: Union[str, os.PathLike]):
    """Get the basename of a file or a folder.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    path = str(path)
    mapper = get_mapper(path)
    clean_path = path.rstrip(mapper.fs.sep)
    return str(clean_path).split(mapper.fs.sep)[-1]