datamol.utils

Parallelization helpers

parallelized(fn, inputs_list, scheduler='processes', n_jobs=-1, batch_size=None, progress=False, arg_type='arg', tqdm_kwargs=None, **job_kwargs)

Run a function in parallel.

Parameters:

Name Type Description Default
fn Callable

The function to run in parallel.

required
inputs_list Iterable[Any]

List of inputs to pass to fn.

required
scheduler str

Choose between ["processes", "threads"]. Defaults to None which uses the default joblib "loky" scheduler.

'processes'
n_jobs Optional[int]

Number of processes. Use 0 or None to force sequential execution. Use -1 to use all the available processors. For details see https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation

-1
batch_size int

Batch size used to automatically batch inputs_list. You should only use it when the length of inputs_list is very large (>100k elements). The length of inputs_list must also be defined.

None
progress bool

Display a progress bar. Defaults to False.

False
arg_type str

One of ["arg", "args", "kwargs]: - "arg": the input is passed as an argument: fn(arg) (default). - "args": the input is passed as a list: fn(*args). - "kwargs": the input is passed as a map: fn(**kwargs).

'arg'
tqdm_kwargs dict

Any additional arguments supported by the tqdm progress bar.

None
job_kwargs

Any additional arguments supported by joblib.Parallel.

{}

Returns:

Type Description
Sequence[Optional[Any]]

The results of the execution as a list.

Source code in datamol/utils/jobs.py
def parallelized(
    fn: Callable,
    inputs_list: Iterable[Any],
    scheduler: str = "processes",
    n_jobs: Optional[int] = -1,
    batch_size: int = None,
    progress: bool = False,
    arg_type: str = "arg",
    tqdm_kwargs: dict = None,
    **job_kwargs,
) -> Sequence[Optional[Any]]:
    """Run a function in parallel.

    Args:
        fn: The function to run in parallel.
        inputs_list: List of inputs to pass to `fn`.
        scheduler: Choose between ["processes", "threads"]. Defaults
            to "processes", which uses joblib's default "loky" backend.
        n_jobs: Number of processes. Use 0 or None to force sequential execution.
                Use -1 to use all the available processors. For details see
                https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
        batch_size: Batch size used to automatically batch `inputs_list`. You should only use it
            when the length of `inputs_list` is very large (>100k elements). The length of
            `inputs_list` must also be defined.
        progress: Display a progress bar. Defaults to False.
        arg_type: One of ["arg", "args", "kwargs"]:
            - "arg": the input is passed as an argument: `fn(arg)` (default).
            - "args": the input is passed as a list: `fn(*args)`.
            - "kwargs": the input is passed as a map: `fn(**kwargs)`.
        tqdm_kwargs: Any additional arguments supported by the `tqdm` progress bar.
        job_kwargs: Any additional arguments supported by `joblib.Parallel`.

    Returns:
        The results of the execution as a list.
    """

    runner = JobRunner(
        n_jobs=n_jobs,
        batch_size=batch_size,
        progress=progress,
        prefer=scheduler,
        tqdm_kwargs=tqdm_kwargs,
        **job_kwargs,
    )
    return runner(fn, inputs_list, arg_type=arg_type)
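
A minimal usage sketch (illustrative; it assumes `parallelized` is exported at the package level, mirroring the `dm.JobRunner` example further down):

```python
import datamol as dm

def square(x):
    return x**2

# Run `square` over the inputs with two worker processes.
results = dm.parallelized(square, [1, 2, 3, 4], n_jobs=2)
# results == [1, 4, 9, 16]
```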

JobRunner

is_sequential property readonly

Check whether the job is sequential or parallel

__call__(self, *args, **kwargs) special

Run the job, using the n_jobs attribute to determine the regime

Source code in datamol/utils/jobs.py
def __call__(self, *args, **kwargs):
    """
    Run the job, using the n_jobs attribute to determine the regime
    """
    if self.is_sequential:
        return self.sequential(*args, **kwargs)
    return self.parallel(*args, **kwargs)
__init__(self, n_jobs=-1, batch_size=None, prefer=None, progress=False, tqdm_kwargs=None, **job_kwargs) special

JobRunner with sequential/parallel regimes. The multiprocessing backend uses joblib, which makes its features available, while the progress bar uses tqdm.

Parameters:

Name Type Description Default
n_jobs Optional[int]

Number of processes. Use 0 or None to force sequential execution. Use -1 to use all the available processors. For details see https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation

-1
batch_size int

Batch size used to batch inputs_list. You can specify batch_size when the length of inputs_list is very large (>100k elements). By default, joblib's automatic batching is used.

None
prefer str

Choose from ['processes', 'threads'] or None. Defaults to None. Soft hint to choose the default backend if no specific backend was selected with the parallel_backend context manager. The default process-based backend is 'loky' and the default thread-based backend is 'threading'. Ignored if the backend parameter is specified.

None
progress bool

Whether to display a progress bar.

False
tqdm_kwargs dict

Any additional arguments supported by the tqdm progress bar.

None
job_kwargs

Any additional arguments supported by joblib.Parallel.

{}

Examples:

import datamol as dm
runner = dm.JobRunner(n_jobs=4, progress=True, prefer="threads")
results = runner(lambda x: x**2, [1, 2, 3, 4])
Source code in datamol/utils/jobs.py
def __init__(
    self,
    n_jobs: Optional[int] = -1,
    batch_size: int = None,
    prefer: str = None,
    progress: bool = False,
    tqdm_kwargs: dict = None,
    **job_kwargs,
):
    """
    JobRunner with sequential/parallel regimes. The multiprocessing backend uses joblib, which
    makes its features available, while the progress bar uses tqdm.

    Args:
        n_jobs: Number of processes. Use 0 or None to force sequential execution.
            Use -1 to use all the available processors. For details see
            https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
        batch_size: Batch size used to batch `inputs_list`. You can specify batch_size when the length
            of `inputs_list` is very large (>100k elements). By default, the auto batching of joblib is used.
        prefer: Choose from ['processes', 'threads'] or None. Defaults to None.
            Soft hint to choose the default backend if no specific backend
            was selected with the parallel_backend context manager. The
            default process-based backend is 'loky' and the default
            thread-based backend is 'threading'. Ignored if the ``backend``
            parameter is specified.
        progress: Whether to display a progress bar.
        tqdm_kwargs: Any additional arguments supported by the `tqdm` progress bar.
        job_kwargs: Any additional arguments supported by `joblib.Parallel`.

    Example:

    ```python
    import datamol as dm
    runner = dm.JobRunner(n_jobs=4, progress=True, prefer="threads")
    results = runner(lambda x: x**2, [1, 2, 3, 4])
    ```
    """

    self.n_jobs = n_jobs
    self.batch_size = batch_size or "auto"
    self.prefer = prefer
    self.job_kwargs = job_kwargs
    self.job_kwargs.update(n_jobs=self.n_jobs, prefer=self.prefer, batch_size=self.batch_size)
    self.no_progress = not progress
    self.tqdm_kwargs = tqdm_kwargs or {}
get_iterator_length(data) staticmethod

Attempt to get the length of an iterator

Source code in datamol/utils/jobs.py
@staticmethod
def get_iterator_length(data):
    """Attempt to get the length of an iterator"""
    total_length = None
    try:
        total_length = len(data)
    except TypeError:
        # most likely a generator, ignore
        pass
    return total_length
parallel(self, callable_fn, data, arg_type=None, **fn_kwargs)

Run job in parallel

Parameters:

Name Type Description Default
callable_fn Callable

function to call

required
data Iterable[Any]

input data

required
arg_type Optional[str]

function argument type ('arg'/None or 'args' or 'kwargs')

None
fn_kwargs dict

optional keyword arguments to pass to the callable function

{}
Source code in datamol/utils/jobs.py
def parallel(
    self,
    callable_fn: Callable,
    data: Iterable[Any],
    arg_type: Optional[str] = None,
    **fn_kwargs,
):
    """
    Run job in parallel

    Args:
        callable_fn (callable): function to call
        data (iterable): input data
        arg_type (str, optional): function argument type ('arg'/None or 'args' or 'kwargs')
        fn_kwargs (dict, optional): optional keyword arguments to pass to the callable function
    """

    total_length = JobRunner.get_iterator_length(data)

    runner = JobRunner._parallel_helper(**self.job_kwargs)
    results = runner(total=total_length, disable=self.no_progress, **self.tqdm_kwargs)(
        delayed(JobRunner.wrap_fn(callable_fn, arg_type, **fn_kwargs))(dt) for dt in data
    )

    return results
sequential(self, callable_fn, data, arg_type=None, **fn_kwargs)

Run the job sequentially

Parameters:

Name Type Description Default
callable_fn Callable

function to call

required
data Iterable[Any]

input data

required
arg_type Optional[str]

function argument type ('arg'/None or 'args' or 'kwargs')

None
fn_kwargs dict

optional keyword arguments to pass to the callable function

{}
Source code in datamol/utils/jobs.py
def sequential(
    self,
    callable_fn: Callable,
    data: Iterable[Any],
    arg_type: Optional[str] = None,
    **fn_kwargs,
):
    """
    Run the job sequentially

    Args:
        callable_fn (callable): function to call
        data (iterable): input data
        arg_type (str, optional): function argument type ('arg'/None or 'args' or 'kwargs')
        fn_kwargs (dict, optional): optional keyword arguments to pass to the callable function
    """
    total_length = JobRunner.get_iterator_length(data)

    results = [
        JobRunner.wrap_fn(callable_fn, arg_type, **fn_kwargs)(dt)
        for dt in tqdm(data, total=total_length, disable=self.no_progress, **self.tqdm_kwargs)
    ]
    return results
wrap_fn(fn, arg_type=None, **fn_kwargs) staticmethod

Small wrapper around a callable to properly format its argument

Source code in datamol/utils/jobs.py
@staticmethod
def wrap_fn(fn: Callable, arg_type: Optional[str] = None, **fn_kwargs):
    """Small wrapper around a callable to properly format it's argument"""

    def _run(args: Any):
        if arg_type == "kwargs":
            fn_kwargs.update(**args)
            return fn(**fn_kwargs)
        elif arg_type == "args":
            return fn(*args, **fn_kwargs)
        return fn(args, **fn_kwargs)

    return _run
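
The `arg_type` handling above is easiest to see with a small sketch (illustrative only; `add` is a hypothetical function):

```python
import datamol as dm

def add(a, b):
    return a + b

# "args": each input is unpacked as positional arguments -> add(1, 2), add(3, 4)
dm.parallelized(add, [(1, 2), (3, 4)], arg_type="args")

# "kwargs": each input is unpacked as keyword arguments -> add(a=1, b=2), add(a=3, b=4)
dm.parallelized(add, [{"a": 1, "b": 2}, {"a": 3, "b": 4}], arg_type="kwargs")
```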

Built on top of fsspec to manipulate and work with remote or local paths in a transparent manner.

The below functions are available under datamol.utils.fs.

get_extension(path)

Get the extension of a file.

Parameters:

Name Type Description Default
path Union[str, os.PathLike]

a path supported by fsspec such as local, s3, gcs, etc.

required
Source code in datamol/utils/fs.py
def get_extension(path: Union[str, os.PathLike]):
    """Get the extension of a file.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    basename = get_basename(path)
    return basename.split(".")[-1]
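
For example (the remote path below is purely illustrative):

```python
from datamol.utils import fs

fs.get_extension("s3://some-bucket/data/molecules.sdf")  # "sdf"
```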

exists(path)

Check whether a file or a directory exists.

Important: a file-like object is always considered to exist.

Parameters:

Name Type Description Default
path Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase]

a path supported by fsspec such as local, s3, gcs, etc.

required
Source code in datamol/utils/fs.py
def exists(path: Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase]):
    """Check whether a file or a directory exists.

    Important: a file-like object is always considered to exist.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    return is_file(path) or is_dir(path)

is_file(path)

Check whether a file exists.

Important: a file-like object is always considered to exist.

Parameters:

Name Type Description Default
path Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase]

a path supported by fsspec such as local, s3, gcs, etc.

required
Source code in datamol/utils/fs.py
def is_file(path: Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase]):
    """Check whether a file exists.

    Important: a file-like object is always considered to exist.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    if isinstance(path, fsspec.core.OpenFile):
        return path.fs.isfile(path.path)

    elif isinstance(path, (str, pathlib.Path)):
        mapper = get_mapper(str(path))
        return mapper.fs.isfile(path)

    else:
        return True

is_dir(path)

Check whether a directory exists.

Important: a file-like object is always considered to exist.

Parameters:

Name Type Description Default
path Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase]

a path supported by fsspec such as local, s3, gcs, etc.

required
Source code in datamol/utils/fs.py
def is_dir(path: Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase]):
    """Check whether a file exists.

    Important: a file-like object is always considered to exist.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    if isinstance(path, fsspec.core.OpenFile):
        return path.fs.isdir(path.path)

    elif isinstance(path, (str, pathlib.Path)):
        mapper = get_mapper(str(path))
        return mapper.fs.isdir(path)

    else:
        return False
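
A short sketch of how the three checks relate (the paths are hypothetical):

```python
from datamol.utils import fs

fs.is_file("gs://some-bucket/data.csv")  # True only if the object exists as a file
fs.is_dir("gs://some-bucket/raw/")       # True only if the path is a directory
fs.exists("gs://some-bucket/data.csv")   # equivalent to is_file(...) or is_dir(...)
```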

get_protocol(path)

Return the name of the path protocol.

Parameters:

Name Type Description Default
path Union[str, os.PathLike]

a path supported by fsspec such as local, s3, gcs, etc.

required
Source code in datamol/utils/fs.py
def get_protocol(path: Union[str, os.PathLike]):
    """Return the name of the path protocol.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """

    mapper = get_mapper(path)
    protocol = mapper.fs.protocol

    if "s3" in protocol:
        return "s3"
    elif "gs" in protocol:
        return "gs"
    elif isinstance(protocol, (tuple, list)):
        return protocol[0]
    return protocol

is_local_path(path)

Check whether a path is local.

Source code in datamol/utils/fs.py
def is_local_path(path: Union[str, os.PathLike]):
    """Check whether a path is local."""
    return get_protocol(str(path)) == "file"
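
For instance (paths are illustrative):

```python
from datamol.utils import fs

fs.get_protocol("s3://some-bucket/data.csv")  # "s3"
fs.get_protocol("/tmp/data.csv")              # "file"
fs.is_local_path("/tmp/data.csv")             # True
```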

join(*paths)

Join paths together. The first element determines the filesystem to use (and so the separator).

Parameters:

Name Type Description Default
paths

a list of paths supported by fsspec such as local, s3, gcs, etc.

()
Source code in datamol/utils/fs.py
def join(*paths):
    """Join paths together. The first element determine the
    filesystem to use (and so the separator.

    Args:
        paths: a list of paths supported by `fsspec` such as local, s3, gcs, etc.
    """
    paths = [str(path).rstrip("/") for path in paths]
    source_path = paths[0]
    fs = get_mapper(source_path).fs
    full_path = fs.sep.join(paths)
    return full_path
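
For example (the bucket name is hypothetical):

```python
from datamol.utils import fs

fs.join("s3://some-bucket/raw", "2023", "data.parquet")
# -> "s3://some-bucket/raw/2023/data.parquet"
```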

get_size(file)

Get the size of a file given its path. Return None if the size can't be retrieved.

Source code in datamol/utils/fs.py
def get_size(file: Union[str, os.PathLike, io.IOBase, fsspec.core.OpenFile]) -> Optional[int]:
    """Get the size of a file given its path. Return None if the
    size can't be retrieved.
    """

    if isinstance(file, io.IOBase) and hasattr(file, "name"):
        fs_local = fsspec.filesystem("file")
        file_size = fs_local.size(getattr(file, "name"))

    elif isinstance(file, (str, pathlib.Path)):
        fs = get_mapper(str(file)).fs
        file_size = fs.size(str(file))

    elif isinstance(file, fsspec.core.OpenFile):
        file_size = file.fs.size(file.path)

    else:
        file_size = None

    return file_size

copy_file(source, destination, chunk_size=None, force=False, progress=False, leave_progress=True)

Copy one file to another location across different filesystems (local, S3, GCS, etc.).

Parameters:

Name Type Description Default
source Union[str, pathlib.Path, io.IOBase, fsspec.core.OpenFile]

path or file-like object to copy from.

required
destination Union[str, pathlib.Path, io.IOBase, fsspec.core.OpenFile]

path or file-like object to copy to.

required
chunk_size int

the chunk size to use. If progress is enabled and chunk_size is None, it is set to 2048.

None
force bool

whether to overwrite the destination file if it exists.

False
progress bool

whether to display a progress bar.

False
leave_progress bool

whether to leave the progress bar displayed once the copy is done.

True
Source code in datamol/utils/fs.py
def copy_file(
    source: Union[str, pathlib.Path, io.IOBase, fsspec.core.OpenFile],
    destination: Union[str, pathlib.Path, io.IOBase, fsspec.core.OpenFile],
    chunk_size: int = None,
    force: bool = False,
    progress: bool = False,
    leave_progress: bool = True,
):
    """Copy one file to another location across different filesystem (local, S3, GCS, etc).

    Args:
        source: path or file-like object to copy from.
        destination: path or file-like object to copy to.
        chunk_size: the chunk size to use. If progress is enabled and the chunk
            size is `None`, it is set to 2048.
        force: whether to overwrite the destination file if it exists.
        progress: whether to display a progress bar.
        leave_progress: whether to leave the progress bar displayed once the copy is done.
    """

    if progress and chunk_size is None:
        chunk_size = 2048

    if isinstance(source, (str, pathlib.Path)):
        source_file = fsspec.open(str(source), "rb")
    else:
        source_file = source

    if isinstance(destination, (str, pathlib.Path)):

        # adapt the file mode of the destination depending on the source file.
        destination_mode = "wb"
        if hasattr(source_file, "mode"):
            destination_mode = "wb" if "b" in getattr(source_file, "mode") else "w"
        elif isinstance(source_file, io.BytesIO):
            destination_mode = "wb"
        elif isinstance(source_file, io.StringIO):
            destination_mode = "w"

        destination_file = fsspec.open(str(destination), destination_mode)
    else:
        destination_file = destination

    if not is_file(source_file):
        raise ValueError(f"The file being copied does not exist: {source}")

    if not force and is_file(destination_file):
        raise ValueError(f"The destination file to copy already exists: {destination}")

    with source_file as source_stream:
        with destination_file as destination_stream:

            if chunk_size is None:
                # copy without chunks
                destination_stream.write(source_stream.read())

            else:
                # copy with chunks

                # determine the size of the source file
                source_size = None
                if progress:
                    source_size = get_size(source)

                pbar = None
                if progress:
                    tqdm = _import_tqdm()

                    if tqdm is None:
                        raise ImportError(
                            "If the progress bar is enabled, you must have `tqdm` "
                            "installed: `conda install tqdm`."
                        )
                    else:
                        # init progress bar
                        pbar = tqdm(
                            total=source_size,
                            leave=leave_progress,
                            disable=not progress,
                            unit="B",
                            unit_divisor=1024,
                            unit_scale=True,
                        )

                # start the loop
                while True:
                    data = source_stream.read(chunk_size)
                    if not data:
                        break
                    destination_stream.write(data)

                    if pbar is not None:
                        pbar.update(chunk_size)

                if pbar is not None:
                    pbar.close()
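
A minimal usage sketch (the source path is illustrative; `progress=True` requires `tqdm` to be installed):

```python
from datamol.utils import fs

# Copy a remote object to a local file, overwriting the destination if it exists.
fs.copy_file(
    "s3://some-bucket/data/molecules.sdf",
    "/tmp/molecules.sdf",
    force=True,
    progress=True,
)
```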

md5(filepath)

Return the md5 hash of a file.

NOTE(hadim): Use fsspec caching here maybe.

Parameters:

Name Type Description Default
filepath Union[str, os.PathLike]

The path to the file to compute the MD5 hash on.

required
Source code in datamol/utils/fs.py
def md5(filepath: Union[str, os.PathLike]):
    """Return the md5 hash of a file.

    NOTE(hadim): Use fsspec caching here maybe.

    Args:
        filepath: The path to the file to compute the MD5 hash on.
    """
    with fsspec.open(filepath) as f:
        file_hash = hashlib.md5()
        file_hash.update(f.read())
        file_hash = file_hash.hexdigest()
    return file_hash
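
For example (illustrative local path):

```python
from datamol.utils import fs

checksum = fs.md5("/tmp/molecules.sdf")  # 32-character hex digest string
```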

glob(path, **kwargs)

Find files by glob-matching.

Parameters:

Name Type Description Default
path str

A glob-style path.

required
Source code in datamol/utils/fs.py
def glob(path: str, **kwargs) -> List[str]:
    """Find files by glob-matching.

    Args:
        path: A glob-style path.
    """
    # Get the list of paths
    fs = get_mapper(path).fs
    data_paths = fs.glob(path, **kwargs)
    protocol = get_protocol(path)
    # Append path prefix if needed
    if protocol not in ["file", "https", "http"]:
        data_paths = [f"{protocol}://{d}" for d in data_paths]

    return data_paths
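
For example (the bucket is hypothetical; note that the protocol prefix is re-added for non-local paths):

```python
from datamol.utils import fs

paths = fs.glob("s3://some-bucket/raw/*.parquet")
# e.g. ["s3://some-bucket/raw/part-0.parquet", "s3://some-bucket/raw/part-1.parquet"]
```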

get_cache_dir(app_name, suffix=None, create=True)

Get a local cache directory for a given application name.

Parameters:

Name Type Description Default
app_name str

The name of the application.

required
suffix str

A subdirectory appended to the cache dir.

None
create bool

Whether to create the directory and its parents if it does not already exist.

True
Source code in datamol/utils/fs.py
def get_cache_dir(app_name: str, suffix: str = None, create: bool = True):
    """Get a local cache directory for a given application name.

    Args:
        app_name: The name of the application.
        suffix: A subdirectory appended to the cache dir.
        create: Whether to create the directory and its parents if it does not
            already exist.
    """

    appdirs = _import_appdirs()

    if appdirs is None:
        raise ImportError(
            "To use `dm.utils.fs.get_cache_dir()`, you must have `appdirs` "
            "installed: `conda install appdirs`."
        )
    cache_dir = pathlib.Path(appdirs.user_cache_dir(appname=app_name))

    if suffix is not None:
        cache_dir /= suffix

    if create:
        cache_dir.mkdir(exist_ok=True, parents=True)

    return cache_dir
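
A minimal sketch (the application name is hypothetical; `appdirs` must be installed, and the exact location depends on the operating system):

```python
from datamol.utils import fs

cache_dir = fs.get_cache_dir("my-app", suffix="downloads")
# e.g. ~/.cache/my-app/downloads on Linux
```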

get_mapper(path)

Get the fsspec mapper.

Parameters:

Name Type Description Default
path Union[str, os.PathLike]

a path supported by fsspec such as local, s3, gcs, etc.

required
Source code in datamol/utils/fs.py
def get_mapper(path: Union[str, os.PathLike]):
    """Get the fsspec mapper.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    return fsspec.get_mapper(str(path))

get_basename(path)

Get the basename of a file or a folder.

Parameters:

Name Type Description Default
path Union[str, os.PathLike]

a path supported by fsspec such as local, s3, gcs, etc.

required
Source code in datamol/utils/fs.py
def get_basename(path: Union[str, os.PathLike]):
    """Get the basename of a file or a folder.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    path = str(path)
    mapper = get_mapper(path)
    clean_path = path.rstrip(mapper.fs.sep)
    return str(clean_path).split(mapper.fs.sep)[-1]
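
For example (illustrative path):

```python
from datamol.utils import fs

fs.get_basename("s3://some-bucket/data/molecules.sdf")  # "molecules.sdf"
```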