datamol.utils

All of the functions below are accessible under datamol.utils.

datamol.utils.jobs.JobRunner

is_sequential property readonly

Check whether the job is sequential or parallel

__call__(self, *args, **kwargs) special

Run the job, using the n_jobs attribute to determine the regime

Source code in datamol/utils/jobs.py
def __call__(self, *args, **kwargs):
    """
    Run the job, using the n_jobs attribute to determine the regime
    """
    if self.is_sequential:
        return self.sequential(*args, **kwargs)
    return self.parallel(*args, **kwargs)
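
As a quick illustrative sketch (not part of the library docs), the dispatch can be exercised as follows, assuming only the documented semantics of n_jobs:

```python
import datamol as dm

# n_jobs=0 (or None) forces the sequential regime.
runner = dm.JobRunner(n_jobs=0)
assert runner.is_sequential

# Any other value dispatches to the parallel regime backed by joblib.
runner = dm.JobRunner(n_jobs=2, prefer="threads")
results = runner(lambda x: x + 1, [1, 2, 3])  # -> [2, 3, 4]
```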

__init__(self, n_jobs=-1, prefer=None, progress=False, **job_kwargs) special

JobRunner with sequential/parallel regimes. The multiprocessing backend uses joblib, which makes its features available, while the progress bar uses tqdm.

Parameters:

- n_jobs (Optional[int], default: -1): Number of processes. Use 0 or None to force sequential execution. Use -1 to use all the available processors. For details see https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
- prefer (str, default: None): Choose from ['processes', 'threads'] or None. Soft hint to choose the default backend if no specific backend was selected with the parallel_backend context manager. The default process-based backend is 'loky' and the default thread-based backend is 'threading'. Ignored if the backend parameter is specified.
- progress (bool, default: False): Whether to display a progress bar.
- job_kwargs (default: {}): Any additional keyword arguments supported by joblib.Parallel.

Examples:

import datamol as dm
runner = dm.JobRunner(n_jobs=4, progress=True, prefer="threads")
results = runner(lambda x: x**2, [1, 2, 3, 4])
Source code in datamol/utils/jobs.py
def __init__(
    self,
    n_jobs: Optional[int] = -1,
    prefer: Optional[str] = None,
    progress: bool = False,
    **job_kwargs,
):
    """
    JobRunner with sequential/parallel regimes. The multiprocessing backend uses joblib,
    which makes its features available, while the progress bar uses tqdm.

    Args:
        n_jobs: Number of processes. Use 0 or None to force sequential execution.
            Use -1 to use all the available processors. For details see
            https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
        prefer: Choose from ['processes', 'threads'] or None. Defaults to None.
            Soft hint to choose the default backend if no specific backend
            was selected with the parallel_backend context manager. The
            default process-based backend is 'loky' and the default
            thread-based backend is 'threading'. Ignored if the ``backend``
            parameter is specified.
        progress: Whether to display a progress bar.
        job_kwargs: Any additional keyword arguments supported by joblib.Parallel.

    Example:

    ```python
    import datamol as dm
    runner = dm.JobRunner(n_jobs=4, progress=True, prefer="threads")
    results = runner(lambda x: x**2, [1, 2, 3, 4])
    ```
    """
    self.n_jobs = n_jobs
    self.prefer = prefer
    self.job_kwargs = job_kwargs
    self.job_kwargs.update(n_jobs=self.n_jobs, prefer=self.prefer)
    self.no_progress = not progress

get_iterator_length(data) staticmethod

Attempt to get the length of an iterator

Source code in datamol/utils/jobs.py
@staticmethod
def get_iterator_length(data):
    """Attempt to get the length of an iterator"""
    total_length = None
    try:
        total_length = len(data)
    except TypeError:
        # most likely a generator, ignore
        pass
    return total_length
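
A short sketch of the two cases, assuming only the behavior shown in the source above:

```python
import datamol as dm

# A list supports len(), so the length is recovered directly.
assert dm.JobRunner.get_iterator_length([1, 2, 3]) == 3

# A generator raises TypeError on len(); the helper returns None,
# which tqdm interprets as "total unknown".
assert dm.JobRunner.get_iterator_length(x for x in range(3)) is None
```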

parallel(self, callable_fn, data, arg_type=None, **fn_kwargs)

Run the job in parallel.

Parameters:

- callable_fn (Callable, required): Function to call.
- data (Iterable[Any], required): Input data.
- arg_type (Optional[str], default: None): Function argument type ('arg'/None, 'args', or 'kwargs').
- fn_kwargs (dict, default: {}): Optional keyword arguments to pass to the callable function.
Source code in datamol/utils/jobs.py
def parallel(
    self,
    callable_fn: Callable,
    data: Iterable[Any],
    arg_type: Optional[str] = None,
    **fn_kwargs,
):
    r"""
    Run job in parallel

    Args:
        callable_fn (callable): function to call
        data (iterable): input data
        arg_type (str, optional): function argument type ('arg'/None or 'args' or 'kwargs')
        fn_kwargs (dict, optional): optional keyword argument to pass to the callable funciton
    """
    runner = JobRunner._parallel_helper(**self.job_kwargs)
    total_length = JobRunner.get_iterator_length(data)
    results = runner(total=total_length, disable=self.no_progress)(
        delayed(JobRunner.wrap_fn(callable_fn, arg_type, **fn_kwargs))(dt) for dt in data
    )
    return results
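
A usage sketch with hypothetical data (not from the library docs); prefer="threads" is used here so the lambda does not need to be picklable by the process backend:

```python
import datamol as dm

runner = dm.JobRunner(n_jobs=2, prefer="threads")

# With arg_type="args", each tuple is unpacked as positional arguments.
pairs = [(1, 2), (3, 4)]
results = runner.parallel(lambda a, b: a + b, pairs, arg_type="args")
# results == [3, 7]
```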

sequential(self, callable_fn, data, arg_type=None, **fn_kwargs)

Run the job sequentially.

Parameters:

- callable_fn (Callable, required): Function to call.
- data (Iterable[Any], required): Input data.
- arg_type (Optional[str], default: None): Function argument type ('arg'/None, 'args', or 'kwargs').
- fn_kwargs (dict, default: {}): Optional keyword arguments to pass to the callable function.
Source code in datamol/utils/jobs.py
def sequential(
    self,
    callable_fn: Callable,
    data: Iterable[Any],
    arg_type: Optional[str] = None,
    **fn_kwargs,
):
    r"""
    Run job in sequential version

    Args:
        callable_fn (callable): function to call
        data (iterable): input data
        arg_type (str, optional): function argument type ('arg'/None or 'args' or 'kwargs')
        fn_kwargs (dict, optional): optional keyword argument to pass to the callable funciton
    """
    total_length = JobRunner.get_iterator_length(data)
    res = [
        JobRunner.wrap_fn(callable_fn, arg_type, **fn_kwargs)(dt)
        for dt in tqdm(data, total=total_length, disable=self.no_progress)
    ]
    return res
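
A minimal usage sketch (hypothetical function); the loop above is effectively a list comprehension wrapped in tqdm, with fn_kwargs forwarded to every call:

```python
import datamol as dm

runner = dm.JobRunner(n_jobs=None)  # sequential regime

# The keyword `offset` is forwarded to each invocation of the function.
results = runner.sequential(lambda x, offset=0: x + offset, [1, 2, 3], offset=10)
# results == [11, 12, 13]
```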

wrap_fn(fn, arg_type=None, **fn_kwargs) staticmethod

Small wrapper around a callable to properly format its arguments

Source code in datamol/utils/jobs.py
@staticmethod
def wrap_fn(fn: Callable, arg_type: Optional[str] = None, **fn_kwargs):
    """Small wrapper around a callable to properly format it's argument"""
    # EN probably use something like (moms.utils.commons.is_callable) ?
    def _run(args: Any):
        if arg_type == "kwargs":
            fn_kwargs.update(**args)
            return fn(**fn_kwargs)
        elif arg_type == "args":
            return fn(*args, **fn_kwargs)
        return fn(args, **fn_kwargs)

    return _run
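
The three arg_type modes, illustrated with a hypothetical two-argument function (this sketch only mirrors the branches in the source above):

```python
import datamol as dm

def add(a, b=0):
    return a + b

# "arg" (the default): the item is passed as a single positional argument.
assert dm.JobRunner.wrap_fn(add)(5) == 5

# "args": the item is unpacked as positional arguments.
assert dm.JobRunner.wrap_fn(add, arg_type="args")((2, 3)) == 5

# "kwargs": the item is unpacked as keyword arguments.
assert dm.JobRunner.wrap_fn(add, arg_type="kwargs")({"a": 2, "b": 3}) == 5
```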

datamol.utils.decorators.decorators

disable_on_os(os_names)

A decorator that disables a function on the given operating systems, raising an error when one of them is detected.

Parameters:

- os_names (Union[str, List[str]], required): OS names on which to disable this function. Valid OS names are: ["linux", "osx", "win"].
Source code in datamol/utils/decorators.py
def disable_on_os(os_names: Union[str, List[str]]):
    """A decorator to disable a function raising an error if the OS detected is not supported.

    Args:
        os_names: OS names to disable this function. Valid OS names are: `["linux", "osx", "win"]`.
    """

    if isinstance(os_names, str):
        os_names = [os_names]

    valid_os_names = []
    for os_name in os_names:
        if os_name == "linux":
            valid_os_names.append("Linux")
        elif os_name == "win":
            valid_os_names.append("Windows")
        elif os_name == "osx":
            valid_os_names.append("Darwin")
        else:
            valid_os_names.append(os_name)

    def real_decorator(function: Callable):
        @wraps(function)
        def wrapper(*args, **kwargs):

            if platform.system() not in valid_os_names:
                retval = function(*args, **kwargs)
                return retval
            else:
                raise NotImplementedError(
                    f"The function {function.__name__} is not supported"
                    f" for the platform '{platform.system()}'."
                )

        return wrapper

    return real_decorator
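
A usage sketch (the decorated function is hypothetical); it assumes the decorator is re-exported under datamol.utils as stated at the top of this page:

```python
from datamol.utils import disable_on_os

@disable_on_os(["win", "osx"])
def linux_only_task():
    """Runs on Linux; raises NotImplementedError on Windows or macOS."""
    return "ok"
```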

datamol.utils.fs.fs

copy_file(source, destination)

Copy a file to another path using fsspec.

Parameters:

- source (Union[str, os.PathLike], required): Path to a file to copy from (remote or local).
- destination (Union[str, os.PathLike], required): Path to a file to copy to (remote or local).
Source code in datamol/utils/fs.py
def copy_file(
    source: Union[str, os.PathLike],
    destination: Union[str, os.PathLike],
):
    """Copy a file to another path using fsspec.

    Args:
        source: Path to a file to copy from (remote or local).
        destination: Path to a file to copy to (remote or local).
    """
    with fsspec.open(destination, "wb") as f_dest:
        with fsspec.open(source, "rb") as f_source:
            f_dest.write(f_source.read())
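
A usage sketch with hypothetical paths; since fsspec resolves the URLs, remote schemes work as well when the matching filesystem package is installed:

```python
from datamol.utils import copy_file

# Local-to-local copy.
copy_file("data/input.sdf", "backup/input.sdf")

# Local-to-remote copy (requires e.g. the s3fs package for the s3:// scheme).
copy_file("data/input.sdf", "s3://my-bucket/input.sdf")
```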

datamol.utils.jobs.parallelized(fn, inputs_list, scheduler='processes', n_jobs=-1, progress=False, arg_type='arg')

Run a function in parallel.

Parameters:

- fn (Callable, required): The function to run in parallel.
- inputs_list (Iterable[Any], required): List of inputs to pass to fn.
- scheduler (str, default: 'processes'): Choose between ["processes", "threads"]. Defaults to "processes", which uses joblib's process-based "loky" backend.
- n_jobs (Optional[int], default: -1): Number of processes. Use 0 or None to force sequential. Use -1 to use all the available processors. For details see https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
- progress (bool, default: False): Display a progress bar.
- arg_type (str, default: 'arg'): One of ["arg", "args", "kwargs"]:
    - "arg": the input is passed as an argument: fn(arg) (default).
    - "args": the input is passed as a list: fn(*args).
    - "kwargs": the input is passed as a map: fn(**kwargs).

Returns:

- Optional[List[Any]]: The results of the execution as a list.

Source code in datamol/utils/jobs.py
def parallelized(
    fn: Callable,
    inputs_list: Iterable[Any],
    scheduler: str = "processes",
    n_jobs: Optional[int] = -1,
    progress: bool = False,
    arg_type: str = "arg",
) -> Optional[List[Any]]:
    """Run a function in parallel.

    Args:
        fn: The function to run in parallel.
        inputs_list: List of inputs to pass to `fn`.
        scheduler: Choose between ["processes", "threads"]. Defaults
            to "processes", which uses joblib's process-based "loky" backend.
        n_jobs: Number of processes. Use 0 or None to force sequential.
                Use -1 to use all the available processors. For details see
                https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
        progress: Display a progress bar. Defaults to False.
        arg_type: One of ["arg", "args", "kwargs"]:
            - "arg": the input is passed as an argument: `fn(arg)` (default).
            - "args": the input is passed as a list: `fn(*args)`.
            - "kwargs": the input is passed as a map: `fn(**kwargs)`.

    Returns:
        The results of the execution as a list.
    """

    runner = JobRunner(n_jobs=n_jobs, progress=progress, prefer=scheduler)
    return runner(fn, inputs_list, arg_type=arg_type)
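
A usage sketch tying the pieces together (the function and inputs are hypothetical); it assumes parallelized is re-exported at the package root, like JobRunner in the examples above:

```python
import datamol as dm

def scale(x, factor=1):
    return x * factor

# One keyword mapping per call, dispatched with arg_type="kwargs".
inputs = [{"x": 1, "factor": 2}, {"x": 3, "factor": 4}]
results = dm.parallelized(scale, inputs, scheduler="threads", arg_type="kwargs")
# results == [2, 12]
```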