# Coverage for src/accsr/remote_storage.py: 89%

import glob
import json
import logging.handlers
import os
import re
from contextlib import contextmanager
from copy import copy
from dataclasses import asdict, dataclass, field, is_dataclass
from enum import Enum
from functools import cached_property
from pathlib import Path
from typing import (
    Callable,
    Dict,
    Generator,
    List,
    Literal,
    Optional,
    Pattern,
    Protocol,
    Tuple,
    Union,
    cast,
    runtime_checkable,
)

from libcloud.storage.base import Container, StorageDriver
from libcloud.storage.providers import get_driver
from libcloud.storage.types import (
    ContainerAlreadyExistsError,
    InvalidContainerNameError,
)
from tqdm import tqdm

from accsr.files import md5sum

log = logging.getLogger(__name__)


def _to_optional_pattern(regex: Optional[Union[str, Pattern]]) -> Optional[Pattern]:
    if isinstance(regex, str):
        return re.compile(regex)
    return regex


class _SummariesJSONEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, TransactionSummary):
            # special case for TransactionSummary, since the drivers are not serializable
            # and dataclasses.asdict calls deepcopy
            result = copy(o.__dict__)
            _replace_driver_by_name(result)
            return result
        if is_dataclass(o):
            return asdict(o)
        if isinstance(o, RemoteObjectProtocol):
            result = copy(o.__dict__)
            _replace_driver_by_name(result)
            return result
        if isinstance(o, SyncObject):
            return o.to_dict(make_serializable=True)
        return str(o)


def _replace_driver_by_name(obj):
    # The driver object from libcloud stores a connection and is not serializable.
    # Since we sometimes want to be able to deepcopy these things around,
    # we replace the driver by its name. This is needed for `asdict` to work.
    if isinstance(obj, RemoteObjectProtocol) and hasattr(obj, "driver"):
        obj.driver = obj.driver.name  # type: ignore
    if isinstance(obj, (list, tuple)):
        for item in obj:
            _replace_driver_by_name(item)
    if isinstance(obj, dict):
        for value in obj.values():
            _replace_driver_by_name(value)


class _JsonReprMixin:
    def to_json(self) -> str:
        return json.dumps(self, indent=2, sort_keys=True, cls=_SummariesJSONEncoder)

    def __repr__(self):
        return f"\n{self.__class__.__name__}: \n{self.to_json()}\n"


@contextmanager
def _switch_to_dir(path: Optional[str] = None) -> Generator[None, None, None]:
    if path:
        cur_dir = os.getcwd()
        try:
            os.chdir(path)
            yield
        finally:
            os.chdir(cur_dir)
    else:
        yield


class Provider(str, Enum):
    GOOGLE_STORAGE = "google_storage"
    S3 = "s3"
    AZURE_BLOBS = "azure_blobs"


@runtime_checkable
class RemoteObjectProtocol(Protocol):
    """
    Protocol of classes that describe remote objects. Provides information about the remote object and
    functionality to download it.
    """

    name: str
    size: int
    hash: int
    provider: Union[Provider, str]

    def download(
        self, download_path, overwrite_existing=False
    ) -> Optional["RemoteObjectProtocol"]:
        pass


class SyncObject(_JsonReprMixin):
    """
    Class representing the sync status between a local path and a remote object. It is mainly used for creating
    summaries and syncing within RemoteStorage and for introspection before and after push/pull transactions.

    It is not recommended to create or manipulate instances of this class outside RemoteStorage, in particular
    in user code. This class forms part of the public interface only because instances of it are given to users
    for introspection.
    """

    def __init__(
        self,
        sync_direction: Literal["push", "pull"],
        local_path: Optional[str] = None,
        remote_obj: Optional[RemoteObjectProtocol] = None,
        remote_path: Optional[str] = None,
        remote_obj_overridden_md5_hash: Optional[int] = None,
    ):
        """
        :param sync_direction: the synchronisation direction
        :param local_path: path to the local file
        :param remote_obj: remote object
        :param remote_path: path to the remote file (always in linux style)
        :param remote_obj_overridden_md5_hash: pass this to override the hash of the remote object
            (by default, the hash attribute of the remote object is used).
            Setting this might be useful for Azure blob storage, as uploads to it may be chunked,
            in which case the md5 hash of the remote object differs from the hash of the local file.
            The hash is used to check whether the local and remote files are equal.
        """
        self.sync_direction = sync_direction
        if remote_path is not None:
            remote_path = remote_path.lstrip("/")
        if remote_obj is not None:
            remote_obj = copy(remote_obj)
            remote_obj.name = remote_obj.name.lstrip("/")

        self.exists_locally = False
        self.local_path = None
        self.set_local_path(local_path)

        if self.local_path is None and remote_obj is None:
            raise ValueError(
                "Either a local path or a remote object has to be passed."
            )

        self.remote_obj = remote_obj

        if remote_path is not None:
            if remote_obj is not None and remote_obj.name != remote_path:
                raise ValueError(
                    f"Passed both remote_path and remote_obj but the paths don't agree: "
                    f"{remote_path} != {remote_obj.name}"
                )
            self.remote_path = remote_path
        else:
            if remote_obj is None:
                raise ValueError("Either remote_path or remote_obj has to be not None")
            self.remote_path = remote_obj.name

        if self.exists_locally:
            assert self.local_path is not None
            self.local_size = os.path.getsize(self.local_path)
            self.local_hash = md5sum(self.local_path)
        else:
            self.local_size = 0
            self.local_hash = None

        if remote_obj_overridden_md5_hash is not None:
            if remote_obj is None:
                raise ValueError(
                    "remote_obj_overridden_md5_hash can only be set if remote_obj is not None"
                )
            self.remote_hash = remote_obj_overridden_md5_hash
        elif remote_obj is not None:
            self.remote_hash = remote_obj.hash
        else:
            self.remote_hash = None

    @property
    def name(self):
        return self.remote_path

    @property
    def exists_on_target(self) -> bool:
        """
        True iff the file exists in both locations
        """
        return self.exists_on_remote and self.exists_locally

    def set_local_path(self, path: Optional[str]):
        """
        Changes the local path of the SyncObject.

        :param path: the new local path; if None, the path remains unchanged
        :return: None
        """
        if path is not None:
            local_path = os.path.abspath(path)
            if os.path.isdir(local_path):
                raise FileExistsError(
                    f"local_path needs to point to a file but pointed to a directory: {local_path}"
                )
            self.local_path = local_path
            self.exists_locally = os.path.isfile(local_path)

    @property
    def exists_on_remote(self):
        return self.remote_obj is not None

    @property
    def equal_md5_hash_sum(self):
        if self.exists_on_target:
            return self.local_hash == self.remote_hash
        return False

    def get_bytes_transferred(self) -> int:
        """
        :return: the number of bytes (to be) transferred for this object
        """
        if self.sync_direction == "push":
            if not self.exists_locally:
                raise FileNotFoundError(
                    f"Cannot retrieve size of non-existing file: {self.local_path}"
                )
            return self.local_size
        elif self.sync_direction == "pull":
            if self.remote_obj is None:
                raise FileNotFoundError(
                    f"Cannot retrieve size of non-existing remote object corresponding to: {self.local_path}"
                )
            return self.remote_obj.size
        else:
            raise RuntimeError(
                f"Unknown sync direction: {self.sync_direction}. Can only be push or pull. This is likely a bug!"
            )

    def to_dict(self, make_serializable=True):
        result = copy(self.__dict__)
        if make_serializable:
            _replace_driver_by_name(result)

        result["exists_on_remote"] = self.exists_on_remote
        result["exists_on_target"] = self.exists_on_target
        result["equal_md5_hash_sum"] = self.equal_md5_hash_sum
        return result


@dataclass(repr=False)
class TransactionSummary(_JsonReprMixin):
    """
    Class representing the summary of a push or pull operation. It is mainly used for introspection before and
    after push/pull transactions.

    It is not recommended to create or manipulate instances of this class outside RemoteStorage, in particular
    in user code. This class forms part of the public interface only because instances of it are given to users
    for introspection.
    """

    matched_source_files: List[SyncObject] = field(default_factory=list)
    not_on_target: List[SyncObject] = field(default_factory=list)
    on_target_eq_md5: List[SyncObject] = field(default_factory=list)
    on_target_neq_md5: List[SyncObject] = field(default_factory=list)
    unresolvable_collisions: Dict[str, Union[List[RemoteObjectProtocol], str]] = field(
        default_factory=dict
    )
    skipped_source_files: List[SyncObject] = field(default_factory=list)

    synced_files: List[SyncObject] = field(default_factory=list)
    sync_direction: Optional[Literal["push", "pull"]] = None

    def __post_init__(self):
        if self.sync_direction not in ["pull", "push", None]:
            raise ValueError(
                f"sync_direction can only be set to pull, push or None, instead got: {self.sync_direction}"
            )

    @property
    def files_to_sync(self) -> List[SyncObject]:
        """
        Returns the list of files that need synchronization.

        :return: list of all files that are not on the target or have different md5sums on target and remote
        """
        return self.not_on_target + self.on_target_neq_md5

    def size_files_to_sync(self) -> int:
        """
        Computes the total size of all objects that need synchronization. Raises a RuntimeError if the
        sync_direction property is not set to 'push' or 'pull'.

        :return: the total size of all local objects that need synchronization if self.sync_direction='push' and
            the total size of all remote objects that need synchronization if self.sync_direction='pull'
        """
        return sum(obj.get_bytes_transferred() for obj in self.files_to_sync)

    @property
    def requires_force(self) -> bool:
        """
        Getter of the requires_force property.

        :return: True iff a failure of the transaction can only be prevented by setting force=True.
        """
        return len(self.on_target_neq_md5) != 0

    @property
    def has_unresolvable_collisions(self) -> bool:
        """
        Getter of the has_unresolvable_collisions property.

        :return: True iff there exists a collision that cannot be resolved.
        """
        return len(self.unresolvable_collisions) != 0

    @property
    def all_files_analyzed(self) -> List[SyncObject]:
        """
        Getter of the all_files_analyzed property.

        :return: list of all analyzed source files
        """
        return self.skipped_source_files + self.matched_source_files

    def add_entry(
        self,
        synced_object: SyncObject,
        collides_with: Optional[Union[List[RemoteObjectProtocol], str]] = None,
        skip: bool = False,
    ):
        """
        Adds a SyncObject to the summary.

        :param synced_object: the SyncObject to add
        :param collides_with: specification of unresolvable collisions for the given sync object
        :param skip: if True, the object is marked to be skipped
        :return: None
        """
        if skip:
            self.skipped_source_files.append(synced_object)
        else:
            self.matched_source_files.append(synced_object)
            if collides_with:
                self.unresolvable_collisions[synced_object.name] = collides_with
            elif synced_object.exists_on_target:
                if synced_object.equal_md5_hash_sum:
                    self.on_target_eq_md5.append(synced_object)
                else:
                    self.on_target_neq_md5.append(synced_object)
            else:
                self.not_on_target.append(synced_object)

    def get_short_summary_dict(self):
        """
        Returns a short summary of the transaction as a dictionary.
        """
        return {
            "sync_direction": self.sync_direction,
            "files_to_sync": len(self.files_to_sync),
            "total_size": self.size_files_to_sync(),
            "unresolvable_collisions": len(self.unresolvable_collisions),
            "synced_files": len(self.synced_files),
        }

    def print_short_summary(self):
        """
        Prints a short summary of the transaction (shorter than the full repr, which contains
        information about local and remote objects).
        """
        print(json.dumps(self.get_short_summary_dict(), indent=2))
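

# Example (editor's illustrative sketch, not part of the library API): a common way to
# work with a TransactionSummary is to run push/pull with dryrun=True and to inspect
# the result before executing the real transaction. The `storage` instance and the
# path below are hypothetical.
#
#     summary = storage.push("data", dryrun=True)
#     summary.print_short_summary()  # compact JSON overview
#     if summary.requires_force:
#         print([obj.name for obj in summary.on_target_neq_md5])
#     storage.push("data", force=summary.requires_force)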


@dataclass
class RemoteStorageConfig:
    """
    Contains all necessary information to establish a connection
    to a bucket within the remote storage, and the base path on the remote.
    """

    provider: str
    key: str
    bucket: str
    secret: str = field(repr=False)
    region: Optional[str] = None
    host: Optional[str] = None
    port: Optional[int] = None
    base_path: str = ""
    secure: bool = True
    use_pbar: bool = True
    """
    whether to use progress bars, which are printed to stderr.
    If set to False, progress will instead be logged at the log level specified in :attr:`log_level`
    """
    log_level: int = logging.INFO
    """
    level at which to log progress when `use_pbar` is disabled
    """


class RemoteStorage:
    """
    Wrapper around libcloud for accessing remote storage services.
    """

    def __init__(
        self,
        conf: RemoteStorageConfig,
        add_extra_to_upload: Optional[Callable[[SyncObject], dict]] = None,
        remote_hash_extractor: Optional[Callable[[RemoteObjectProtocol], int]] = None,
    ):
        """
        :param conf: configuration for the remote storage
        :param add_extra_to_upload: a function that takes a `SyncObject` and returns a dictionary with extra
            parameters that should be passed to the `upload_object` method of the storage driver as value of the
            `extra` kwarg. This can be used to set custom metadata or other parameters. For example, for Azure
            blob storage, one can set the hash of the local file as metadata by using
            `add_extra_to_upload = lambda sync_object: {"meta_data": {"md5": sync_object.local_hash}}`.
        :param remote_hash_extractor: a function that extracts the hash from a `RemoteObjectProtocol` object.
            This is useful for Azure blob storage, as uploads to it may be chunked, in which case the md5 hash of
            the remote object differs from the hash of the local file. In that case, one can add the hash of the
            local file to the metadata using `add_extra_to_upload`, and then use this function to extract the hash
            from the remote object. If not set, the `hash` attribute of the `RemoteObjectProtocol` object is used.
        """
        self._bucket: Optional[Container] = None
        self._conf = conf
        self._provider = conf.provider
        self._remote_base_path = ""
        self.set_remote_base_path(conf.base_path)
        possible_driver_kwargs = {
            "key": self.conf.key,
            "secret": self.conf.secret,
            "region": self.conf.region,
            "host": self.conf.host,
            "port": self.conf.port,
            "secure": self.conf.secure,
        }
        self.driver_kwargs = {
            k: v for k, v in possible_driver_kwargs.items() if v is not None
        }
        self.add_extra_to_upload = add_extra_to_upload
        self.remote_hash_extractor = remote_hash_extractor
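
    # Example (illustrative sketch): wiring up the Azure-specific hash handling
    # described in the docstring above. This assumes the remote objects returned by
    # the driver expose the uploaded metadata under a `meta_data` attribute; verify
    # this for your libcloud driver version before relying on it.
    #
    #     storage = RemoteStorage(
    #         config,
    #         add_extra_to_upload=lambda so: {"meta_data": {"md5": so.local_hash}},
    #         remote_hash_extractor=lambda obj: obj.meta_data["md5"],
    #     )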

    def create_bucket(self, exist_ok: bool = True):
        try:
            log.info(
                f"Creating bucket {self.conf.bucket} from configuration {self.conf}."
            )
            self.driver.create_container(container_name=self.conf.bucket)
        except (ContainerAlreadyExistsError, InvalidContainerNameError):
            if not exist_ok:
                raise
            log.info(
                f"Bucket {self.conf.bucket} already exists (or the name was invalid)."
            )

    @property
    def conf(self) -> RemoteStorageConfig:
        return self._conf

    @property
    def provider(self) -> str:
        return self._provider

    @property
    def remote_base_path(self) -> str:
        return self._remote_base_path

    def set_remote_base_path(self, path: Optional[str]):
        """
        Changes the base path in the remote storage
        (overriding the base path extracted from RemoteStorageConfig during instantiation).
        Pull and push operations will only affect files within the remote base path.

        :param path: a path with linux-like separators
        """
        if path is None:
            path = ""
        else:
            # google storage pulling and listing does not work with paths starting with "/"
            path = path.strip().lstrip("/")
        self._remote_base_path = path.strip()

    @cached_property
    def bucket(self) -> Container:
        log.info(f"Establishing connection to bucket {self.conf.bucket}")
        return self.driver.get_container(self.conf.bucket)

    @cached_property
    def driver(self) -> StorageDriver:
        storage_driver_factory = get_driver(self.provider)
        return storage_driver_factory(**self.driver_kwargs)

    def _execute_sync(
        self,
        sync_object: SyncObject,
        file_count: Tuple[int, int],
        direction: Literal["push", "pull"],
        force=False,
        use_pbar: Optional[bool] = None,
    ) -> SyncObject:
        """
        Synchronizes the local and the remote file in the given direction. Will raise an error if a file from the
        source would overwrite an already existing file on the target and force=False. In this case, no operations
        will be performed on the target.

        :param sync_object: instance of SyncObject that will be used as basis for synchronization. Usually
            created from a get_*_summary method.
        :param file_count: a tuple (n, total) indicating sync progress in terms of files (n-th file of total files)
        :param direction: either "push" or "pull"
        :param force: if True, all already existing files on the target (with a different md5sum than the source
            files) will be overwritten.
        :param use_pbar: If not None, overrides the configured default value for this flag.
            Specifically, if True, will use a progress bar for the operation; if False, will use logging.
        :return: a SyncObject that represents the status of remote and target after the synchronization
        """
        if sync_object.equal_md5_hash_sum:
            self._log(
                f"Skipping {direction} of {sync_object.name} (already up-to-date)",
                use_pbar,
            )
            return sync_object

        if sync_object.exists_on_target and not force:
            raise ValueError(
                f"Cannot perform {direction} because {sync_object.name} already exists and force is False"
            )

        sync_object_str = f"{sync_object.name} ({self._readable_size(sync_object.get_bytes_transferred())})"

        if direction == "push":
            if not sync_object.exists_locally:
                raise FileNotFoundError(
                    f"Cannot push non-existing file: {sync_object.local_path}"
                )
            assert sync_object.local_path is not None

            extra = (
                self.add_extra_to_upload(sync_object)
                if self.add_extra_to_upload is not None
                else None
            )

            # do upload
            self._log(
                f"Uploading file {file_count[0]}/{file_count[1]} to {self.bucket.name}: {sync_object_str}",
                use_pbar,
            )
            remote_obj = cast(
                RemoteObjectProtocol,
                self.bucket.upload_object(
                    sync_object.local_path,
                    sync_object.remote_path,
                    extra=extra,
                    verify_hash=False,
                ),
            )

            if self.remote_hash_extractor is not None:
                remote_obj_overridden_md5_hash = self.remote_hash_extractor(remote_obj)
            else:
                remote_obj_overridden_md5_hash = None
            return SyncObject(
                sync_direction=sync_object.sync_direction,
                local_path=sync_object.local_path,
                remote_obj=remote_obj,
                remote_obj_overridden_md5_hash=remote_obj_overridden_md5_hash,
            )

        elif direction == "pull":
            if None in [sync_object.remote_obj, sync_object.local_path]:
                raise RuntimeError(
                    f"Cannot pull without remote object and local path. Affects: {sync_object.name}"
                )
            assert sync_object.local_path is not None
            if os.path.isdir(sync_object.local_path):
                raise FileExistsError(
                    f"Cannot pull file to a path which is an existing directory: {sync_object.local_path}"
                )

            os.makedirs(os.path.dirname(sync_object.local_path), exist_ok=True)

            # do download
            self._log(
                f"Downloading file {file_count[0]}/{file_count[1]} from {self.bucket.name}: {sync_object_str}",
                use_pbar,
            )
            sync_object.remote_obj.download(
                sync_object.local_path, overwrite_existing=force
            )

            return SyncObject(
                sync_direction=sync_object.sync_direction,
                local_path=sync_object.local_path,
                remote_obj=sync_object.remote_obj,
            )
        else:
            raise ValueError(
                f"Unknown direction {direction}, has to be either 'push' or 'pull'."
            )

    @staticmethod
    def _get_remote_path(remote_obj: RemoteObjectProtocol) -> str:
        """
        Returns the full path to the remote object. The resulting path never starts with "/" as it can cause
        problems with some backends (e.g. google cloud storage).
        """
        return remote_obj.name.lstrip("/")

    def _get_relative_remote_path(self, remote_obj: RemoteObjectProtocol) -> str:
        """
        Returns the path to the remote object relative to the configured base dir (as expected by pull for a
        single file).
        """
        result = remote_obj.name
        result = result[len(self.remote_base_path) :]
        result = result.lstrip("/")
        return result

    def _full_remote_path(self, remote_path: str) -> str:
        """
        :param remote_path: remote path on the storage bucket relative to the configured remote base path,
            e.g. 'data/some_file.json'
        :return: full remote path on the storage bucket. With the example above, this gives
            "remote_base_path/data/some_file.json". Does not start with "/" even if remote_base_path is empty.
        """
        # in google cloud, paths cannot begin with / for pulling or listing (for pushing they can though...)
        remote_path = "/".join([self.remote_base_path, remote_path])
        return remote_path.lstrip("/")

    @staticmethod
    def _listed_due_to_name_collision(
        full_remote_path: str, remote_object: RemoteObjectProtocol
    ) -> bool:
        """
        Checks whether a remote object was falsely listed because its name starts with the same
        characters as full_remote_path.

        Example 1: full remote path is 'pull/this/dir' and remote storage includes paths like 'pull/this/dir_suffix'.
        Example 2: full remote path is 'delete/this/file' and remote storage includes paths like 'delete/this/file_2'.

        All such paths will be listed in bucket.list_objects(full_remote_path), and we need to exclude them in
        most methods like pull or delete.

        :param full_remote_path: usually the output of self._full_remote_path(remote_path)
        :param remote_object: the object to check
        :return: True iff the object was listed only due to a name collision
        """
        # no name collisions are possible in this case
        if full_remote_path.endswith("/") or full_remote_path == "":
            return False

        # remove leading "/" for comparison of paths
        full_remote_path = full_remote_path.lstrip("/")
        object_remote_path = RemoteStorage._get_remote_path(remote_object)
        is_in_selected_dir = object_remote_path.startswith(full_remote_path + "/")
        is_selected_file = object_remote_path == full_remote_path
        return not (is_in_selected_dir or is_selected_file)

    def _log(self, msg: str, use_pbar: Optional[bool]) -> None:
        """
        Logs the given message, provided that logging is enabled (i.e. the progress bar is disabled).
        """
        if use_pbar is None:
            use_pbar = self.conf.use_pbar
        if not use_pbar:
            log.log(self.conf.log_level, msg)

    @staticmethod
    def _readable_size(num_bytes: int) -> str:
        if num_bytes < 1000:
            return f"{num_bytes} B"
        elif num_bytes < 1000**2:
            return f"{round(num_bytes / 1000)} kB"
        elif num_bytes < 1000**3:
            return f"{num_bytes / 1000 ** 2:.1f} MB"
        else:
            return f"{num_bytes / 1000 ** 3:.2f} GB"

    def _pbar(
        self,
        iterable=None,
        total=None,
        desc=None,
        enabled: Optional[bool] = None,
        unit_scale=True,
    ):
        if enabled is None:
            enabled = self.conf.use_pbar
        return tqdm(
            iterable=iterable,
            total=total,
            desc=desc,
            disable=not enabled,
            unit_scale=unit_scale,
        )

    def _execute_sync_from_summary(
        self,
        summary: TransactionSummary,
        dryrun: bool = False,
        force: bool = False,
        use_pbar: Optional[bool] = None,
    ) -> TransactionSummary:
        """
        Executes a transaction summary.

        :param summary: the transaction summary
        :param dryrun: if True, logs any error that would have prevented the execution and returns the summary
            without actually executing the sync.
        :param force: if False, an error is raised whenever the sync would overwrite existing files
        :param use_pbar: If not None, overrides the configured default value for this flag.
            Specifically, if True, will use a progress bar for the operation; if False, will use logging.
        :return: Returns the input transaction summary. Note that the function potentially alters the state of the
            input summary.
        """
        if dryrun:
            log.info(f"Skipping {summary.sync_direction} because dryrun=True")
            if summary.has_unresolvable_collisions:
                log.warning(
                    "This transaction has unresolvable collisions and would not succeed."
                )
            if summary.requires_force and not force:
                log.warning(
                    "This transaction requires overwriting of files and would not succeed without force=True"
                )
            return summary

        if summary.has_unresolvable_collisions:
            raise FileExistsError(
                f"Found name collisions of files with directories, not syncing anything. "
                f"Suggestion: perform a dryrun and analyze the summary. "
                f"Affected names: {list(summary.unresolvable_collisions.keys())}. "
            )

        if summary.requires_force and not force:
            raise FileExistsError(
                f"Operation requires overwriting of objects but `force=False`. "
                f"Suggestion: perform a dryrun and analyze the summary. "
                f"Affected names: {[obj.name for obj in summary.on_target_neq_md5]}. "
            )

        total_files = len(summary.files_to_sync)
        if total_files == 0:
            self._log("No files to be updated", use_pbar)
        else:
            desc = f"{summary.sync_direction}ing (bytes)"
            if force:
                desc = "force " + desc
            with self._pbar(
                total=summary.size_files_to_sync(), desc=desc, enabled=use_pbar
            ) as pbar:
                for cnt, sync_obj in enumerate(summary.files_to_sync, start=1):
                    assert summary.sync_direction is not None
                    synced_obj = self._execute_sync(
                        sync_obj,
                        file_count=(cnt, total_files),
                        direction=summary.sync_direction,
                        force=force,
                        use_pbar=use_pbar,
                    )
                    pbar.update(synced_obj.local_size)
                    summary.synced_files.append(synced_obj)
        return summary

    def pull(
        self,
        remote_path: str,
        local_base_dir: str = "",
        force: bool = False,
        include_regex: Optional[Union[Pattern, str]] = None,
        exclude_regex: Optional[Union[Pattern, str]] = None,
        convert_to_linux_path: bool = True,
        dryrun: bool = False,
        path_regex: Optional[Union[Pattern, str]] = None,
        strip_abspath_prefix: Optional[str] = None,
        strip_abs_local_base_dir: bool = True,
        use_pbar: Optional[bool] = None,
    ) -> TransactionSummary:
        r"""
        Pull either a file or a directory under the given remote path, placing it under local_base_dir.

        :param remote_path: remote path on storage bucket relative to the configured remote base path,
            e.g. 'data/ground_truth/some_file.json'. Can also be an absolute local path if ``strip_abspath_prefix``
            is specified.
        :param local_base_dir: local base directory for constructing the local path,
            e.g. passing 'local_base_dir' will download to the path
            'local_base_dir/data/ground_truth/some_file.json' in the above example.
        :param force: If False, pull will raise an error if an already existing file deviates from the remote in
            its md5sum. If True, these files are overwritten.
        :param include_regex: If not None, only files with paths matching the regex will be pulled. This is useful
            for filtering files within a remote directory before pulling them.
        :param exclude_regex: If not None, files with paths matching the regex will be excluded from the pull.
            Takes precedence over ``include_regex``, i.e. if a file matches both, it will be excluded.
        :param convert_to_linux_path: if True, will convert windows paths to linux paths (as needed by remote
            storage); thus a remote path like 'data\my\path' will be converted to 'data/my/path' before pulling.
            This should only be set to False if you want to pull a remote object with '\' in its file name
            (which is discouraged).
        :param dryrun: If True, simulates the pull operation and returns the remote objects that would have been
            pulled.
        :param path_regex: DEPRECATED! Use ``include_regex`` instead.
        :param strip_abspath_prefix: Will only have an effect if ``remote_path`` is absolute.
            Then the given prefix is removed from it before pulling. This is useful for pulling files from a
            remote storage by directly specifying absolute local paths instead of first converting them to actual
            remote paths. Similar in logic to ``local_path_prefix`` in ``push``.
            A common use case is to always set ``local_base_dir`` to the same value and to always pass absolute
            paths as ``remote_path`` to ``pull``.
        :param strip_abs_local_base_dir: If True, and ``local_base_dir`` is an absolute path, then
            ``local_base_dir`` will be treated as ``strip_abspath_prefix``. See the explanation of
            ``strip_abspath_prefix``.
        :param use_pbar: If not None, overrides the configured default value for this flag.
            Specifically, if True, will use a progress bar for the pull operation; if False, will use logging.
        :return: an object describing the summary of the operation
        """
        if strip_abs_local_base_dir and os.path.isabs(local_base_dir):
            if strip_abspath_prefix is not None:
                raise ValueError(
                    f"Cannot specify both `strip_abs_local_base_dir`={strip_abs_local_base_dir} "
                    f"and `strip_abspath_prefix`={strip_abspath_prefix} "
                    f"when `local_base_dir`={local_base_dir} is an absolute path."
                )
            strip_abspath_prefix = local_base_dir

        remote_path_is_abs = remote_path.startswith("/") or os.path.isabs(remote_path)

        if strip_abspath_prefix is not None and remote_path_is_abs:
            remote_path = remote_path.replace("\\", "/")
            strip_abspath_prefix = strip_abspath_prefix.replace("\\", "/").rstrip("/")
            if not remote_path.startswith(strip_abspath_prefix):
                raise ValueError(
                    f"Remote path {remote_path} is absolute but does not start "
                    f"with the given prefix {strip_abspath_prefix}"
                )
            # +1 for removing the leading '/'
            remote_path = remote_path[len(strip_abspath_prefix) + 1 :]

        # scan files to determine required operations
        include_regex = self._handle_deprecated_path_regex(include_regex, path_regex)
        summary = self._get_pull_summary(
            remote_path,
            local_base_dir,
            include_regex=include_regex,
            exclude_regex=exclude_regex,
            convert_to_linux_path=convert_to_linux_path,
            use_pbar=use_pbar,
        )
        if len(summary.all_files_analyzed) == 0:
            log.warning(f"No files found in remote storage under path: {remote_path}")

        # perform the actual synchronisation
        return self._execute_sync_from_summary(
            summary, dryrun=dryrun, force=force, use_pbar=use_pbar
        )
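
    # Example (illustrative sketch): pulling a directory while filtering by regex; the
    # remote layout and paths are hypothetical.
    #
    #     storage.pull(
    #         "data/ground_truth",
    #         local_base_dir="local_data",
    #         include_regex=r".*\.json$",
    #     )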

    def _get_destination_path(
        self, obj: RemoteObjectProtocol, local_base_dir: str
    ) -> str:
        """
        Returns the destination path of the given object.
        """
        relative_obj_path = self._get_relative_remote_path(obj)
        return os.path.join(local_base_dir, relative_obj_path)

    def _get_pull_summary(
        self,
        remote_path: str,
        local_base_dir: str = "",
        include_regex: Optional[Union[Pattern, str]] = None,
        exclude_regex: Optional[Union[Pattern, str]] = None,
        convert_to_linux_path: bool = True,
        path_regex: Optional[Union[Pattern, str]] = None,
        use_pbar: Optional[bool] = None,
    ) -> TransactionSummary:
        r"""
        Creates a TransactionSummary of the specified pull operation.

        :param remote_path: remote path on storage bucket relative to the configured remote base path,
            e.g. 'data/ground_truth/some_file.json'
        :param local_base_dir: local base directory for constructing the local path.
            Example: passing 'local_base_dir' will download to the path
            'local_base_dir/data/ground_truth/some_file.json' in the above example.
        :param include_regex: If not None, only files with paths matching the regex will be pulled. This is useful
            for filtering files within a remote directory before pulling them.
        :param exclude_regex: If not None, only files with paths not matching the regex will be pulled.
            Takes precedence over include_regex, i.e. if a file matches both, it will be excluded.
        :param convert_to_linux_path: if True, will convert windows paths to linux paths (as needed by remote
            storage); thus a remote path like 'data\my\path' will be converted to 'data/my/path' before pulling.
            This should only be set to False if you want to pull a remote object with '\' in its file name
            (which is discouraged).
        :param path_regex: DEPRECATED! Use ``include_regex`` instead.
        :return: the transaction summary
        """
        include_regex = self._handle_deprecated_path_regex(include_regex, path_regex)

        include_regex = _to_optional_pattern(include_regex)
        exclude_regex = _to_optional_pattern(exclude_regex)

        local_base_dir = os.path.abspath(local_base_dir)
        if convert_to_linux_path:
            remote_path = remote_path.replace("\\", "/")

        summary = TransactionSummary(sync_direction="pull")
        full_remote_path = self._full_remote_path(remote_path)
        # noinspection PyTypeChecker
        remote_objects = cast(
            List[RemoteObjectProtocol], list(self.bucket.list_objects(full_remote_path))
        )

        msg = f"Scanning remote paths in {self.bucket.name}/{full_remote_path}"
        self._log(msg, use_pbar)
        for remote_obj in self._pbar(
            iterable=remote_objects, desc=f"{msg}: ", enabled=use_pbar
        ):
            local_path = None
            collides_with = None
            if (remote_obj.size == 0) or (
                self._listed_due_to_name_collision(full_remote_path, remote_obj)
            ):
                log.debug(
                    f"Skipping {remote_obj.name} since it was listed due to a name collision"
                )
                skip = True
            else:
                relative_obj_path = self._get_relative_remote_path(remote_obj)
                skip = self._should_skip(
                    relative_obj_path, include_regex, exclude_regex
                )

            if not skip:
                local_path = self._get_destination_path(remote_obj, local_base_dir)
                if os.path.isdir(local_path):
                    collides_with = local_path

            remote_obj_overridden_md5_hash = (
                self.remote_hash_extractor(remote_obj)
                if self.remote_hash_extractor is not None
                else None
            )
            sync_obj = SyncObject(
                sync_direction="pull",
                local_path=local_path,
                remote_obj=remote_obj,
                remote_obj_overridden_md5_hash=remote_obj_overridden_md5_hash,
            )

            summary.add_entry(
                sync_obj,
                skip=skip,
                collides_with=collides_with,
            )

        return summary

    def get_push_remote_path(self, local_path: str) -> str:
        """
        Get the full path within the remote storage bucket for pushing.

        :param local_path: the local path to the file
        :return: the remote path that corresponds to the local path
        """
        return (
            "/".join([self.remote_base_path, local_path])
            .replace(os.sep, "/")
            .lstrip("/")
        )
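
    # Example (illustrative sketch, assuming remote_base_path="base" on a unix system):
    #
    #     storage.get_push_remote_path("data/file.json")  # -> "base/data/file.json"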

    def _get_push_summary(
        self,
        path: str,
        local_path_prefix: Optional[str] = None,
        include_regex: Optional[Union[Pattern, str]] = None,
        exclude_regex: Optional[Union[Pattern, str]] = None,
        path_regex: Optional[Union[Pattern, str]] = None,
        use_pbar: Optional[bool] = None,
    ) -> TransactionSummary:
        """
        Retrieves the summary of the push-transaction plan before it has been executed.
        Nothing will be pushed, and the synced_files entry of the summary will be an empty list.

        :param path: Path to the local object (file or directory) to be uploaded, may be absolute or relative.
            globs are permitted, thus ``path`` may contain wildcards.
        :param local_path_prefix: path names on the remote will be relative to this path. Thus, specifying
            for example ``local_path_prefix=/bar/foo`` (on a unix system) and ``path=baz``
            will push ``/bar/foo/baz`` to ``remote_base_path/baz``. The same will happen if
            ``path=/bar/foo/baz`` is specified.
            **NOTE**: if ``local_path_prefix`` is specified and ``path`` is absolute, it is assumed that
            ``path`` is a child of ``local_path_prefix``. If this is not the case, an error will be raised.
        :param include_regex: If not None, only files with paths matching the regex will be pushed.
            Note that paths matched against the regex will be relative to ``local_path_prefix``.
        :param exclude_regex: If not None, only files with paths not matching the regex will be pushed.
            Takes precedence over ``include_regex``, i.e. if a file matches both regexes, it will be excluded.
            Note that paths matched against the regex will be relative to ``local_path_prefix``.
        :param path_regex: DEPRECATED! Same as ``include_regex``.
        :return: the summary object
        """
        summary = TransactionSummary(sync_direction="push")
        include_regex = self._handle_deprecated_path_regex(include_regex, path_regex)

        if local_path_prefix is not None:
            local_path_prefix = os.path.abspath(local_path_prefix)
        include_regex = _to_optional_pattern(include_regex)
        exclude_regex = _to_optional_pattern(exclude_regex)

        _path = Path(path)
        if _path.is_absolute() and local_path_prefix:
            try:
                path = str(_path.relative_to(local_path_prefix))
            except ValueError:
                raise ValueError(
                    f"Specified {path=} is not a child of {local_path_prefix=}"
                )

        # at this point, path is relative to local_path_prefix
        with _switch_to_dir(local_path_prefix):
            # collect all paths to scan
            all_files_analyzed = []
            for local_path in glob.glob(path):
                if os.path.isfile(local_path):
                    all_files_analyzed.append(local_path)
                elif os.path.isdir(local_path):
                    for root, _, fs in os.walk(local_path):
                        all_files_analyzed.extend([os.path.join(root, f) for f in fs])
            if len(all_files_analyzed) == 0:
                raise FileNotFoundError(
                    f"No files found under {path=} with {local_path_prefix=}"
                )

            msg = f"Scanning files in {os.path.join(os.getcwd(), path)}"
            self._log(msg, use_pbar)
            for file in self._pbar(
                iterable=all_files_analyzed,
                desc=f"{msg}: ",
                enabled=use_pbar,
            ):
                collides_with = None
                remote_obj = None
                skip = self._should_skip(file, include_regex, exclude_regex)

                remote_path = self.get_push_remote_path(file)

                all_matched_remote_obj = cast(
                    List[RemoteObjectProtocol], self.bucket.list_objects(remote_path)
                )
                matched_remote_obj = [
                    obj
                    for obj in all_matched_remote_obj
                    if not self._listed_due_to_name_collision(remote_path, obj)
                ]

                # name collision of local file with remote dir
                if len(matched_remote_obj) > 1:
                    collides_with = matched_remote_obj
                elif matched_remote_obj:
                    remote_obj = matched_remote_obj[0]

                remote_obj_overridden_md5_hash = (
                    self.remote_hash_extractor(remote_obj)
                    if self.remote_hash_extractor is not None and remote_obj is not None
                    else None
                )
                synced_obj = SyncObject(
                    sync_direction="push",
                    local_path=file,
                    remote_obj=remote_obj,
                    remote_path=remote_path,
                    remote_obj_overridden_md5_hash=remote_obj_overridden_md5_hash,
                )
                summary.add_entry(
                    synced_obj,
                    collides_with=collides_with,
                    skip=skip,
                )

        return summary

    @staticmethod
    def _should_skip(
        file: str, include_regex: Optional[Pattern], exclude_regex: Optional[Pattern]
    ):
        if include_regex is not None and not include_regex.match(file):
            log.debug(
                f"Skipping {file} since it does not match regular expression '{include_regex}'."
            )
            return True
        if exclude_regex is not None and exclude_regex.match(file):
            log.debug(
                f"Skipping {file} since it matches regular expression '{exclude_regex}'."
            )
            return True
        return False

    @staticmethod
    def _handle_deprecated_path_regex(
        include_regex: Optional[Union[Pattern, str]],
        path_regex: Optional[Union[Pattern, str]],
    ):
        if path_regex is not None:
            log.warning(
                "Using deprecated parameter 'path_regex'. Use 'include_regex' instead."
            )
            if include_regex is not None:
                raise ValueError(
                    "Cannot specify both 'path_regex' and 'include_regex'. "
                    "Use only 'include_regex'; 'path_regex' is deprecated. "
                    f"Got {path_regex=} and {include_regex=}"
                )
            include_regex = path_regex
        return include_regex

    def push(
        self,
        path: str,
        local_path_prefix: Optional[str] = None,
        force: bool = False,
        include_regex: Optional[Union[Pattern, str]] = None,
        exclude_regex: Optional[Union[Pattern, str]] = None,
        dryrun: bool = False,
        path_regex: Optional[Union[Pattern, str]] = None,
        use_pbar: Optional[bool] = None,
    ) -> TransactionSummary:
        """
        Upload files into the remote storage.
        Does not upload files for which the md5sum matches existing remote files.
        The remote path for uploading will be constructed from the remote_base_path and the provided path.
        The ``local_path_prefix`` serves for finding the directory on the local system or for stripping off
        parts of absolute paths if path is absolute, see the examples below.

        Examples:
           1) path=foo/bar, local_path_prefix=None -->
              ./foo/bar uploaded to remote_base_path/foo/bar
           2) path=/home/foo/bar, local_path_prefix=None -->
              /home/foo/bar uploaded to remote_base_path/home/foo/bar
           3) path=bar, local_path_prefix=/home/foo -->
              /home/foo/bar uploaded to remote_base_path/bar
           4) path=/home/foo/bar, local_path_prefix=/home/foo -->
              /home/foo/bar uploaded to remote_base_path/bar (same as 3)
           5) path=/home/baz/bar, local_path_prefix=/home/foo -->
              ValueError: Specified path=/home/baz/bar is not a child of local_path_prefix=/home/foo

        :param path: Path to the local object (file or directory) to be uploaded, may be absolute or relative.
            globs are supported as well, thus ``path`` may be a pattern like ``*.txt``.
        :param local_path_prefix: Prefix to be concatenated with ``path``
        :param force: If False, push will raise an error if an already existing remote file deviates from the
            local file in its md5sum. If True, these files are overwritten.
        :param include_regex: If not None, only files with paths matching the regex will be pushed.
            Note that paths matched against the regex will be relative to ``local_path_prefix``.
        :param exclude_regex: If not None, only files with paths not matching the regex will be pushed. Takes
            precedence over ``include_regex``, i.e. if a file matches both regexes, it will be excluded.
            Note that paths matched against the regex will be relative to ``local_path_prefix``.
        :param dryrun: If True, simulates the push operation and returns the summary
            (with synced_files being an empty list).
        :param path_regex: DEPRECATED! Same as ``include_regex``.
        :param use_pbar: If not None, overrides the configured default value for this flag.
            Specifically, if True, will use a progress bar for the push operation; if False, will use logging.
        :return: an object describing the summary of the operation
        """
        include_regex = self._handle_deprecated_path_regex(include_regex, path_regex)
        summary = self._get_push_summary(
            path,
            local_path_prefix,
            include_regex=include_regex,
            exclude_regex=exclude_regex,
            use_pbar=use_pbar,
        )
        return self._execute_sync_from_summary(
            summary, dryrun=dryrun, force=force, use_pbar=use_pbar
        )
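
    # Example (illustrative sketch): a cautious push workflow that performs a dryrun
    # first; the glob pattern and prefix are hypothetical.
    #
    #     summary = storage.push("models/*.pt", local_path_prefix="artifacts", dryrun=True)
    #     if not summary.has_unresolvable_collisions:
    #         storage.push(
    #             "models/*.pt",
    #             local_path_prefix="artifacts",
    #             force=summary.requires_force,
    #         )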

    def delete(
        self,
        remote_path: str,
        include_regex: Optional[Union[Pattern, str]] = None,
        exclude_regex: Optional[Union[Pattern, str]] = None,
        path_regex: Optional[Union[Pattern, str]] = None,
    ) -> List[RemoteObjectProtocol]:
        """
        Deletes a file or a directory under the given path relative to the configured remote base path.
        Use with caution!

        :param remote_path: remote path on storage bucket relative to the configured remote base path.
        :param include_regex: If not None, only files with paths matching the regex will be deleted.
        :param exclude_regex: If not None, only files with paths not matching the regex will be deleted.
            Takes precedence over ``include_regex``, i.e. if a file matches both regexes, it will be excluded.
        :param path_regex: DEPRECATED! Same as ``include_regex``.
        :return: list of remote objects referring to all deleted files
        """
        include_regex = self._handle_deprecated_path_regex(include_regex, path_regex)
        include_regex = _to_optional_pattern(include_regex)
        exclude_regex = _to_optional_pattern(exclude_regex)

        full_remote_path = self._full_remote_path(remote_path)

        remote_objects = cast(
            List[RemoteObjectProtocol], self.bucket.list_objects(full_remote_path)
        )
        if len(remote_objects) == 0:
            log.warning(
                f"No such remote file or directory: {full_remote_path}. Not deleting anything"
            )
            return []
        deleted_objects = []
        for remote_obj in remote_objects:
            if self._listed_due_to_name_collision(full_remote_path, remote_obj):
                log.debug(
                    f"Skipping deletion of {remote_obj.name} as it was listed due to a name collision"
                )
                continue

            relative_obj_path = self._get_relative_remote_path(remote_obj)
            if include_regex is not None and not include_regex.match(relative_obj_path):
                log.info(f"Skipping {relative_obj_path} due to regex {include_regex}")
                continue
            if exclude_regex is not None and exclude_regex.match(relative_obj_path):
                log.info(f"Skipping {relative_obj_path} due to regex {exclude_regex}")
                continue
            log.debug(f"Deleting {remote_obj.name}")
            self.bucket.delete_object(remote_obj)  # type: ignore
            deleted_objects.append(remote_obj)
        return deleted_objects
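
    # Example (illustrative sketch): deleting only log files under a hypothetical
    # remote directory, keeping everything else.
    #
    #     deleted = storage.delete("experiments/run1", include_regex=r".*\.log$")
    #     print([obj.name for obj in deleted])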

    def list_objects(self, remote_path: str) -> List[RemoteObjectProtocol]:
        """
        :param remote_path: remote path on storage bucket relative to the configured remote base path.
        :return: list of remote objects under the remote path (multiple entries if the remote path is a directory)
        """
        full_remote_path = self._full_remote_path(remote_path)
        return self.bucket.list_objects(full_remote_path)  # type: ignore