
datamol.utils

disable_on_os(os_names)

A decorator that disables a function on the listed operating systems by raising an error when it is called there.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| os_names | Union[str, List[str]] | OS names to disable this function. Valid OS names are: ["linux", "osx", "win"]. | required |
Source code in datamol/utils/decorators.py
def disable_on_os(os_names: Union[str, List[str]]):
    """A decorator to disable a function raising an error if the OS detected is not supported.

    Args:
        os_names: OS names to disable this function. Valid OS names are: `["linux", "osx", "win"]`.
    """

    if isinstance(os_names, str):
        os_names = [os_names]

    valid_os_names = []
    for os_name in os_names:
        if os_name == "linux":
            valid_os_names.append("Linux")
        elif os_name == "win":
            valid_os_names.append("Windows")
        elif os_name == "osx":
            valid_os_names.append("Darwin")
        else:
            valid_os_names.append(os_name)

    def real_decorator(function: Callable):
        @wraps(function)
        def wrapper(*args, **kwargs):
            if platform.system() not in valid_os_names:
                retval = function(*args, **kwargs)
                return retval
            else:
                raise NotImplementedError(
                    f"The function {function.__name__} is not supported"
                    f" for the platform '{platform.system()}'."
                )

        return wrapper

    return real_decorator
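
Example (a minimal sketch; the decorated function is hypothetical and the import assumes `disable_on_os` is exposed under `datamol.utils`, as rendered on this page):

```python
import datamol as dm

# Disable this (hypothetical) function on Windows: calling it there raises
# NotImplementedError, while Linux and macOS run it normally.
@dm.utils.disable_on_os("win")
def run_unix_only_step():
    return "ok"

run_unix_only_step()
```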

The fs module makes it easier to work with all the types of paths supported by fsspec (local, S3, GCS, etc.).

copy_dir(source, destination, force=False, progress=False, leave_progress=True, file_progress=False, file_leave_progress=False, chunk_size=None)

Copy one directory to another location across different filesystems (local, S3, GCS, etc).

Note that if the source and destination filesystems are the same, the progress bar is not shown.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| source | Union[str, pathlib.Path] | Path to the source directory. | required |
| destination | Union[str, pathlib.Path] | Path to the destination directory. | required |
| force | bool | Whether to overwrite the destination directory if it exists. | False |
| progress | bool | Whether to display a progress bar. | False |
| leave_progress | bool | Whether to keep the progress bar displayed once the copy is done. | True |
| file_progress | bool | Whether to display a progress bar for each file. | False |
| file_leave_progress | bool | Whether to keep the per-file progress bar displayed once a file copy is done. | False |
| chunk_size | Optional[int] | The chunk size to use. If progress is enabled and chunk_size is None, it is set to 2048. See also dm.utils.fs.copy_file. | None |
Source code in datamol/utils/fs.py
def copy_dir(
    source: Union[str, pathlib.Path],
    destination: Union[str, pathlib.Path],
    force: bool = False,
    progress: bool = False,
    leave_progress: bool = True,
    file_progress: bool = False,
    file_leave_progress: bool = False,
    chunk_size: Optional[int] = None,
):
    """Copy one directory to another location across different filesystem (local, S3, GCS, etc).

    Note that if both FS from source and destination are the same, progress won't be shown.

    Args:
        source: Path to the source directory.
        destination: Path to the destination directory.
        force: whether to overwrite the destination directory if it exists.
        progress: Whether to display a progress bar.
        leave_progress: Whether to keep the progress bar displayed once the copy is done.
        file_progress: Whether to display a progress bar for each file.
        file_leave_progress: Whether to keep the per-file progress bar displayed once a file copy is done.
        chunk_size: the chunk size to use. If progress is enabled and `chunk_size`
            is `None`, it is set to 2048. See also `dm.utils.fs.copy_file`.
    """

    source = str(source)
    destination = str(destination)

    source_fs = get_mapper(source).fs
    destination_fs = get_mapper(destination).fs

    # Sanity check
    if not is_dir(source):
        raise ValueError(
            f"The directory being copied does not exist or is not a directory: {source}"
        )

    if not force and is_dir(destination):
        raise ValueError(f"The destination folder to copy already exists: {destination}")

    # If both fs are the same then we just rely on the internal `copy` method
    # which is much faster.
    if destination_fs.__class__ == source_fs.__class__:
        source_fs.copy(source, destination, recursive=True)
        return

    # Get all input paths with details
    # NOTE(hadim): we could have use `.glob(..., detail=True)` here but that API is inconsistent
    # between the backends resulting in different object types being returned (dict, list, etc).
    detailed_paths = source_fs.find(source, withdirs=True, detail=True)
    detailed_paths = list(detailed_paths.values())

    # Get list of input types
    input_types = [d["type"] for d in detailed_paths]

    # Get list of input path + add protocol if needed
    input_paths = [d["name"] for d in detailed_paths]
    input_paths = [fsspec.utils._unstrip_protocol(p, source_fs) for p in input_paths]

    # Build all the output paths
    output_paths: List[str] = fsspec.utils.other_paths(input_paths, destination)  # type: ignore

    def _copy_source_to_destination(input_path, input_type, output_path):
        # A directory
        if input_type == "directory":
            destination_fs.mkdir(output_path)

        # A file
        else:
            copy_file(
                input_path,
                output_path,
                force=force,
                progress=file_progress,
                leave_progress=file_leave_progress,
                chunk_size=chunk_size,
            )

    # Copy source files/directories to destination in parallel
    parallelized(
        _copy_source_to_destination,
        inputs_list=list(zip(input_paths, input_types, output_paths)),
        arg_type="args",
        progress=progress,
        tqdm_kwargs=dict(leave=leave_progress),
        scheduler="threads",
    )
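
Example (a usage sketch; the paths are placeholders, and copying to a remote filesystem such as S3 assumes the matching fsspec backend, e.g. s3fs, is installed):

```python
import datamol as dm

# Copy a local directory to an S3 bucket with an overall progress bar.
dm.utils.fs.copy_dir(
    source="./local_dataset",
    destination="s3://my-bucket/datasets/local_dataset",
    force=True,      # overwrite the destination if it already exists
    progress=True,   # one progress bar over all files
)
```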

copy_file(source, destination, chunk_size=None, force=False, progress=False, leave_progress=True)

Copy one file to another location across different filesystems (local, S3, GCS, etc).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| source | Union[str, pathlib.Path, io.IOBase, fsspec.core.OpenFile] | Path or file-like object to copy from. | required |
| destination | Union[str, pathlib.Path, io.IOBase, fsspec.core.OpenFile] | Path or file-like object to copy to. | required |
| chunk_size | Optional[int] | The chunk size to use. If progress is enabled and chunk_size is None, it is set to 1 MB (1024 * 1024 bytes). | None |
| force | bool | Whether to overwrite the destination file if it exists. | False |
| progress | bool | Whether to display a progress bar. | False |
| leave_progress | bool | Whether to keep the progress bar displayed once the copy is done. | True |
Source code in datamol/utils/fs.py
def copy_file(
    source: Union[str, pathlib.Path, io.IOBase, fsspec.core.OpenFile],
    destination: Union[str, pathlib.Path, io.IOBase, fsspec.core.OpenFile],
    chunk_size: Optional[int] = None,
    force: bool = False,
    progress: bool = False,
    leave_progress: bool = True,
):
    """Copy one file to another location across different filesystem (local, S3, GCS, etc).

    Args:
        source: path or file-like object to copy from.
        destination: path or file-like object to copy to.
        chunk_size: the chunk size to use. If progress is enabled and `chunk_size`
            is `None`, it is set to 1 MB (1024 * 1024 bytes).
        force: whether to overwrite the destination file if it exists.
        progress: whether to display a progress bar.
        leave_progress: whether to keep the progress bar displayed once the copy is done.
    """

    if progress and chunk_size is None:
        chunk_size = 1024 * 1024

    if isinstance(source, (str, os.PathLike)):
        source_file = fsspec.open(str(source), "rb")
    else:
        source_file = source

    if isinstance(destination, (str, os.PathLike)):
        # adapt the file mode of the destination depending on the source file.
        destination_mode = "wb"
        if hasattr(source_file, "mode"):
            destination_mode = "wb" if "b" in getattr(source_file, "mode") else "w"
        elif isinstance(source_file, io.BytesIO):
            destination_mode = "wb"
        elif isinstance(source_file, io.StringIO):
            destination_mode = "w"

        destination_file = fsspec.open(str(destination), destination_mode)
    else:
        destination_file = destination

    if not is_file(source_file):  # type: ignore
        raise ValueError(f"The file being copied does not exist or is not a file: {source}")

    if not force and is_file(destination_file):  # type: ignore
        raise ValueError(f"The destination file to copy already exists: {destination}")

    with source_file as source_stream:
        with destination_file as destination_stream:
            if chunk_size is None:
                # copy without chunks
                destination_stream.write(source_stream.read())  # type: ignore

            else:
                # copy with chunks

                # determine the size of the source file
                source_size = None
                if progress:
                    source_size = get_size(source)

                pbar = None
                if progress:
                    tqdm = _import_tqdm()

                    if tqdm is None:
                        raise ImportError(
                            "If the progress bar is enabled, you must have `tqdm` "
                            "installed: `conda install tqdm`."
                        )
                    else:
                        # init progress bar
                        pbar = tqdm(
                            total=source_size,
                            leave=leave_progress,
                            disable=not progress,
                            unit="B",
                            unit_divisor=1024,
                            unit_scale=True,
                        )

                # start the loop
                while True:
                    data = source_stream.read(chunk_size)  # type: ignore
                    if not data:
                        break
                    destination_stream.write(data)  # type: ignore

                    if pbar is not None:
                        pbar.update(chunk_size)

                if pbar is not None:
                    pbar.close()
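
Example (a usage sketch; the paths are placeholders and the gs:// destination assumes the gcsfs backend is installed):

```python
import datamol as dm

# Copy a single file to a bucket with a chunked copy and a progress bar.
dm.utils.fs.copy_file(
    source="./molecules.sdf",
    destination="gs://my-bucket/molecules.sdf",
    progress=True,  # with chunk_size=None this defaults to 1 MB chunks
    force=True,
)
```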

exists(path)

Check whether a file or a directory exists.

Important: a file-like object is always considered to exist.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase] | A path supported by fsspec such as local, s3, gcs, etc. | required |
Source code in datamol/utils/fs.py
def exists(path: Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase]):
    """Check whether a file or a directory exists.

    Important: a file-like object is always considered to exist.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    return is_file(path) or is_dir(path)

get_basename(path)

Get the basename of a file or a folder.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | Union[str, os.PathLike] | A path supported by fsspec such as local, s3, gcs, etc. | required |
Source code in datamol/utils/fs.py
def get_basename(path: Union[str, os.PathLike]):
    """Get the basename of a file or a folder.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    path = str(path)
    mapper = get_mapper(path)
    clean_path = path.rstrip(mapper.fs.sep)
    return str(clean_path).split(mapper.fs.sep)[-1]

get_cache_dir(app_name, suffix=None, create=True)

Get a local cache directory for a given application name.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| app_name | str | The name of the application. | required |
| suffix | Optional[str] | A subdirectory appended to the cache dir. | None |
| create | bool | Whether to create the directory and its parents if it does not already exist. | True |
Source code in datamol/utils/fs.py
def get_cache_dir(app_name: str, suffix: Optional[str] = None, create: bool = True):
    """Get a local cache directory for a given application name.

    Args:
        app_name: The name of the application.
        suffix: A subdirectory appended to the cache dir.
        create: Whether to create the directory and its parents if it does not
            already exist.
    """

    cache_dir = pathlib.Path(platformdirs.user_cache_dir(appname=app_name))

    if suffix is not None:
        cache_dir /= suffix

    if create:
        cache_dir.mkdir(exist_ok=True, parents=True)

    return cache_dir
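
Example (the application name and suffix below are placeholders):

```python
import datamol as dm

# Create (if needed) and return a per-user cache directory.
cache_dir = dm.utils.fs.get_cache_dir(app_name="my_app", suffix="featurizers")
print(cache_dir)  # e.g. ~/.cache/my_app/featurizers on Linux
```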

get_extension(path)

Get the extension of a file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | Union[str, os.PathLike] | A path supported by fsspec such as local, s3, gcs, etc. | required |
Source code in datamol/utils/fs.py
def get_extension(path: Union[str, os.PathLike]):
    """Get the extension of a file.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    basename = get_basename(path)
    return basename.split(".")[-1]
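
Example showing `get_basename` and `get_extension` together (the path below is a placeholder; remote paths such as s3:// work the same way provided the corresponding fsspec backend is installed):

```python
import datamol as dm

path = "/tmp/data/molecules.sdf"
dm.utils.fs.get_basename(path)   # "molecules.sdf"
dm.utils.fs.get_extension(path)  # "sdf"
```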

get_mapper(path)

Get the fsspec mapper.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | Union[str, os.PathLike] | A path supported by fsspec such as local, s3, gcs, etc. | required |
Source code in datamol/utils/fs.py
def get_mapper(path: Union[str, os.PathLike]):
    """Get the fsspec mapper.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    return fsspec.get_mapper(str(path))

get_protocol(path, fs=None)

Return the name of the path protocol.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | Union[str, os.PathLike] | A path supported by fsspec such as local, s3, gcs, etc. | required |
| fs | Optional[fsspec.AbstractFileSystem] | An optional filesystem instance to use instead of inferring one from the path. | None |
Source code in datamol/utils/fs.py
def get_protocol(path: Union[str, os.PathLike], fs: Optional[fsspec.AbstractFileSystem] = None):
    """Return the name of the path protocol.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """

    if fs is None:
        fs = get_mapper(path).fs

    protocol = fs.protocol  # type: ignore

    if "s3" in protocol:
        return "s3"
    elif "gs" in protocol:
        return "gs"
    elif isinstance(protocol, (tuple, list)):
        return protocol[0]
    return protocol
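
Example (a minimal sketch; the s3:// call assumes the s3fs backend is installed):

```python
import datamol as dm

dm.utils.fs.get_protocol("/tmp/file.csv")            # "file"
dm.utils.fs.get_protocol("s3://my-bucket/file.csv")  # "s3"
```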

get_size(file)

Get the size of a file given its path. Return None if the size can't be retrieved.

Source code in datamol/utils/fs.py
def get_size(file: Union[str, os.PathLike, io.IOBase, fsspec.core.OpenFile]) -> Optional[int]:
    """Get the size of a file given its path. Return None if the
    size can't be retrieved.
    """

    if isinstance(file, io.IOBase) and hasattr(file, "name"):
        fs_local = fsspec.filesystem("file")
        file_size = fs_local.size(getattr(file, "name"))

    elif isinstance(file, (str, os.PathLike)):
        fs = get_mapper(str(file)).fs
        file_size = fs.size(str(file))

    elif isinstance(file, fsspec.core.OpenFile):
        file_size = file.fs.size(file.path)

    else:
        file_size = None

    return file_size

glob(path, detail=False, **kwargs)

Find files by glob-matching.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | str | A glob-style path. | required |
Source code in datamol/utils/fs.py
def glob(path: str, detail: bool = False, **kwargs) -> List[str]:
    """Find files by glob-matching.

    Args:
        path: A glob-style path.
    """
    # Get the list of paths
    fs = get_mapper(path).fs
    paths = fs.glob(path, detail=detail, **kwargs)
    paths = [fsspec.utils._unstrip_protocol(d, fs) for d in paths]
    return paths
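
Example (a usage sketch; the patterns are placeholders, and remote globbing assumes the matching fsspec backend is installed):

```python
import datamol as dm

# Local glob-matching, recursive with "**".
dm.utils.fs.glob("./data/**/*.csv")

# Remote glob-matching, e.g. on S3 (requires s3fs).
dm.utils.fs.glob("s3://my-bucket/data/*.sdf")
```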

is_dir(path)

Check whether a path is an existing directory.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase] | A path supported by fsspec such as local, s3, gcs, etc. | required |
Source code in datamol/utils/fs.py
def is_dir(path: Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase]):
    """Check whether a file exists.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    if isinstance(path, fsspec.core.OpenFile):
        return path.fs.isdir(path.path)

    elif isinstance(path, (str, os.PathLike)):
        mapper = get_mapper(str(path))
        return mapper.fs.isdir(str(path))

    else:
        return False

is_file(path)

Check whether a file exists.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| path | Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase] | A path supported by fsspec such as local, s3, gcs, etc. | required |
Source code in datamol/utils/fs.py
def is_file(path: Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase]):
    """Check whether a file exists.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    if isinstance(path, fsspec.core.OpenFile):
        return path.fs.isfile(path.path)

    elif isinstance(path, (str, os.PathLike)):
        mapper = get_mapper(str(path))
        return mapper.fs.isfile(str(path))

    else:
        return False
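
Example: `is_file`, `is_dir` and `exists` can be combined to validate inputs before reading or writing (the paths are placeholders):

```python
import datamol as dm

dm.utils.fs.is_dir("./data")                 # True if ./data is a directory
dm.utils.fs.is_file("./data/molecules.sdf")  # True if the file exists
dm.utils.fs.exists("./data/molecules.sdf")   # file or directory
```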

is_local_path(path)

Check whether a path is local.

Source code in datamol/utils/fs.py
def is_local_path(path: Union[str, os.PathLike]):
    """Check whether a path is local."""
    return get_protocol(str(path)) == "file"

join(*paths)

Join paths together. The first element determines the filesystem to use (and so the separator).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| *paths | str | A list of paths supported by fsspec such as local, s3, gcs, etc. | () |
Source code in datamol/utils/fs.py
def join(*paths: str):
    """Join paths together. The first element determine the
    filesystem to use (and so the separator.

    Args:
        *paths: a list of paths supported by `fsspec` such as local, s3, gcs, etc.
    """
    _paths = [str(path).rstrip("/") for path in paths]
    source_path = _paths[0]
    fs = get_mapper(source_path).fs
    full_path = fs.sep.join(_paths)
    return full_path
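
Example (the bucket name is a placeholder; resolving the s3:// filesystem assumes the s3fs backend is installed):

```python
import datamol as dm

dm.utils.fs.join("s3://my-bucket", "datasets", "molecules.sdf")
# -> "s3://my-bucket/datasets/molecules.sdf"
```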

md5(filepath)

Return the md5 hash of a file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| filepath | Union[str, os.PathLike] | The path to the file to compute the MD5 hash on. | required |
Source code in datamol/utils/fs.py
def md5(filepath: Union[str, os.PathLike]):
    """Return the md5 hash of a file.

    Args:
        filepath: The path to the file to compute the MD5 hash on.
    """
    with fsspec.open(filepath) as f:
        file_hash = hashlib.md5()
        file_hash.update(f.read())  # type: ignore
        file_hash = file_hash.hexdigest()
    return file_hash
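
Example (the path is a placeholder; remote paths work as well, given the matching fsspec backend):

```python
import datamol as dm

# Compute the MD5 checksum of a local or remote file.
checksum = dm.utils.fs.md5("./data/molecules.sdf")
print(checksum)  # hex digest string
```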

mkdir(dir_path, exist_ok=False)

Create a directory.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dir_path | Union[str, os.PathLike] | The path of the directory to create. | required |
| exist_ok | bool | Whether to ignore the error if the directory already exists. | False |
Source code in datamol/utils/fs.py
def mkdir(dir_path: Union[str, os.PathLike], exist_ok: bool = False):
    """Create a directory.

    Args:
        dir_path: The path of the directory to create.
        exist_ok: Whether to ignore the error if the directory
            already exists.
    """
    fs = get_mapper(str(dir_path)).fs
    fs.mkdirs(str(dir_path), exist_ok=exist_ok)

JobRunner

Source code in datamol/utils/jobs.py
class JobRunner:
    def __init__(
        self,
        n_jobs: Optional[int] = -1,
        batch_size: Union[int, str] = "auto",
        prefer: Optional[str] = None,
        progress: bool = False,
        total: Optional[int] = None,
        tqdm_kwargs: Optional[dict] = None,
        **job_kwargs: Any,
    ):
        """
        JobRunner with sequential/parallel regimes. The multiprocessing backend uses joblib, which
        allows taking advantage of its features, while the progress bar uses tqdm.

        Args:
            n_jobs: Number of processes. Use 0 or None to force sequential execution.
                Use -1 to use all the available processors. For details see
                https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
            batch_size: Whether to batch `inputs_list`. You can specify batch_size when the length
                of `inputs_list` is very large (>100k elements). By default, the auto batching of joblib is used.
            prefer: Choose from ['processes', 'threads'] or None. Default to None.
                Soft hint to choose the default backend if no specific backend
                was selected with the parallel_backend context manager. The
                default process-based backend is 'loky' and the default
                thread-based backend is 'threading'. Ignored if the ``backend``
                parameter is specified.
            progress: whether to display progress bar
            total: The number of elements in the iterator. Only used when `progress` is True.
            tqdm_kwargs: Any additional arguments supported by the `tqdm` progress bar.
            **job_kwargs: Any additional arguments supported by `joblib.Parallel`.

        Example:

        ```python
        import datamol as dm
        runner = dm.JobRunner(n_jobs=4, progress=True, prefer="threads")
        results = runner(lambda x: x**2, [1, 2, 3, 4])
        ```
        """

        self.n_jobs = n_jobs
        self.batch_size = batch_size
        self.prefer = prefer
        self.job_kwargs = job_kwargs
        self.job_kwargs.update(n_jobs=self.n_jobs, prefer=self.prefer, batch_size=self.batch_size)
        self.no_progress = not progress
        self.total = total
        self.tqdm_kwargs = tqdm_kwargs or {}

    @property
    def is_sequential(self):
        """Check whether the job is sequential or parallel"""
        return (self.n_jobs is None) or (self.n_jobs in [0, 1])

    @staticmethod
    def wrap_fn(fn: Callable, arg_type: Optional[str] = None, **fn_kwargs):
        """Small wrapper around a callable to properly format it's argument"""

        def _run(args: Any):
            if arg_type == "kwargs":
                fn_kwargs.update(**args)
                return fn(**fn_kwargs)
            elif arg_type == "args":
                return fn(*args, **fn_kwargs)
            return fn(args, **fn_kwargs)

        return _run

    def sequential(
        self,
        callable_fn: Callable,
        data: Iterable[Any],
        arg_type: Optional[str] = None,
        **fn_kwargs,
    ):
        """
        Run the job sequentially.

        Args:
            callable_fn (callable): function to call
            data (iterable): input data
            arg_type (str, optional): function argument type ('arg'/None or 'args' or 'kwargs')
            **fn_kwargs (dict, optional): optional keyword arguments to pass to the callable function
        """
        total_length = JobRunner.get_iterator_length(data)

        if self.total is not None:
            self.tqdm_kwargs["total"] = self.total
        elif "total" not in self.tqdm_kwargs:
            self.tqdm_kwargs["total"] = total_length

        if "disable" not in self.tqdm_kwargs:
            self.tqdm_kwargs["disable"] = self.no_progress

        results = [
            JobRunner.wrap_fn(callable_fn, arg_type, **fn_kwargs)(dt)
            for dt in tqdm(data, **self.tqdm_kwargs)
        ]
        return results

    def parallel(
        self,
        callable_fn: Callable,
        data: Iterable[Any],
        arg_type: Optional[str] = None,
        **fn_kwargs,
    ):
        """
        Run job in parallel

        Args:
            callable_fn (callable): function to call
            data (iterable): input data
            arg_type (str, optional): function argument type ('arg'/None or 'args' or 'kwargs')
            **fn_kwargs (dict, optional): optional keyword arguments to pass to the callable function
        """

        total_length = JobRunner.get_iterator_length(data)

        if self.total is not None:
            self.tqdm_kwargs["total"] = self.total
        elif "total" not in self.tqdm_kwargs:
            self.tqdm_kwargs["total"] = total_length

        if "disable" not in self.tqdm_kwargs:
            self.tqdm_kwargs["disable"] = self.no_progress

        runner = JobRunner._parallel_helper(**self.job_kwargs)
        results = runner(**self.tqdm_kwargs)(
            delayed(JobRunner.wrap_fn(callable_fn, arg_type, **fn_kwargs))(dt) for dt in data
        )

        return results

    def __call__(self, *args, **kwargs):
        """
        Run job using the n_jobs attribute to determine regime
        """
        if self.is_sequential:
            return self.sequential(*args, **kwargs)
        return self.parallel(*args, **kwargs)

    @staticmethod
    def _parallel_helper(**joblib_args):
        r"""
        Parallel helper function for joblib with tqdm support
        """

        def run(**tq_args):
            def tmp(op_iter):
                with _tqdm_callback(tqdm(**tq_args)):
                    return Parallel(**joblib_args)(op_iter)

            return tmp

        return run

    @staticmethod
    def get_iterator_length(data):
        """Attempt to get the length of an iterator"""
        total_length = None
        try:
            total_length = len(data)
        except TypeError:
            # most likely a generator, ignore
            pass
        return total_length

is_sequential property

Check whether the job is sequential or parallel

__call__(*args, **kwargs)

Run job using the n_jobs attribute to determine regime

Source code in datamol/utils/jobs.py
def __call__(self, *args, **kwargs):
    """
    Run job using the n_jobs attribute to determine regime
    """
    if self.is_sequential:
        return self.sequential(*args, **kwargs)
    return self.parallel(*args, **kwargs)

__init__(n_jobs=-1, batch_size='auto', prefer=None, progress=False, total=None, tqdm_kwargs=None, **job_kwargs)

JobRunner with sequential/parallel regimes. The multiprocessing backend uses joblib, which allows taking advantage of its features, while the progress bar uses tqdm.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| n_jobs | Optional[int] | Number of processes. Use 0 or None to force sequential execution. Use -1 to use all available processors. For details see https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation | -1 |
| batch_size | Union[int, str] | Whether to batch inputs_list. You can specify batch_size when the length of inputs_list is very large (>100k elements). By default, the auto batching of joblib is used. | 'auto' |
| prefer | Optional[str] | Choose from ['processes', 'threads'] or None. Defaults to None. Soft hint to choose the default backend if no specific backend was selected with the parallel_backend context manager. The default process-based backend is 'loky' and the default thread-based backend is 'threading'. Ignored if the backend parameter is specified. | None |
| progress | bool | Whether to display a progress bar. | False |
| total | Optional[int] | The number of elements in the iterator. Only used when progress is True. | None |
| tqdm_kwargs | Optional[dict] | Any additional arguments supported by the tqdm progress bar. | None |
| **job_kwargs | Any | Any additional arguments supported by joblib.Parallel. | {} |

Example:

```python
import datamol as dm
runner = dm.JobRunner(n_jobs=4, progress=True, prefer="threads")
results = runner(lambda x: x**2, [1, 2, 3, 4])
```
Source code in datamol/utils/jobs.py
def __init__(
    self,
    n_jobs: Optional[int] = -1,
    batch_size: Union[int, str] = "auto",
    prefer: Optional[str] = None,
    progress: bool = False,
    total: Optional[int] = None,
    tqdm_kwargs: Optional[dict] = None,
    **job_kwargs: Any,
):
    """
    JobRunner with sequential/parallel regimes. The multiprocessing backend uses joblib, which
    allows taking advantage of its features, while the progress bar uses tqdm.

    Args:
        n_jobs: Number of processes. Use 0 or None to force sequential execution.
            Use -1 to use all the available processors. For details see
            https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
        batch_size: Whether to batch `inputs_list`. You can specify batch_size when the length
            of `inputs_list` is very large (>100k elements). By default, the auto batching of joblib is used.
        prefer: Choose from ['processes', 'threads'] or None. Default to None.
            Soft hint to choose the default backend if no specific backend
            was selected with the parallel_backend context manager. The
            default process-based backend is 'loky' and the default
            thread-based backend is 'threading'. Ignored if the ``backend``
            parameter is specified.
        progress: whether to display progress bar
        total: The number of elements in the iterator. Only used when `progress` is True.
        tqdm_kwargs: Any additional arguments supported by the `tqdm` progress bar.
        **job_kwargs: Any additional arguments supported by `joblib.Parallel`.

    Example:

    ```python
    import datamol as dm
    runner = dm.JobRunner(n_jobs=4, progress=True, prefer="threads")
    results = runner(lambda x: x**2, [1, 2, 3, 4])
    ```
    """

    self.n_jobs = n_jobs
    self.batch_size = batch_size
    self.prefer = prefer
    self.job_kwargs = job_kwargs
    self.job_kwargs.update(n_jobs=self.n_jobs, prefer=self.prefer, batch_size=self.batch_size)
    self.no_progress = not progress
    self.total = total
    self.tqdm_kwargs = tqdm_kwargs or {}

get_iterator_length(data) staticmethod

Attempt to get the length of an iterator

Source code in datamol/utils/jobs.py
@staticmethod
def get_iterator_length(data):
    """Attempt to get the length of an iterator"""
    total_length = None
    try:
        total_length = len(data)
    except TypeError:
        # most likely a generator, ignore
        pass
    return total_length

parallel(callable_fn, data, arg_type=None, **fn_kwargs)

Run job in parallel

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| callable_fn | callable | Function to call. | required |
| data | iterable | Input data. | required |
| arg_type | str, optional | Function argument type ('arg'/None, 'args', or 'kwargs'). | None |
| **fn_kwargs | dict, optional | Optional keyword arguments to pass to the callable function. | {} |
Source code in datamol/utils/jobs.py
def parallel(
    self,
    callable_fn: Callable,
    data: Iterable[Any],
    arg_type: Optional[str] = None,
    **fn_kwargs,
):
    """
    Run job in parallel

    Args:
        callable_fn (callable): function to call
        data (iterable): input data
        arg_type (str, optional): function argument type ('arg'/None or 'args' or 'kwargs')
        **fn_kwargs (dict, optional): optional keyword arguments to pass to the callable function
    """

    total_length = JobRunner.get_iterator_length(data)

    if self.total is not None:
        self.tqdm_kwargs["total"] = self.total
    elif "total" not in self.tqdm_kwargs:
        self.tqdm_kwargs["total"] = total_length

    if "disable" not in self.tqdm_kwargs:
        self.tqdm_kwargs["disable"] = self.no_progress

    runner = JobRunner._parallel_helper(**self.job_kwargs)
    results = runner(**self.tqdm_kwargs)(
        delayed(JobRunner.wrap_fn(callable_fn, arg_type, **fn_kwargs))(dt) for dt in data
    )

    return results

sequential(callable_fn, data, arg_type=None, **fn_kwargs)

Run the job sequentially.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| callable_fn | callable | Function to call. | required |
| data | iterable | Input data. | required |
| arg_type | str, optional | Function argument type ('arg'/None, 'args', or 'kwargs'). | None |
| **fn_kwargs | dict, optional | Optional keyword arguments to pass to the callable function. | {} |
Source code in datamol/utils/jobs.py
def sequential(
    self,
    callable_fn: Callable,
    data: Iterable[Any],
    arg_type: Optional[str] = None,
    **fn_kwargs,
):
    """
    Run the job sequentially.

    Args:
        callable_fn (callable): function to call
        data (iterable): input data
        arg_type (str, optional): function argument type ('arg'/None or 'args' or 'kwargs')
        **fn_kwargs (dict, optional): optional keyword arguments to pass to the callable function
    """
    total_length = JobRunner.get_iterator_length(data)

    if self.total is not None:
        self.tqdm_kwargs["total"] = self.total
    elif "total" not in self.tqdm_kwargs:
        self.tqdm_kwargs["total"] = total_length

    if "disable" not in self.tqdm_kwargs:
        self.tqdm_kwargs["disable"] = self.no_progress

    results = [
        JobRunner.wrap_fn(callable_fn, arg_type, **fn_kwargs)(dt)
        for dt in tqdm(data, **self.tqdm_kwargs)
    ]
    return results

wrap_fn(fn, arg_type=None, **fn_kwargs) staticmethod

Small wrapper around a callable to properly format its arguments.

Source code in datamol/utils/jobs.py
@staticmethod
def wrap_fn(fn: Callable, arg_type: Optional[str] = None, **fn_kwargs):
    """Small wrapper around a callable to properly format it's argument"""

    def _run(args: Any):
        if arg_type == "kwargs":
            fn_kwargs.update(**args)
            return fn(**fn_kwargs)
        elif arg_type == "args":
            return fn(*args, **fn_kwargs)
        return fn(args, **fn_kwargs)

    return _run

parallelized(fn, inputs_list, scheduler='processes', n_jobs=-1, batch_size='auto', progress=False, arg_type='arg', total=None, tqdm_kwargs=None, **job_kwargs)

Run a function in parallel.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| fn | Callable | The function to run in parallel. | required |
| inputs_list | Iterable[Any] | List of inputs to pass to fn. | required |
| scheduler | str | Choose between ["processes", "threads"]. The default "processes" relies on joblib's default "loky" backend. | 'processes' |
| n_jobs | Optional[int] | Number of processes. Use 0 or None to force sequential execution. Use -1 to use all available processors. For details see https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation | -1 |
| batch_size | Union[int, str] | Whether to automatically batch inputs_list. You should only use it when the length of inputs_list is very large (>100k elements). The length of inputs_list must also be defined. | 'auto' |
| progress | bool | Display a progress bar. Defaults to False. | False |
| arg_type | str | One of ["arg", "args", "kwargs"]: "arg" passes the input as a single argument fn(arg) (default); "args" unpacks it as positional arguments fn(*args); "kwargs" unpacks it as keyword arguments fn(**kwargs). | 'arg' |
| total | Optional[int] | The number of elements in the iterator. Only used when progress is True. | None |
| tqdm_kwargs | Optional[dict] | Any additional arguments supported by the tqdm progress bar. | None |
| **job_kwargs | Any | Any additional arguments supported by joblib.Parallel. | {} |

Returns:

| Type | Description |
| --- | --- |
| Sequence[Optional[Any]] | The results of the execution as a list. |

Source code in datamol/utils/jobs.py
def parallelized(
    fn: Callable,
    inputs_list: Iterable[Any],
    scheduler: str = "processes",
    n_jobs: Optional[int] = -1,
    batch_size: Union[int, str] = "auto",
    progress: bool = False,
    arg_type: str = "arg",
    total: Optional[int] = None,
    tqdm_kwargs: Optional[dict] = None,
    **job_kwargs: Any,
) -> Sequence[Optional[Any]]:
    """Run a function in parallel.

    Args:
        fn: The function to run in parallel.
        inputs_list: List of inputs to pass to `fn`.
        scheduler: Choose between ["processes", "threads"]. The default
            "processes" relies on joblib's default "loky" backend.
        n_jobs: Number of processes. Use 0 or None to force sequential execution.
                Use -1 to use all the available processors. For details see
                https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
        batch_size: Whether to automatically batch `inputs_list`. You should only use it when the length
            of `inputs_list` is very large (>100k elements). The length of `inputs_list` must also be
            defined.
        progress: Display a progress bar. Defaults to False.
        arg_type: One of ["arg", "args", "kwargs"]:
            - "arg": the input is passed as an argument: `fn(arg)` (default).
            - "args": the input is passed as a list: `fn(*args)`.
            - "kwargs": the input is passed as a map: `fn(**kwargs)`.
        total: The number of elements in the iterator. Only used when `progress` is True.
        tqdm_kwargs: Any additional arguments supported by the `tqdm` progress bar.
        **job_kwargs: Any additional arguments supported by `joblib.Parallel`.

    Returns:
        The results of the execution as a list.
    """

    runner = JobRunner(
        n_jobs=n_jobs,
        batch_size=batch_size,
        progress=progress,
        prefer=scheduler,
        total=total,
        tqdm_kwargs=tqdm_kwargs,
        **job_kwargs,
    )
    return runner(fn, inputs_list, arg_type=arg_type)
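
Example (a minimal sketch; the function and inputs are illustrative, and `parallelized` is assumed to be exported at the top level of datamol as written here):

```python
import datamol as dm

def square(x):
    return x**2

# Run `square` over the inputs with 2 processes and a progress bar.
results = dm.parallelized(square, [1, 2, 3, 4], n_jobs=2, progress=True)
# results == [1, 4, 9, 16]
```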

parallelized_with_batches(fn, inputs_list, batch_size, scheduler='processes', n_jobs=-1, progress=False, arg_type='arg', total=None, tqdm_kwargs=None, flatten_results=True, joblib_batch_size='auto', **job_kwargs)

Run a function in parallel using batches.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| fn | Callable | The function to run in parallel. It must accept a batch of inputs_list. | required |
| inputs_list | Iterable[Any] | List of inputs to pass to fn. | required |
| batch_size | int | Batch size on which to run fn. | required |
| scheduler | str | Choose between ["processes", "threads"]. The default "processes" relies on joblib's default "loky" backend. | 'processes' |
| n_jobs | Optional[int] | Number of processes. Use 0 or None to force sequential execution. Use -1 to use all available processors. For details see https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation | -1 |
| progress | bool | Display a progress bar. Defaults to False. | False |
| arg_type | str | One of ["arg", "args", "kwargs"]: "arg" passes the input as a single argument fn(arg) (default); "args" unpacks it as positional arguments fn(*args); "kwargs" unpacks it as keyword arguments fn(**kwargs). | 'arg' |
| total | Optional[int] | The number of elements in the iterator. Only used when progress is True. | None |
| tqdm_kwargs | Optional[dict] | Any additional arguments supported by the tqdm progress bar. | None |
| flatten_results | bool | Whether to flatten the results. | True |
| joblib_batch_size | Union[int, str] | Corresponds to the batch_size argument of dm.parallelized that is forwarded to joblib.Parallel under the hood. | 'auto' |
| **job_kwargs | Any | Any additional arguments supported by joblib.Parallel. | {} |

Returns:

| Type | Description |
| --- | --- |
| Sequence[Optional[Any]] | The results of the execution as a list. |

Source code in datamol/utils/jobs.py
def parallelized_with_batches(
    fn: Callable,
    inputs_list: Iterable[Any],
    batch_size: int,
    scheduler: str = "processes",
    n_jobs: Optional[int] = -1,
    progress: bool = False,
    arg_type: str = "arg",
    total: Optional[int] = None,
    tqdm_kwargs: Optional[dict] = None,
    flatten_results: bool = True,
    joblib_batch_size: Union[int, str] = "auto",
    **job_kwargs: Any,
) -> Sequence[Optional[Any]]:
    """Run a function in parallel using batches.

    Args:
        fn: The function to run in parallel. It must accept a batch of `inputs_list`.
        inputs_list: List of inputs to pass to `fn`.
        batch_size: Batch size on which to run `fn`.
        scheduler: Choose between ["processes", "threads"]. The default
            "processes" relies on joblib's default "loky" backend.
        n_jobs: Number of processes. Use 0 or None to force sequential execution.
                Use -1 to use all the available processors. For details see
                https://joblib.readthedocs.io/en/latest/parallel.html#parallel-reference-documentation
        progress: Display a progress bar. Defaults to False.
        arg_type: One of ["arg", "args", "kwargs"]:
            - "arg": the input is passed as an argument: `fn(arg)` (default).
            - "args": the input is passed as a list: `fn(*args)`.
            - "kwargs": the input is passed as a map: `fn(**kwargs)`.
        total: The number of elements in the iterator. Only used when `progress` is True.
        tqdm_kwargs: Any additional arguments supported by the `tqdm` progress bar.
        flatten_results: Whether to flatten the results.
        joblib_batch_size: It corresponds to the `batch_size` argument of `dm.parallelized` that
            is forwarded to `joblib.Parallel` under the hood.
        **job_kwargs: Any additional arguments supported by `joblib.Parallel`.

    Returns:
        The results of the execution as a list.
    """

    def _batch_iterator(n: int, iterable: Iterable):
        it = iter(iterable)
        while True:
            chunk_it = itertools.islice(it, n)
            try:
                first_el = next(chunk_it)
            except StopIteration:
                return
            yield list(itertools.chain((first_el,), chunk_it))

    # Compute the total number of batches if possible
    if total is not None:
        n_batches = total // batch_size
        n_batches = max(n_batches, 1)
    elif isinstance(inputs_list, collections.abc.Sized):
        n_batches = len(inputs_list) // batch_size
        n_batches = max(n_batches, 1)
    else:
        n_batches = None

    # Make an iterator over batches so it works even with Iterator without a defined length
    input_chunks = _batch_iterator(batch_size, inputs_list)

    runner = JobRunner(
        n_jobs=n_jobs,
        batch_size=joblib_batch_size,
        progress=progress,
        prefer=scheduler,
        total=n_batches,
        tqdm_kwargs=tqdm_kwargs,
        **job_kwargs,
    )
    results = runner(fn, input_chunks, arg_type=arg_type)

    # Flatten the results
    if flatten_results:
        results = [item for sublist in results for item in sublist]

    return results
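
Example (a minimal sketch; the batch function and inputs are illustrative, and `parallelized_with_batches` is assumed to be exported at the top level of datamol):

```python
import datamol as dm

def lengths_batch(batch):
    # Illustrative batch function: receives a list of inputs at once.
    return [len(s) for s in batch]

smiles = ["CCO", "c1ccccc1", "CC(=O)O"] * 100
results = dm.parallelized_with_batches(
    lengths_batch,
    smiles,
    batch_size=50,
    n_jobs=2,
    progress=True,
)
# With flatten_results=True (the default), `results` has one entry per input.
```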

watch_duration

A context manager to measure execution time with logging capability.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| log | bool | Whether to log the measured duration. | True |
| log_human_duration | bool | Whether to log the duration in a human-readable format. | True |

Example:

```python
import time
import datamol as dm

def fn(n):
    for i in range(n):
        print(i)
        time.sleep(0.2)

with dm.utils.perf.watch_duration(log=True) as w:
    fn(5)

print(w.duration)
```
Source code in datamol/utils/perf.py
class watch_duration:
    """A Python decorator to measure execution time with logging capability.

    Args:
        log: Whether to log the measured duration.
        log_human_duration: Whether to log duration in a human way
            depending on the amount.

    Example:

    ```python
    def fn(n):
        for i in range(n):
            print(i)
            time.sleep(0.2)

    with dm.utils.perf.watch_duration(log=True) as w:
        fn(5)

    print(w.duration)
    ```
    """

    def __init__(self, log: bool = True, log_human_duration: bool = True):
        self.log = log
        self.log_human_duration = log_human_duration

        self.start = None
        self.end = None
        self.duration = None
        self.duration_minutes = None

    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, *_):
        assert self.start is not None

        self.end = time.time()
        self.duration = self.end - self.start
        self.duration_minutes = self.duration / 60

        if self.log:
            if self.log_human_duration:
                logger.info(f"Duration {human_duration(self.duration)}.")
            else:
                logger.info(f"Duration {self.duration_minutes:.2f} minutes")