
datamol.utils

disable_on_os(os_names)

A decorator that disables a function, raising an error when the detected OS is one of the listed unsupported OS names.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `os_names` | `Union[str, List[str]]` | OS names for which to disable this function. Valid OS names are: `["linux", "osx", "win"]`. | required |

Source code in datamol/utils/decorators.py
def disable_on_os(os_names: Union[str, List[str]]):
    """A decorator to disable a function raising an error if the OS detected is not supported.

    Args:
        os_names: OS names to disable this function. Valid OS names are: `["linux", "osx", "win"]`.
    """

    if isinstance(os_names, str):
        os_names = [os_names]

    valid_os_names = []
    for os_name in os_names:
        if os_name == "linux":
            valid_os_names.append("Linux")
        elif os_name == "win":
            valid_os_names.append("Windows")
        elif os_name == "osx":
            valid_os_names.append("Darwin")
        else:
            valid_os_names.append(os_name)

    def real_decorator(function: Callable):
        @wraps(function)
        def wrapper(*args, **kwargs):
            if platform.system() not in valid_os_names:
                retval = function(*args, **kwargs)
                return retval
            else:
                raise NotImplementedError(
                    f"The function {function.__name__} is not supported"
                    f" for the platform '{platform.system()}'."
                )

        return wrapper

    return real_decorator
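
A minimal usage sketch (assuming `disable_on_os` is importable from `datamol.utils`, as this page documents; the decorated function is illustrative):

```python
import datamol as dm

@dm.utils.disable_on_os("win")
def run_posix_only():
    # Runs on Linux/macOS; raises NotImplementedError on Windows.
    return "ok"
```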

JobRunner

Source code in datamol/utils/jobs.py
class JobRunner:
    def __init__(
        self,
        n_jobs: Optional[int] = -1,
        batch_size: Union[int, str] = "auto",
        prefer: Optional[str] = None,
        progress: bool = False,
        total: Optional[int] = None,
        tqdm_kwargs: Optional[dict] = None,
        **job_kwargs: Any,
    ):
        """
        JobRunner with sequential/parallel regimes. The multiprocessing backend uses joblib,
        which allows taking advantage of its features, while the progress bar uses tqdm.

        Args:
            n_jobs: Number of processes. Use 0 or None to force sequential execution.
                Use -1 to use all the available processors. For details see
                https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
            batch_size: The batch size used to batch `inputs_list`. Specify it when the length
                of `inputs_list` is very large (>100k elements). By default, the auto batching of joblib is used.
            prefer: Choose from ['processes', 'threads'] or None. Defaults to None.
                Soft hint to choose the default backend if no specific backend
                was selected with the parallel_backend context manager. The
                default process-based backend is 'loky' and the default
                thread-based backend is 'threading'. Ignored if the ``backend``
                parameter is specified.
            progress: Whether to display a progress bar.
            total: The number of elements in the iterator. Only used when `progress` is True.
            tqdm_kwargs: Any additional arguments supported by the `tqdm` progress bar.
            **job_kwargs: Any additional arguments supported by `joblib.Parallel`.

        Example:

        ```python
        import datamol as dm
        runner = dm.JobRunner(n_jobs=4, progress=True, prefer="threads")
        results = runner(lambda x: x**2, [1, 2, 3, 4])
        ```
        """

        self.n_jobs = n_jobs
        self.batch_size = batch_size
        self.prefer = prefer
        self.job_kwargs = job_kwargs
        self.job_kwargs.update(n_jobs=self.n_jobs, prefer=self.prefer, batch_size=self.batch_size)
        self.no_progress = not progress
        self.total = total
        self.tqdm_kwargs = tqdm_kwargs or {}

    @property
    def is_sequential(self):
        """Check whether the job is sequential or parallel"""
        return (self.n_jobs is None) or (self.n_jobs in [0, 1])

    @staticmethod
    def wrap_fn(fn: Callable, arg_type: Optional[str] = None, **fn_kwargs):
        """Small wrapper around a callable to properly format it's argument"""

        def _run(args: Any):
            if arg_type == "kwargs":
                fn_kwargs.update(**args)
                return fn(**fn_kwargs)
            elif arg_type == "args":
                return fn(*args, **fn_kwargs)
            return fn(args, **fn_kwargs)

        return _run

    def sequential(
        self,
        callable_fn: Callable,
        data: Iterable[Any],
        arg_type: Optional[str] = None,
        **fn_kwargs,
    ):
        """
        Run the job sequentially.

        Args:
            callable_fn (callable): function to call
            data (iterable): input data
            arg_type (str, optional): function argument type ('arg'/None or 'args' or 'kwargs')
            **fn_kwargs (dict, optional): optional keyword arguments to pass to the callable function
        """
        total_length = JobRunner.get_iterator_length(data)

        if self.total is not None:
            self.tqdm_kwargs["total"] = self.total
        elif "total" not in self.tqdm_kwargs:
            self.tqdm_kwargs["total"] = total_length

        if "disable" not in self.tqdm_kwargs:
            self.tqdm_kwargs["disable"] = self.no_progress

        results = [
            JobRunner.wrap_fn(callable_fn, arg_type, **fn_kwargs)(dt)
            for dt in tqdm(data, **self.tqdm_kwargs)
        ]
        return results

    def parallel(
        self,
        callable_fn: Callable,
        data: Iterable[Any],
        arg_type: Optional[str] = None,
        **fn_kwargs,
    ):
        """
        Run the job in parallel.

        Args:
            callable_fn (callable): function to call
            data (iterable): input data
            arg_type (str, optional): function argument type ('arg'/None or 'args' or 'kwargs')
            **fn_kwargs (dict, optional): optional keyword arguments to pass to the callable function
        """

        total_length = JobRunner.get_iterator_length(data)

        if self.total is not None:
            self.tqdm_kwargs["total"] = self.total
        elif "total" not in self.tqdm_kwargs:
            self.tqdm_kwargs["total"] = total_length

        if "disable" not in self.tqdm_kwargs:
            self.tqdm_kwargs["disable"] = self.no_progress

        runner = JobRunner._parallel_helper(**self.job_kwargs)
        results = runner(**self.tqdm_kwargs)(
            delayed(JobRunner.wrap_fn(callable_fn, arg_type, **fn_kwargs))(dt) for dt in data
        )

        return results

    def __call__(self, *args, **kwargs):
        """
        Run job using the n_jobs attribute to determine regime
        """
        if self.is_sequential:
            return self.sequential(*args, **kwargs)
        return self.parallel(*args, **kwargs)

    @staticmethod
    def _parallel_helper(**joblib_args):
        r"""
        Parallel helper function for joblib with tqdm support
        """

        def run(**tq_args):
            def tmp(op_iter):
                with _tqdm_callback(tqdm(**tq_args)):
                    return Parallel(**joblib_args)(op_iter)

            return tmp

        return run

    @staticmethod
    def get_iterator_length(data):
        """Attempt to get the length of an iterator"""
        total_length = None
        try:
            total_length = len(data)
        except TypeError:
            # most likely a generator, ignore
            pass
        return total_length

is_sequential property

Check whether the job is sequential or parallel

__call__(*args, **kwargs)

Run job using the n_jobs attribute to determine regime

Source code in datamol/utils/jobs.py
def __call__(self, *args, **kwargs):
    """
    Run job using the n_jobs attribute to determine regime
    """
    if self.is_sequential:
        return self.sequential(*args, **kwargs)
    return self.parallel(*args, **kwargs)
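
To illustrate the dispatch, a short sketch (assuming `dm.JobRunner` is exported at the package top level, as in the class example above):

```python
import datamol as dm

dm.JobRunner(n_jobs=None).is_sequential  # True: __call__ runs sequential()
dm.JobRunner(n_jobs=0).is_sequential     # True
dm.JobRunner(n_jobs=4).is_sequential     # False: __call__ runs parallel()
```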

__init__(n_jobs=-1, batch_size='auto', prefer=None, progress=False, total=None, tqdm_kwargs=None, **job_kwargs)

JobRunner with sequential/parallel regimes. The multiprocessing backend uses joblib, which allows taking advantage of its features, while the progress bar uses tqdm.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `n_jobs` | `Optional[int]` | Number of processes. Use 0 or None to force sequential execution. Use -1 to use all available processors. For details see https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation | `-1` |
| `batch_size` | `Union[int, str]` | The batch size used to batch `inputs_list`. Specify it when the length of `inputs_list` is very large (>100k elements). By default, the auto batching of joblib is used. | `'auto'` |
| `prefer` | `Optional[str]` | Choose from `['processes', 'threads']` or None. A soft hint to choose the default backend if no specific backend was selected with the `parallel_backend` context manager. The default process-based backend is 'loky' and the default thread-based backend is 'threading'. Ignored if the `backend` parameter is specified. | `None` |
| `progress` | `bool` | Whether to display a progress bar. | `False` |
| `total` | `Optional[int]` | The number of elements in the iterator. Only used when `progress` is True. | `None` |
| `tqdm_kwargs` | `Optional[dict]` | Any additional arguments supported by the `tqdm` progress bar. | `None` |
| `**job_kwargs` | `Any` | Any additional arguments supported by `joblib.Parallel`. | `{}` |

Example:

```python
import datamol as dm
runner = dm.JobRunner(n_jobs=4, progress=True, prefer="threads")
results = runner(lambda x: x**2, [1, 2, 3, 4])
```
Source code in datamol/utils/jobs.py
def __init__(
    self,
    n_jobs: Optional[int] = -1,
    batch_size: Union[int, str] = "auto",
    prefer: Optional[str] = None,
    progress: bool = False,
    total: Optional[int] = None,
    tqdm_kwargs: Optional[dict] = None,
    **job_kwargs: Any,
):
    """
    JobRunner with sequential/parallel regimes. The multiprocessing backend uses joblib,
    which allows taking advantage of its features, while the progress bar uses tqdm.

    Args:
        n_jobs: Number of processes. Use 0 or None to force sequential execution.
            Use -1 to use all the available processors. For details see
            https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
        batch_size: The batch size used to batch `inputs_list`. Specify it when the length
            of `inputs_list` is very large (>100k elements). By default, the auto batching of joblib is used.
        prefer: Choose from ['processes', 'threads'] or None. Defaults to None.
            Soft hint to choose the default backend if no specific backend
            was selected with the parallel_backend context manager. The
            default process-based backend is 'loky' and the default
            thread-based backend is 'threading'. Ignored if the ``backend``
            parameter is specified.
        progress: Whether to display a progress bar.
        total: The number of elements in the iterator. Only used when `progress` is True.
        tqdm_kwargs: Any additional arguments supported by the `tqdm` progress bar.
        **job_kwargs: Any additional arguments supported by `joblib.Parallel`.

    Example:

    ```python
    import datamol as dm
    runner = dm.JobRunner(n_jobs=4, progress=True, prefer="threads")
    results = runner(lambda x: x**2, [1, 2, 3, 4])
    ```
    """

    self.n_jobs = n_jobs
    self.batch_size = batch_size
    self.prefer = prefer
    self.job_kwargs = job_kwargs
    self.job_kwargs.update(n_jobs=self.n_jobs, prefer=self.prefer, batch_size=self.batch_size)
    self.no_progress = not progress
    self.total = total
    self.tqdm_kwargs = tqdm_kwargs or {}

get_iterator_length(data) staticmethod

Attempt to get the length of an iterator

Source code in datamol/utils/jobs.py
@staticmethod
def get_iterator_length(data):
    """Attempt to get the length of an iterator"""
    total_length = None
    try:
        total_length = len(data)
    except TypeError:
        # most likely a generator, ignore
        pass
    return total_length
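
A quick sketch of the behavior (same assumption that `dm.JobRunner` is importable as above):

```python
import datamol as dm

dm.JobRunner.get_iterator_length([1, 2, 3])         # 3
dm.JobRunner.get_iterator_length(x for x in "abc")  # None: generators have no len()
```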

parallel(callable_fn, data, arg_type=None, **fn_kwargs)

Run the job in parallel.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `callable_fn` | `callable` | Function to call. | required |
| `data` | `iterable` | Input data. | required |
| `arg_type` | `str`, optional | Function argument type (`'arg'`/None, `'args'`, or `'kwargs'`). | `None` |
| `**fn_kwargs` | `dict`, optional | Optional keyword arguments to pass to the callable function. | `{}` |

Source code in datamol/utils/jobs.py
def parallel(
    self,
    callable_fn: Callable,
    data: Iterable[Any],
    arg_type: Optional[str] = None,
    **fn_kwargs,
):
    """
    Run the job in parallel.

    Args:
        callable_fn (callable): function to call
        data (iterable): input data
        arg_type (str, optional): function argument type ('arg'/None or 'args' or 'kwargs')
        **fn_kwargs (dict, optional): optional keyword arguments to pass to the callable function
    """

    total_length = JobRunner.get_iterator_length(data)

    if self.total is not None:
        self.tqdm_kwargs["total"] = self.total
    elif "total" not in self.tqdm_kwargs:
        self.tqdm_kwargs["total"] = total_length

    if "disable" not in self.tqdm_kwargs:
        self.tqdm_kwargs["disable"] = self.no_progress

    runner = JobRunner._parallel_helper(**self.job_kwargs)
    results = runner(**self.tqdm_kwargs)(
        delayed(JobRunner.wrap_fn(callable_fn, arg_type, **fn_kwargs))(dt) for dt in data
    )

    return results

sequential(callable_fn, data, arg_type=None, **fn_kwargs)

Run the job sequentially.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `callable_fn` | `callable` | Function to call. | required |
| `data` | `iterable` | Input data. | required |
| `arg_type` | `str`, optional | Function argument type (`'arg'`/None, `'args'`, or `'kwargs'`). | `None` |
| `**fn_kwargs` | `dict`, optional | Optional keyword arguments to pass to the callable function. | `{}` |

Source code in datamol/utils/jobs.py
def sequential(
    self,
    callable_fn: Callable,
    data: Iterable[Any],
    arg_type: Optional[str] = None,
    **fn_kwargs,
):
    """
    Run the job sequentially.

    Args:
        callable_fn (callable): function to call
        data (iterable): input data
        arg_type (str, optional): function argument type ('arg'/None or 'args' or 'kwargs')
        **fn_kwargs (dict, optional): optional keyword arguments to pass to the callable function
    """
    total_length = JobRunner.get_iterator_length(data)

    if self.total is not None:
        self.tqdm_kwargs["total"] = self.total
    elif "total" not in self.tqdm_kwargs:
        self.tqdm_kwargs["total"] = total_length

    if "disable" not in self.tqdm_kwargs:
        self.tqdm_kwargs["disable"] = self.no_progress

    results = [
        JobRunner.wrap_fn(callable_fn, arg_type, **fn_kwargs)(dt)
        for dt in tqdm(data, **self.tqdm_kwargs)
    ]
    return results

wrap_fn(fn, arg_type=None, **fn_kwargs) staticmethod

Small wrapper around a callable to properly format its arguments

Source code in datamol/utils/jobs.py
@staticmethod
def wrap_fn(fn: Callable, arg_type: Optional[str] = None, **fn_kwargs):
    """Small wrapper around a callable to properly format it's argument"""

    def _run(args: Any):
        if arg_type == "kwargs":
            fn_kwargs.update(**args)
            return fn(**fn_kwargs)
        elif arg_type == "args":
            return fn(*args, **fn_kwargs)
        return fn(args, **fn_kwargs)

    return _run
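
A small sketch of how `arg_type` changes the call convention (the `add` function is hypothetical, for illustration):

```python
import datamol as dm

def add(a, b):
    return a + b

dm.JobRunner.wrap_fn(add, arg_type="args")((1, 2))              # fn(*args) -> 3
dm.JobRunner.wrap_fn(add, arg_type="kwargs")({"a": 1, "b": 2})  # fn(**kwargs) -> 3
dm.JobRunner.wrap_fn(abs)(-5)                                   # default 'arg': fn(arg) -> 5
```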

parallelized(fn, inputs_list, scheduler='processes', n_jobs=-1, batch_size='auto', progress=False, arg_type='arg', total=None, tqdm_kwargs=None, **job_kwargs)

Run a function in parallel.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `fn` | `Callable` | The function to run in parallel. | required |
| `inputs_list` | `Iterable[Any]` | List of inputs to pass to `fn`. | required |
| `scheduler` | `str` | Choose between `["processes", "threads"]`. Defaults to `"processes"`; with None, joblib's default "loky" scheduler is used. | `'processes'` |
| `n_jobs` | `Optional[int]` | Number of processes. Use 0 or None to force sequential execution. Use -1 to use all available processors. For details see https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation | `-1` |
| `batch_size` | `Union[int, str]` | Whether to automatically batch `inputs_list`. Only use it when the length of `inputs_list` is very large (>100k elements); the length of `inputs_list` must also be defined. | `'auto'` |
| `progress` | `bool` | Display a progress bar. Defaults to False. | `False` |
| `arg_type` | `str` | One of `["arg", "args", "kwargs"]`: `"arg"` passes the input as a single argument, `fn(arg)` (default); `"args"` unpacks it as positional arguments, `fn(*args)`; `"kwargs"` unpacks it as keyword arguments, `fn(**kwargs)`. | `'arg'` |
| `total` | `Optional[int]` | The number of elements in the iterator. Only used when `progress` is True. | `None` |
| `tqdm_kwargs` | `Optional[dict]` | Any additional arguments supported by the `tqdm` progress bar. | `None` |
| `**job_kwargs` | `Any` | Any additional arguments supported by `joblib.Parallel`. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Sequence[Optional[Any]]` | The results of the execution as a list. |

Source code in datamol/utils/jobs.py
def parallelized(
    fn: Callable,
    inputs_list: Iterable[Any],
    scheduler: str = "processes",
    n_jobs: Optional[int] = -1,
    batch_size: Union[int, str] = "auto",
    progress: bool = False,
    arg_type: str = "arg",
    total: Optional[int] = None,
    tqdm_kwargs: Optional[dict] = None,
    **job_kwargs: Any,
) -> Sequence[Optional[Any]]:
    """Run a function in parallel.

    Args:
        fn: The function to run in parallel.
        inputs_list: List of inputs to pass to `fn`.
        scheduler: Choose between ["processes", "threads"]. Defaults
            to "processes"; with None, joblib's default "loky" scheduler is used.
        n_jobs: Number of processes. Use 0 or None to force sequential execution.
            Use -1 to use all the available processors. For details see
            https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
        batch_size: Whether to automatically batch `inputs_list`. You should only use it when the length
            of `inputs_list` is very large (>100k elements). The length of `inputs_list` must also be
            defined.
        progress: Display a progress bar. Defaults to False.
        arg_type: One of ["arg", "args", "kwargs"]:
            - "arg": the input is passed as a single argument: `fn(arg)` (default).
            - "args": the input is unpacked as positional arguments: `fn(*args)`.
            - "kwargs": the input is unpacked as keyword arguments: `fn(**kwargs)`.
        total: The number of elements in the iterator. Only used when `progress` is True.
        tqdm_kwargs: Any additional arguments supported by the `tqdm` progress bar.
        **job_kwargs: Any additional arguments supported by `joblib.Parallel`.

    Returns:
        The results of the execution as a list.
    """

    runner = JobRunner(
        n_jobs=n_jobs,
        batch_size=batch_size,
        progress=progress,
        prefer=scheduler,
        total=total,
        tqdm_kwargs=tqdm_kwargs,
        **job_kwargs,
    )
    return runner(fn, inputs_list, arg_type=arg_type)
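
A short usage sketch (assuming `parallelized` is exported as `dm.parallelized`, as in datamol's docs; the squaring function is illustrative):

```python
import datamol as dm

# One input per call; arg_type="args"/"kwargs" would unpack tuples/dicts instead.
results = dm.parallelized(lambda x: x**2, [1, 2, 3, 4], n_jobs=2, scheduler="threads")
assert results == [1, 4, 9, 16]
```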

parallelized_with_batches(fn, inputs_list, batch_size, scheduler='processes', n_jobs=-1, progress=False, arg_type='arg', total=None, tqdm_kwargs=None, flatten_results=True, joblib_batch_size='auto', **job_kwargs)

Run a function in parallel using batches.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `fn` | `Callable` | The function to run in parallel. It must accept a batch of `inputs_list`. | required |
| `inputs_list` | `Iterable[Any]` | List of inputs to pass to `fn`. | required |
| `batch_size` | `int` | Batch size on which to run `fn`. | required |
| `scheduler` | `str` | Choose between `["processes", "threads"]`. Defaults to `"processes"`; with None, joblib's default "loky" scheduler is used. | `'processes'` |
| `n_jobs` | `Optional[int]` | Number of processes. Use 0 or None to force sequential execution. Use -1 to use all available processors. For details see https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation | `-1` |
| `progress` | `bool` | Display a progress bar. Defaults to False. | `False` |
| `arg_type` | `str` | One of `["arg", "args", "kwargs"]`: `"arg"` passes the input as a single argument, `fn(arg)` (default); `"args"` unpacks it as positional arguments, `fn(*args)`; `"kwargs"` unpacks it as keyword arguments, `fn(**kwargs)`. | `'arg'` |
| `total` | `Optional[int]` | The number of elements in the iterator. Only used when `progress` is True. | `None` |
| `tqdm_kwargs` | `Optional[dict]` | Any additional arguments supported by the `tqdm` progress bar. | `None` |
| `flatten_results` | `bool` | Whether to flatten the results. | `True` |
| `joblib_batch_size` | `Union[int, str]` | Corresponds to the `batch_size` argument of `dm.parallelized`, which is forwarded to `joblib.Parallel` under the hood. | `'auto'` |
| `**job_kwargs` | `Any` | Any additional arguments supported by `joblib.Parallel`. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Sequence[Optional[Any]]` | The results of the execution as a list. |

Source code in datamol/utils/jobs.py
def parallelized_with_batches(
    fn: Callable,
    inputs_list: Iterable[Any],
    batch_size: int,
    scheduler: str = "processes",
    n_jobs: Optional[int] = -1,
    progress: bool = False,
    arg_type: str = "arg",
    total: Optional[int] = None,
    tqdm_kwargs: Optional[dict] = None,
    flatten_results: bool = True,
    joblib_batch_size: Union[int, str] = "auto",
    **job_kwargs: Any,
) -> Sequence[Optional[Any]]:
    """Run a function in parallel using batches.

    Args:
        fn: The function to run in parallel. It must accept a batch of `inputs_list`.
        inputs_list: List of inputs to pass to `fn`.
        batch_size: Batch size on which to run `fn`.
        scheduler: Choose between ["processes", "threads"]. Defaults
            to "processes"; with None, joblib's default "loky" scheduler is used.
        n_jobs: Number of processes. Use 0 or None to force sequential execution.
            Use -1 to use all the available processors. For details see
            https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
        progress: Display a progress bar. Defaults to False.
        arg_type: One of ["arg", "args", "kwargs"]:
            - "arg": the input is passed as a single argument: `fn(arg)` (default).
            - "args": the input is unpacked as positional arguments: `fn(*args)`.
            - "kwargs": the input is unpacked as keyword arguments: `fn(**kwargs)`.
        total: The number of elements in the iterator. Only used when `progress` is True.
        tqdm_kwargs: Any additional arguments supported by the `tqdm` progress bar.
        flatten_results: Whether to flatten the results.
        joblib_batch_size: It corresponds to the `batch_size` argument of `dm.parallelized` that
            is forwarded to `joblib.Parallel` under the hood.
        **job_kwargs: Any additional arguments supported by `joblib.Parallel`.

    Returns:
        The results of the execution as a list.
    """

    def _batch_iterator(n: int, iterable: Iterable):
        it = iter(iterable)
        while True:
            chunk_it = itertools.islice(it, n)
            try:
                first_el = next(chunk_it)
            except StopIteration:
                return
            yield list(itertools.chain((first_el,), chunk_it))

    # Compute the total number of batches if possible
    if total is not None:
        n_batches = total // batch_size
        n_batches = max(n_batches, 1)
    elif isinstance(inputs_list, collections.abc.Sized):
        n_batches = len(inputs_list) // batch_size
        n_batches = max(n_batches, 1)
    else:
        n_batches = None

    # Make an iterator over batches so it works even with Iterator without a defined length
    input_chunks = _batch_iterator(batch_size, inputs_list)

    runner = JobRunner(
        n_jobs=n_jobs,
        batch_size=joblib_batch_size,
        progress=progress,
        prefer=scheduler,
        total=n_batches,
        tqdm_kwargs=tqdm_kwargs,
        **job_kwargs,
    )
    results = runner(fn, input_chunks, arg_type=arg_type)

    # Flatten the results
    if flatten_results:
        results = [item for sublist in results for item in sublist]

    return results
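
A short usage sketch (assuming `parallelized_with_batches` is exported as `dm.parallelized_with_batches`; `square_batch` is illustrative):

```python
import datamol as dm

def square_batch(batch):
    # `fn` receives a whole batch (a list) of inputs and returns a list.
    return [x**2 for x in batch]

results = dm.parallelized_with_batches(
    square_batch, list(range(10)), batch_size=4, n_jobs=2, scheduler="threads"
)
assert results == [x**2 for x in range(10)]  # flattened, since flatten_results=True
```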

watch_duration

A context manager to measure execution time, with logging capability.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `log` | `bool` | Whether to log the measured duration. | `True` |
| `log_human_duration` | `bool` | Whether to log the duration in a human-readable way depending on the amount. | `True` |

Example:

```python
import time
import datamol as dm

def fn(n):
    for i in range(n):
        print(i)
        time.sleep(0.2)

with dm.utils.perf.watch_duration(log=True) as w:
    fn(5)

print(w.duration)
```
Source code in datamol/utils/perf.py
class watch_duration:
    """A Python decorator to measure execution time with logging capability.

    Args:
        log: Whether to log the measured duration.
        log_human_duration: Whether to log duration in a human way
            depending on the amount.

    Example:

    ```python
    def fn(n):
        for i in range(n):
            print(i)
            time.sleep(0.2)

    with dm.utils.perf.watch_duration(log=True) as w:
        fn(5)

    print(w.duration)
    ```
    """

    def __init__(self, log: bool = True, log_human_duration: bool = True):
        self.log = log
        self.log_human_duration = log_human_duration

        self.start = None
        self.end = None
        self.duration = None
        self.duration_minutes = None

    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, *_):
        assert self.start is not None

        self.end = time.time()
        self.duration = self.end - self.start
        self.duration_minutes = self.duration / 60

        if self.log:
            if self.log_human_duration:
                logger.info(f"Duration {human_duration(self.duration)}.")
            else:
                logger.info(f"Duration {self.duration_minutes:.2f} minutes")