Skip to content

datamol.utils.fs

The fs module makes it easier to work with all types of paths (the ones supported by fsspec).

copy_dir(source, destination, force=False, progress=False, leave_progress=True, file_progress=False, file_leave_progress=False, chunk_size=None)

Copy one directory to another location across different filesystems (local, S3, GCS, etc).

Note that if both FS from source and destination are the same, progress won't be shown.

Parameters:

Name Type Description Default
source Union[str, Path]

Path to the source directory.

required
destination Union[str, Path]

Path to the destination directory.

required
chunk_size Optional[int]

the chunk size to use. If file_progress is enabled and the chunk size is None, copy_file sets it to 1MB (1024 * 1024).

None
force bool

whether to overwrite the destination directory if it exists.

False
progress bool

Whether to display a progress bar.

False
leave_progress bool

Whether to keep the progress bar displayed once the copy is done.

True
file_progress bool

Whether to display a progress bar for each file.

False
file_leave_progress bool

Whether to keep the per-file progress bar displayed once a file copy is done.

False
chunk_size Optional[int]

See dm.utils.fs.copy_file.

None
Source code in datamol/utils/fs.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
def copy_dir(
    source: Union[str, pathlib.Path],
    destination: Union[str, pathlib.Path],
    force: bool = False,
    progress: bool = False,
    leave_progress: bool = True,
    file_progress: bool = False,
    file_leave_progress: bool = False,
    chunk_size: Optional[int] = None,
):
    """Copy one directory to another location across different filesystems (local, S3, GCS, etc).

    Note that if both source and destination resolve to the same filesystem class, the
    copy is delegated to that filesystem's own `copy` method and no progress is shown.

    Args:
        source: Path to the source directory.
        destination: Path to the destination directory.
        force: Whether to proceed when the destination directory already exists.
            The flag is also forwarded to `copy_file` for every copied file.
        progress: Whether to display a progress bar over the copied entries.
        leave_progress: Whether to keep the progress bar displayed once the copy is done
            (forwarded to `tqdm` as `leave`).
        file_progress: Whether to display a progress bar for each file.
        file_leave_progress: Whether to keep the per-file progress bar displayed once
            a file copy is done.
        chunk_size: The chunk size forwarded to `copy_file`. See `dm.utils.fs.copy_file`
            for the default applied when a per-file progress bar is enabled.

    Raises:
        ValueError: If `source` is not an existing directory, or if `destination`
            already exists as a directory and `force` is False.
    """

    source = str(source)
    destination = str(destination)

    source_fs = get_mapper(source).fs
    destination_fs = get_mapper(destination).fs

    # Sanity check
    if not is_dir(source):
        raise ValueError(
            f"The directory being copied does not exist or is not a directory: {source}"
        )

    if not force and is_dir(destination):
        raise ValueError(f"The destination folder to copy already exists: {destination}")

    # If both fs are the same then we just rely on the internal `copy` method
    # which is much faster.
    if destination_fs.__class__ == source_fs.__class__:
        source_fs.copy(source, destination, recursive=True)
        return

    # Get all input paths with details
    # NOTE(hadim): we could have use `.glob(..., detail=True)` here but that API is inconsistent
    # between the backends resulting in different object types being returned (dict, list, etc).
    detailed_paths = list(source_fs.find(source, withdirs=True, detail=True).values())

    # Get list of input types
    input_types = [d["type"] for d in detailed_paths]

    # Get list of input path + add protocol if needed
    input_paths = [fsspec.utils._unstrip_protocol(d["name"], source_fs) for d in detailed_paths]

    # Build all the output paths
    output_paths: List[str] = fsspec.utils.other_paths(input_paths, destination)  # type: ignore

    def _copy_source_to_destination(input_path, input_type, output_path):
        # A directory
        if input_type == "directory":
            # `exist_ok=True` because the directory may already be there: either the
            # destination pre-exists (`force=True`) or another worker thread created it
            # first. A plain `mkdir` can raise in both situations.
            destination_fs.makedirs(output_path, exist_ok=True)

        # A file
        else:
            copy_file(
                input_path,
                output_path,
                force=force,
                progress=file_progress,
                leave_progress=file_leave_progress,
                chunk_size=chunk_size,
            )

    # Copy source files/directories to destination in parallel
    parallelized(
        _copy_source_to_destination,
        inputs_list=list(zip(input_paths, input_types, output_paths)),
        arg_type="args",
        progress=progress,
        tqdm_kwargs=dict(leave=leave_progress),
        scheduler="threads",
    )

copy_file(source, destination, chunk_size=None, force=False, progress=False, leave_progress=True)

Copy one file to another location across different filesystems (local, S3, GCS, etc).

Parameters:

Name Type Description Default
source Union[str, Path, IOBase, OpenFile]

path or file-like object to copy from.

required
destination Union[str, Path, IOBase, OpenFile]

path or file-like object to copy to.

required
chunk_size Optional[int]

the chunk size to use. If progress is enabled and the chunk size is None, it is set to 1MB (1024 * 1024).

None
force bool

whether to overwrite the destination file if it exists.

False
progress bool

whether to display a progress bar.

False
leave_progress bool

whether to keep the progress bar displayed once the copy is done.

True
Source code in datamol/utils/fs.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def copy_file(
    source: Union[str, pathlib.Path, io.IOBase, fsspec.core.OpenFile],
    destination: Union[str, pathlib.Path, io.IOBase, fsspec.core.OpenFile],
    chunk_size: Optional[int] = None,
    force: bool = False,
    progress: bool = False,
    leave_progress: bool = True,
):
    """Copy one file to another location across different filesystems (local, S3, GCS, etc).

    Args:
        source: path or file-like object to copy from.
        destination: path or file-like object to copy to.
        chunk_size: the chunk size to use. If progress is enabled and the chunk
            size is `None`, it is set to 1MB (1024 * 1024). When `None` and progress
            is disabled, the whole content is read and written in a single call.
        force: whether to overwrite the destination file if it exists.
        progress: whether to display a progress bar.
        leave_progress: whether to keep the progress bar displayed once the copy
            is done (forwarded to `tqdm` as `leave`).

    Raises:
        ValueError: If `is_file` reports that the source is not a file, or if the
            destination is an existing file and `force` is False.
        ImportError: If `progress` is enabled but `tqdm` cannot be imported.
    """

    if progress and chunk_size is None:
        chunk_size = 1024 * 1024

    if isinstance(source, (str, os.PathLike)):
        source_file = fsspec.open(str(source), "rb")
    else:
        source_file = source

    if isinstance(destination, (str, os.PathLike)):
        # adapt the file mode of the destination depending on the source file.
        destination_mode = "wb"
        if hasattr(source_file, "mode"):
            destination_mode = "wb" if "b" in getattr(source_file, "mode") else "w"
        elif isinstance(source_file, io.BytesIO):
            destination_mode = "wb"
        elif isinstance(source_file, io.StringIO):
            destination_mode = "w"

        destination_file = fsspec.open(str(destination), destination_mode)
    else:
        destination_file = destination

    if not is_file(source_file):  # type: ignore
        raise ValueError(f"The file being copied does not exist or is not a file: {source}")

    if not force and is_file(destination_file):  # type: ignore
        raise ValueError(f"The destination file to copy already exists: {destination}")

    with source_file as source_stream:
        with destination_file as destination_stream:
            if chunk_size is None:
                # copy without chunks
                destination_stream.write(source_stream.read())  # type: ignore

            else:
                # copy with chunks
                pbar = None
                if progress:
                    # determine the size of the source file (may be None)
                    source_size = get_size(source)

                    tqdm = _import_tqdm()
                    if tqdm is None:
                        raise ImportError(
                            "If the progress bar is enabled, you must have `tqdm` "
                            "installed: `conda install tqdm`."
                        )

                    # init progress bar
                    pbar = tqdm(
                        total=source_size,
                        leave=leave_progress,
                        unit="B",
                        unit_divisor=1024,
                        unit_scale=True,
                    )

                try:
                    while True:
                        data = source_stream.read(chunk_size)  # type: ignore
                        if not data:
                            break
                        destination_stream.write(data)  # type: ignore

                        if pbar is not None:
                            # Advance by what was actually read: the last chunk is
                            # usually shorter than `chunk_size`, so updating by
                            # `chunk_size` would overshoot the total.
                            pbar.update(len(data))
                finally:
                    # Always release the progress bar, even if a read/write fails.
                    if pbar is not None:
                        pbar.close()

exists(path)

Check whether a file or a directory exists.

Important: File-like object always exists.

Parameters:

Name Type Description Default
path Union[str, PathLike, OpenFile, IOBase]

a path supported by fsspec such as local, s3, gcs, etc.

required
Source code in datamol/utils/fs.py
82
83
84
85
86
87
88
89
90
def exists(path: Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase]):
    """Tell whether `path` refers to an existing file or an existing directory.

    The answer is the result of `is_file(path)` when it is truthy, otherwise the
    result of `is_dir(path)`.

    NOTE(review): the previous documentation stated that a file-like object always
    exists, whereas the `is_file` and `is_dir` helpers in this module return False
    for inputs that are neither a path nor an `fsspec.core.OpenFile` — confirm
    which behavior is intended.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    found_file = is_file(path)
    return found_file if found_file else is_dir(path)

get_basename(path)

Get the basename of a file or a folder.

Parameters:

Name Type Description Default
path Union[str, PathLike]

a path supported by fsspec such as local, s3, gcs, etc.

required
Source code in datamol/utils/fs.py
60
61
62
63
64
65
66
67
68
69
def get_basename(path: Union[str, os.PathLike]):
    """Return the last component of a file or folder path.

    Trailing separators are stripped before splitting, using the separator of the
    filesystem that `fsspec` resolves for the path.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    location = str(path)
    separator = get_mapper(location).fs.sep
    components = location.rstrip(separator).split(separator)
    return components[-1]

get_cache_dir(app_name, suffix=None, create=True)

Get a local cache directory for a given application name.

Parameters:

Name Type Description Default
app_name str

The name of the application.

required
suffix Optional[str]

A subdirectory appended to the cache dir.

None
create bool

Whether to create the directory and its parents if it does not already exist.

True
Source code in datamol/utils/fs.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def get_cache_dir(app_name: str, suffix: Optional[str] = None, create: bool = True):
    """Return the local user cache directory of an application as a `pathlib.Path`.

    Args:
        app_name: The name of the application.
        suffix: A subdirectory appended to the cache dir.
        create: Whether to create the directory and its parents if it does not
            already exist.
    """
    base_dir = platformdirs.user_cache_dir(appname=app_name)
    cache_dir = pathlib.Path(base_dir)

    if suffix is not None:
        cache_dir = cache_dir / suffix

    if create:
        cache_dir.mkdir(parents=True, exist_ok=True)

    return cache_dir

get_extension(path)

Get the extension of a file.

Parameters:

Name Type Description Default
path Union[str, PathLike]

a path supported by fsspec such as local, s3, gcs, etc.

required
Source code in datamol/utils/fs.py
72
73
74
75
76
77
78
79
def get_extension(path: Union[str, os.PathLike]):
    """Return the extension of a file, i.e. the text after the last `.` of its basename.

    When the basename contains no `.`, the whole basename is returned.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    _, _, extension = get_basename(path).rpartition(".")
    return extension

get_mapper(path)

Get the fsspec mapper.

Parameters:

Name Type Description Default
path Union[str, PathLike]

a path supported by fsspec such as local, s3, gcs, etc.

required
Source code in datamol/utils/fs.py
51
52
53
54
55
56
57
def get_mapper(path: Union[str, os.PathLike]):
    """Build the `fsspec` mapper for a path.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    location = str(path)
    return fsspec.get_mapper(location)

get_protocol(path, fs=None)

Return the name of the path protocol.

Parameters:

Name Type Description Default
path Union[str, PathLike]

a path supported by fsspec such as local, s3, gcs, etc.

required
Source code in datamol/utils/fs.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def get_protocol(path: Union[str, os.PathLike], fs: Optional[fsspec.AbstractFileSystem] = None):
    """Return the name of the path protocol.

    `"s3"` is returned when the filesystem protocol contains `"s3"`, and `"gs"` when
    it contains `"gs"`. Otherwise the first entry is returned when the protocol is a
    tuple or a list, and the protocol itself is returned in every other case.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
        fs: an optional filesystem whose protocol is used. When `None`, the
            filesystem is resolved from `path`.
    """

    filesystem = get_mapper(path).fs if fs is None else fs
    protocol = filesystem.protocol  # type: ignore

    # Well-known protocols are normalized to a single short name.
    for known in ("s3", "gs"):
        if known in protocol:
            return known

    if isinstance(protocol, (tuple, list)):
        return protocol[0]
    return protocol

get_size(file)

Get the size of a file given its path. Return None if the size can't be retrieved.

Source code in datamol/utils/fs.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def get_size(file: Union[str, os.PathLike, io.IOBase, fsspec.core.OpenFile]) -> Optional[int]:
    """Return the size of a file, or None when the input type is not handled.

    The input may be a path, an `io.IOBase` object exposing a `name` attribute
    (looked up on the local filesystem), or an `fsspec.core.OpenFile`.
    """

    # File object with a name: query the local filesystem with that name.
    if isinstance(file, io.IOBase) and hasattr(file, "name"):
        local_fs = fsspec.filesystem("file")
        return local_fs.size(getattr(file, "name"))

    # Plain path: query the filesystem resolved by fsspec.
    if isinstance(file, (str, os.PathLike)):
        location = str(file)
        return get_mapper(location).fs.size(location)

    # fsspec OpenFile: it carries both its filesystem and its path.
    if isinstance(file, fsspec.core.OpenFile):
        return file.fs.size(file.path)

    return None

glob(path, detail=False, **kwargs)

Find files by glob-matching.

Parameters:

Name Type Description Default
path str

A glob-style path.

required
Source code in datamol/utils/fs.py
310
311
312
313
314
315
316
317
318
319
320
321
def glob(path: str, detail: bool = False, **kwargs) -> List[str]:
    """Find files by glob-matching and return their paths with the protocol restored.

    Args:
        path: A glob-style path.
        detail: Forwarded as-is to the filesystem `glob` method.
        **kwargs: Extra keyword arguments forwarded to the filesystem `glob` method.
    """
    pattern = str(path)
    filesystem = get_mapper(pattern).fs

    # Matches come back without protocol; put it back on each of them.
    matches = filesystem.glob(pattern, detail=detail, **kwargs)
    return [fsspec.utils._unstrip_protocol(match, filesystem) for match in matches]

is_dir(path)

Check whether a path is an existing directory.

Parameters:

Name Type Description Default
path Union[str, PathLike, OpenFile, IOBase]

a path supported by fsspec such as local, s3, gcs, etc.

required
Source code in datamol/utils/fs.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def is_dir(path: Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase]):
    """Check whether a path is a directory.

    Returns False for any input that is neither a path nor an `fsspec.core.OpenFile`.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    if isinstance(path, fsspec.core.OpenFile):
        return path.fs.isdir(path.path)

    if isinstance(path, (str, os.PathLike)):
        location = str(path)
        return get_mapper(location).fs.isdir(location)

    return False

is_file(path)

Check whether a file exists.

Parameters:

Name Type Description Default
path Union[str, PathLike, OpenFile, IOBase]

a path supported by fsspec such as local, s3, gcs, etc.

required
Source code in datamol/utils/fs.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def is_file(path: Union[str, os.PathLike, fsspec.core.OpenFile, io.IOBase]):
    """Check whether a path is a file.

    Returns False for any input that is neither a path nor an `fsspec.core.OpenFile`.

    Args:
        path: a path supported by `fsspec` such as local, s3, gcs, etc.
    """
    if isinstance(path, fsspec.core.OpenFile):
        return path.fs.isfile(path.path)

    if isinstance(path, (str, os.PathLike)):
        location = str(path)
        return get_mapper(location).fs.isfile(location)

    return False

is_local_path(path)

Check whether a path is local.

Source code in datamol/utils/fs.py
148
149
150
def is_local_path(path: Union[str, os.PathLike]):
    """Check whether a path is local, i.e. whether its protocol is `"file"`."""
    protocol = get_protocol(str(path))
    return protocol == "file"

join(*paths)

Join paths together. The first element determines the filesystem to use (and so the separator).

Parameters:

Name Type Description Default
*paths str

a list of paths supported by fsspec such as local, s3, gcs, etc.

()
Source code in datamol/utils/fs.py
153
154
155
156
157
158
159
160
161
162
163
164
def join(*paths: str):
    """Join paths together. The first element determines the
    filesystem to use (and so the separator).

    Trailing `/` characters are stripped from every element before joining.

    Args:
        *paths: a list of paths supported by `fsspec` such as local, s3, gcs, etc.
    """
    stripped = []
    for path in paths:
        stripped.append(str(path).rstrip("/"))

    # The separator comes from the filesystem of the first element.
    separator = get_mapper(stripped[0]).fs.sep
    return separator.join(stripped)

md5(filepath)

Return the md5 hash of a file.

Parameters:

Name Type Description Default
filepath Union[str, PathLike]

The path to the file to compute the MD5 hash on.

required
Source code in datamol/utils/fs.py
297
298
299
300
301
302
303
304
305
306
307
def md5(filepath: Union[str, os.PathLike]):
    """Return the md5 hash of a file as a hexadecimal string.

    The file is hashed in fixed-size chunks so that large files are never loaded
    entirely in memory; the resulting digest is the same as hashing the whole
    content at once.

    Args:
        filepath: The path to the file to compute the MD5 hash on.
    """
    read_size = 1024 * 1024  # 1MB per read

    file_hash = hashlib.md5()
    with fsspec.open(filepath) as f:
        while True:
            chunk = f.read(read_size)  # type: ignore
            if not chunk:
                break
            file_hash.update(chunk)

    return file_hash.hexdigest()

mkdir(dir_path, exist_ok=False)

Create a directory.

Parameters:

Name Type Description Default
dir_path Union[str, PathLike]

The path of the directory to create.

required
exist_ok bool

Whether to ignore the error if the directory already exists.

False
Source code in datamol/utils/fs.py
285
286
287
288
289
290
291
292
293
294
def mkdir(dir_path: Union[str, os.PathLike], exist_ok: bool = False):
    """Create a directory on the filesystem resolved from its path.

    Args:
        dir_path: The path of the directory to create.
        exist_ok: Whether to ignore the error if the directory
            already exists.
    """
    location = str(dir_path)
    filesystem = get_mapper(location).fs
    filesystem.mkdirs(location, exist_ok=exist_ok)