datamol.utils
¶
All the functions below are accessible under `datamol.utils`.
datamol.utils.jobs.JobRunner
¶
is_sequential
property
readonly
¶
Check whether the job is sequential or parallel
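A quick illustration of the dispatch rule (a minimal sketch; per the constructor docs, `n_jobs=0` or `None` forces the sequential regime):

```python
import datamol as dm

dm.JobRunner(n_jobs=0).is_sequential   # True: 0 (or None) forces the sequential regime
dm.JobRunner(n_jobs=4).is_sequential   # False: work is dispatched to joblib.Parallel
```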
__call__(self, *args, **kwargs)
special
¶
Run job using the n_jobs attribute to determine regime
Source code in datamol/utils/jobs.py
def __call__(self, *args, **kwargs):
    """
    Run job using the n_jobs attribute to determine regime
    """
    if self.is_sequential:
        return self.sequential(*args, **kwargs)
    return self.parallel(*args, **kwargs)
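Calling the runner directly is thus equivalent to calling `sequential` or `parallel`, depending on `is_sequential`; a minimal sketch:

```python
import datamol as dm

runner = dm.JobRunner(n_jobs=0)  # sequential regime
results = runner(abs, [-1, -2])  # dispatches to runner.sequential(abs, [-1, -2])
# results == [1, 2]
```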
__init__(self, n_jobs=-1, prefer=None, progress=False, **job_kwargs)
special
¶
JobRunner with sequential/parallel regimes. The multiprocessing backend uses joblib, which allows taking advantage of its features, while the progress bar uses tqdm.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
n_jobs | Optional[int] | Number of processes. Use 0 or None to force sequential. Use -1 to use all the available processors. For details see https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation | -1 |
prefer | str | Choose from ['processes', 'threads'] or None. Defaults to None. Soft hint to choose the default backend if no specific backend was selected with the parallel_backend context manager. The default process-based backend is 'loky' and the default thread-based backend is 'threading'. Ignored if the `backend` parameter is specified. | None |
progress | bool | Whether to display a progress bar. | False |
job_kwargs | | Any additional keyword argument supported by joblib.Parallel. | {} |
Examples:

```python
import datamol as dm

runner = dm.JobRunner(n_jobs=4, progress=True, prefer="threads")
results = runner(lambda x: x**2, [1, 2, 3, 4])
```
Source code in datamol/utils/jobs.py
def __init__(
    self,
    n_jobs: Optional[int] = -1,
    prefer: Optional[str] = None,
    progress: bool = False,
    **job_kwargs,
):
    """
    JobRunner with sequential/parallel regimes. The multiprocessing backend uses joblib,
    which allows taking advantage of its features, while the progress bar uses tqdm.

    Args:
        n_jobs: Number of processes. Use 0 or None to force sequential.
            Use -1 to use all the available processors. For details see
            https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
        prefer: Choose from ['processes', 'threads'] or None. Defaults to None.
            Soft hint to choose the default backend if no specific backend
            was selected with the parallel_backend context manager. The
            default process-based backend is 'loky' and the default
            thread-based backend is 'threading'. Ignored if the ``backend``
            parameter is specified.
        progress: Whether to display a progress bar.
        job_kwargs: Any additional keyword argument supported by joblib.Parallel.

    Example:
        ```python
        import datamol as dm
        runner = dm.JobRunner(n_jobs=4, progress=True, prefer="threads")
        results = runner(lambda x: x**2, [1, 2, 3, 4])
        ```
    """
    self.n_jobs = n_jobs
    self.prefer = prefer
    self.job_kwargs = job_kwargs
    self.job_kwargs.update(n_jobs=self.n_jobs, prefer=self.prefer)
    self.no_progress = not progress
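Since any extra keyword argument ends up in `job_kwargs` and is forwarded to `joblib.Parallel`, the backend can be tuned through the constructor. A sketch using joblib's standard `batch_size` option:

```python
import datamol as dm

# Extra keyword arguments are stored in `job_kwargs` and forwarded
# to joblib.Parallel; `batch_size` is a standard joblib.Parallel option.
runner = dm.JobRunner(n_jobs=2, prefer="threads", batch_size="auto")
results = runner(str.upper, ["a", "b", "c"])
# results == ["A", "B", "C"]
```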
get_iterator_length(data)
staticmethod
¶
Attempt to get the length of an iterator
Source code in datamol/utils/jobs.py
@staticmethod
def get_iterator_length(data):
    """Attempt to get the length of an iterator"""
    total_length = None
    try:
        total_length = len(data)
    except TypeError:
        # most likely a generator, ignore
        pass
    return total_length
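For example, sized containers report their length while generators yield `None`, which simply leaves the progress bar total unset:

```python
import datamol as dm

dm.JobRunner.get_iterator_length([1, 2, 3])            # 3
dm.JobRunner.get_iterator_length(x for x in range(3))  # None: generators have no len()
```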
parallel(self, callable_fn, data, arg_type=None, **fn_kwargs)
¶
Run job in parallel
Parameters:

Name | Type | Description | Default |
---|---|---|---|
callable_fn | Callable | Function to call. | required |
data | Iterable[Any] | Input data. | required |
arg_type | Optional[str] | Function argument type ('arg'/None, 'args', or 'kwargs'). | None |
fn_kwargs | dict | Optional keyword arguments to pass to the callable function. | {} |
Source code in datamol/utils/jobs.py
def parallel(
    self,
    callable_fn: Callable,
    data: Iterable[Any],
    arg_type: Optional[str] = None,
    **fn_kwargs,
):
    r"""
    Run job in parallel

    Args:
        callable_fn (callable): function to call
        data (iterable): input data
        arg_type (str, optional): function argument type ('arg'/None or 'args' or 'kwargs')
        fn_kwargs (dict, optional): optional keyword arguments to pass to the callable function
    """
    runner = JobRunner._parallel_helper(**self.job_kwargs)
    total_length = JobRunner.get_iterator_length(data)

    results = runner(total=total_length, disable=self.no_progress)(
        delayed(JobRunner.wrap_fn(callable_fn, arg_type, **fn_kwargs))(dt) for dt in data
    )
    return results
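A short sketch of `parallel` with `arg_type="args"`, where each input tuple is unpacked as positional arguments:

```python
import datamol as dm

runner = dm.JobRunner(n_jobs=2, prefer="threads")
# Each tuple is unpacked as positional arguments: pow(2, 3), pow(3, 2).
results = runner.parallel(pow, [(2, 3), (3, 2)], arg_type="args")
# results == [8, 9]
```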
sequential(self, callable_fn, data, arg_type=None, **fn_kwargs)
¶
Run the job sequentially
Parameters:

Name | Type | Description | Default |
---|---|---|---|
callable_fn | Callable | Function to call. | required |
data | Iterable[Any] | Input data. | required |
arg_type | Optional[str] | Function argument type ('arg'/None, 'args', or 'kwargs'). | None |
fn_kwargs | dict | Optional keyword arguments to pass to the callable function. | {} |
Source code in datamol/utils/jobs.py
def sequential(
    self,
    callable_fn: Callable,
    data: Iterable[Any],
    arg_type: Optional[str] = None,
    **fn_kwargs,
):
    r"""
    Run the job sequentially

    Args:
        callable_fn (callable): function to call
        data (iterable): input data
        arg_type (str, optional): function argument type ('arg'/None or 'args' or 'kwargs')
        fn_kwargs (dict, optional): optional keyword arguments to pass to the callable function
    """
    total_length = JobRunner.get_iterator_length(data)

    res = [
        JobRunner.wrap_fn(callable_fn, arg_type, **fn_kwargs)(dt)
        for dt in tqdm(data, total=total_length, disable=self.no_progress)
    ]
    return res
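Calling `sequential` directly bypasses the `n_jobs` dispatch, which can help when debugging a function before parallelizing it:

```python
import datamol as dm

runner = dm.JobRunner(progress=True)
# Runs in the current process with a tqdm progress bar.
results = runner.sequential(len, [[1], [1, 2], []])
# results == [1, 2, 0]
```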
wrap_fn(fn, arg_type=None, **fn_kwargs)
staticmethod
¶
Small wrapper around a callable to properly format its arguments
Source code in datamol/utils/jobs.py
@staticmethod
def wrap_fn(fn: Callable, arg_type: Optional[str] = None, **fn_kwargs):
    """Small wrapper around a callable to properly format its arguments"""
    # EN probably use something like (moms.utils.commons.is_callable) ?

    def _run(args: Any):
        if arg_type == "kwargs":
            fn_kwargs.update(**args)
            return fn(**fn_kwargs)
        elif arg_type == "args":
            return fn(*args, **fn_kwargs)
        return fn(args, **fn_kwargs)

    return _run
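The three `arg_type` conventions in one sketch (the wrapped function here is purely illustrative):

```python
import datamol as dm

def add(a, b=0):
    return a + b

dm.JobRunner.wrap_fn(add)(5)                           # fn(arg)      -> 5
dm.JobRunner.wrap_fn(add, "args")((2, 3))              # fn(*args)    -> 5
dm.JobRunner.wrap_fn(add, "kwargs")({"a": 2, "b": 3})  # fn(**kwargs) -> 5
```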
datamol.utils.decorators.decorators
¶
disable_on_os(os_names)
¶
A decorator to disable a function, raising an error if the detected OS is not supported.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
os_names | Union[str, List[str]] | OS names on which to disable this function. Valid OS names are: `["linux", "osx", "win"]`. | required |
Source code in datamol/utils/decorators.py
def disable_on_os(os_names: Union[str, List[str]]):
    """A decorator to disable a function, raising an error if the detected OS is not supported.

    Args:
        os_names: OS names on which to disable this function. Valid OS names are: `["linux", "osx", "win"]`.
    """
    if isinstance(os_names, str):
        os_names = [os_names]

    valid_os_names = []
    for os_name in os_names:
        if os_name == "linux":
            valid_os_names.append("Linux")
        elif os_name == "win":
            valid_os_names.append("Windows")
        elif os_name == "osx":
            valid_os_names.append("Darwin")
        else:
            valid_os_names.append(os_name)

    def real_decorator(function: Callable):
        @wraps(function)
        def wrapper(*args, **kwargs):
            if platform.system() not in valid_os_names:
                retval = function(*args, **kwargs)
                return retval
            else:
                raise NotImplementedError(
                    f"The function {function.__name__} is not supported"
                    f" for the platform '{platform.system()}'."
                )

        return wrapper

    return real_decorator
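A usage sketch (the import path below follows the module path shown above and may differ in your install; the decorated function is hypothetical):

```python
import platform
from datamol.utils.decorators import disable_on_os

@disable_on_os("win")  # raises NotImplementedError when platform.system() == "Windows"
def run_unix_only():
    return f"running on {platform.system()}"
```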
datamol.utils.fs.fs
¶
copy_file(source, destination)
¶
Copy a file to another path using fsspec.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
source | Union[str, os.PathLike] | Path to a file to copy from (remote or local). | required |
destination | Union[str, os.PathLike] | Path to a file to copy to (remote or local). | required |
Source code in datamol/utils/fs.py
def copy_file(
    source: Union[str, os.PathLike],
    destination: Union[str, os.PathLike],
):
    """Copy a file to another path using fsspec.

    Args:
        source: Path to a file to copy from (remote or local).
        destination: Path to a file to copy to (remote or local).
    """
    with fsspec.open(destination, "wb") as f_dest:
        with fsspec.open(source, "rb") as f_source:
            f_dest.write(f_source.read())
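A usage sketch (file names here are hypothetical; remote URLs work when the matching fsspec backend is installed):

```python
from datamol.utils.fs import copy_file

# Local-to-local copy; fsspec resolves the protocol from each path,
# so either side could also be a remote URL like "s3://bucket/mols.sdf".
copy_file("molecules.sdf", "/tmp/molecules_backup.sdf")
```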
datamol.utils.jobs.parallelized(fn, inputs_list, scheduler='processes', n_jobs=-1, progress=False, arg_type='arg')
¶
Run a function in parallel.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
fn | Callable | The function to run in parallel. | required |
inputs_list | Iterable[Any] | List of inputs to pass to `fn`. | required |
scheduler | str | Choose between ["processes", "threads"]. Defaults to "processes", which uses the default joblib "loky" backend. | 'processes' |
n_jobs | Optional[int] | Number of processes. Use 0 or None to force sequential. Use -1 to use all the available processors. For details see https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation | -1 |
progress | bool | Display a progress bar. Defaults to False. | False |
arg_type | str | One of ["arg", "args", "kwargs"]: "arg": the input is passed as an argument: `fn(arg)` (default); "args": the input is passed as a list: `fn(*args)`; "kwargs": the input is passed as a map: `fn(**kwargs)`. | 'arg' |

Returns:

Type | Description |
---|---|
Optional[List[Any]] | The results of the execution as a list. |
Source code in datamol/utils/jobs.py
def parallelized(
    fn: Callable,
    inputs_list: Iterable[Any],
    scheduler: str = "processes",
    n_jobs: Optional[int] = -1,
    progress: bool = False,
    arg_type: str = "arg",
) -> Optional[List[Any]]:
    """Run a function in parallel.

    Args:
        fn: The function to run in parallel.
        inputs_list: List of inputs to pass to `fn`.
        scheduler: Choose between ["processes", "threads"]. Defaults
            to "processes", which uses the default joblib "loky" backend.
        n_jobs: Number of processes. Use 0 or None to force sequential.
            Use -1 to use all the available processors. For details see
            https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
        progress: Display a progress bar. Defaults to False.
        arg_type: One of ["arg", "args", "kwargs"]:
            - "arg": the input is passed as an argument: `fn(arg)` (default).
            - "args": the input is passed as a list: `fn(*args)`.
            - "kwargs": the input is passed as a map: `fn(**kwargs)`.

    Returns:
        The results of the execution as a list.
    """
    runner = JobRunner(n_jobs=n_jobs, progress=progress, prefer=scheduler)
    return runner(fn, inputs_list, arg_type=arg_type)
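A usage sketch with `arg_type="kwargs"`, where each input dict is unpacked as keyword arguments:

```python
import datamol as dm

inputs = [{"a": 1, "b": 2}, {"a": 3, "b": 4}]
# Each dict is unpacked as keyword arguments: fn(a=..., b=...).
results = dm.parallelized(
    lambda a, b: a + b, inputs, scheduler="threads", n_jobs=2, arg_type="kwargs"
)
# results == [3, 7]
```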