Best Python code snippet using autotest_python
resources.py
Source:resources.py
# -*- coding: utf-8 -*-
"""
(c) 2012-2022 Martin Wendt; see https://github.com/mar10/pyftpsync
Licensed under the MIT license: https://www.opensource.org/licenses/mit-license.php
"""
import os
from datetime import datetime
from posixpath import join as join_url
from posixpath import normpath as normpath_url
from posixpath import relpath as relpath_url

from ftpsync.util import DEBUG_FLAGS, eps_compare, write

#: Classifications that `_Resource.classify()` may assign to a single entry.
ENTRY_CLASSIFICATIONS = frozenset(
    ["existing", "unmodified", "modified", "new", "deleted"]
)

# PAIR_CLASSIFICATIONS = frozenset([
#     "conflict", "equal", "other"
#     ])

#: Operations that `EntryPair.classify()` / `override_operation()` may assign.
PAIR_OPERATIONS = frozenset(
    [
        "conflict",
        "copy_local",
        "copy_remote",
        "delete_local",
        "delete_remote",
        "equal",
        "need_compare",
    ]
)

#: Lookup table used by `EntryPair.classify()`.
operation_map = {
    # (local, remote) => operation
    ("missing", "missing"): None,  # Not allowed
    ("missing", "new"): "copy_remote",
    ("missing", "unmodified"): "copy_remote",
    ("missing", "modified"): "copy_remote",
    ("missing", "deleted"): True,  # Nothing to do (only update metadata)
    ("new", "missing"): "copy_local",
    ("new", "new"): "need_compare",
    ("new", "unmodified"): "need_compare",
    ("new", "modified"): "need_compare",
    ("new", "deleted"): "conflict",
    ("unmodified", "missing"): "copy_local",
    ("unmodified", "new"): "need_compare",
    ("unmodified", "unmodified"): "equal",
    ("unmodified", "modified"): "copy_remote",
    ("unmodified", "deleted"): "delete_local",
    ("modified", "missing"): "copy_local",
    ("modified", "new"): "need_compare",
    ("modified", "unmodified"): "copy_local",
    ("modified", "modified"): "conflict",
    ("modified", "deleted"): "conflict",
    ("deleted", "missing"): True,  # Nothing to do (only update metadata)
    ("deleted", "new"): "conflict",
    ("deleted", "unmodified"): "delete_remote",
    ("deleted", "modified"): "conflict",
    ("deleted", "deleted"): True,  # Nothing to do (only update metadata)
    # No meta data available: treat as 'unmodified' in general:
    ("existing", "missing"): "copy_local",
    ("missing", "existing"): "copy_remote",
    ("existing", "existing"): "need_compare",
}


# ===============================================================================
# EntryPair
# ===============================================================================
class EntryPair:
    """A local and a remote entry that refer to the same name and path.

    At least one of the two entries must be given. If both are given, they
    must agree on name, relative path, and file/directory kind (asserted).
    """

    def __init__(self, local, remote):
        """
        Args:
            local: local entry or None
            remote: remote entry or None
        """
        self.local = local
        self.remote = remote
        any_entry = local or remote
        assert any_entry
        if local and remote:
            assert local.name == remote.name
            assert local.get_rel_path() == remote.get_rel_path()
            assert local.is_dir() == remote.is_dir()
        #: str:
        self.name = any_entry.name
        #: str:
        self.rel_path = any_entry.get_rel_path()
        #: bool:
        self.is_dir = any_entry.is_dir()
        #: str:
        self.local_classification = None
        #: str:
        self.remote_classification = None
        #: str:
        self.operation = None
        #: str:
        self.re_class_reason = None
        # #: bool:
        # self.was_skipped = None

    def __str__(self):
        # Directories are shown in square brackets.
        s = "<EntryPair({})>: ({}, {}) => {}".format(
            "[{}]".format(self.rel_path) if self.is_dir else self.rel_path,
            self.local_classification,
            self.remote_classification,
            self.operation,
        )
        return s

    @property
    def any_entry(self):
        """Return the local entry (or the remote entry if it is None)."""
        return self.local or self.remote

    def is_conflict(self):
        """Return True if the pair's operation is 'conflict'.

        Must only be called after an operation was assigned (asserted).
        """
        assert self.operation
        return self.operation == "conflict"

    def is_same_time(self):
        """Return True if local.mtime == remote.mtime (within FileEntry.EPS_TIME).

        The result is falsy if either entry is missing.
        """
        return (
            self.local
            and self.remote
            and FileEntry._eps_compare(self.local.mtime, self.remote.mtime) == 0
        )

    def override_operation(self, operation, reason):
        """Re-Classify entry pair.

        Args:
            operation (str): new operation; must be a member of
                PAIR_OPERATIONS and differ from the current one (asserted)
            reason (str): stored in :attr:`re_class_reason`
        """
        # prev_class = (self.local_classification, self.remote_classification)
        prev_op = self.operation
        assert operation != prev_op
        assert operation in PAIR_OPERATIONS
        if "classify" in DEBUG_FLAGS:
            write(
                "override_operation {} -> {} (reason: '{}')".format(
                    self, operation, reason
                ),
                debug=True,
            )
        self.operation = operation
        self.re_class_reason = reason

    def classify(self, peer_dir_meta):
        """Classify entry pair.

        Classifies both sides, then looks up the resulting operation in
        `operation_map`.

        Args:
            peer_dir_meta (dict): metadata of the directory (mapping entry
                name to entry metadata), or a falsy value if no metadata is
                available
        Returns:
            str: the operation (also stored in :attr:`operation`)
        Raises:
            RuntimeError: if `operation_map` defines no operation for the
                classification pair
        """
        assert self.operation is None
        # Note: We pass False if the entry is not listed in the metadata.
        #       We pass None if we don't have metadata at all.
        peer_entry_meta = peer_dir_meta.get(self.name, False) if peer_dir_meta else None

        if self.local:
            self.local.classify(peer_dir_meta)
            self.local_classification = self.local.classification
        elif peer_entry_meta:
            # Entry does not exist, but is listed in the metadata
            self.local_classification = "deleted"
        else:
            self.local_classification = "missing"

        if self.remote:
            self.remote.classify(peer_dir_meta)
            self.remote_classification = self.remote.classification
        elif peer_entry_meta:
            # Entry does not exist, but is listed in the metadata
            self.remote_classification = "deleted"
        else:
            self.remote_classification = "missing"

        c_pair = (self.local_classification, self.remote_classification)
        self.operation = operation_map.get(c_pair)
        if not self.operation:
            raise RuntimeError(
                "Undefined operation for pair classification {}".format(c_pair)
            )
        if "classify" in DEBUG_FLAGS:
            write(
                "Classified pair {}, meta={}".format(self, peer_entry_meta),
                debug=True,
            )
        # if not entry.meta:
        #     assert self.classification in PAIR_CLASSIFICATIONS
        assert self.operation in PAIR_OPERATIONS
        return self.operation


# ===============================================================================
# _Resource
# ===============================================================================
class _Resource:
    """Common base class for files and directories."""

    def __init__(self, target, rel_path, name, size, mtime, unique):
        """
        Args:
            target:
            rel_path (str):
            name (str): base name
            size (int): file size in bytes
            mtime (float): modification time as UTC stamp
            unique (str): string
        """
        #: :class:`_Target`: Parent target object.
        self.target = target
        #: str: Path relative to :attr:`target`
        self.rel_path = rel_path
        #: str: File name.
        self.name = name
        #: int: Current file size
        self.size = size
        #: float: Current file modification time stamp
        #: (for FTP targets adjusted using metadata information).
        self.mtime = mtime
        # #: datetime: Converted version of :attr:`mtime`.
        # self.dt_modified = datetime.fromtimestamp(self.mtime)
        #: float: Modification time stamp (as reported by source FTP server).
        self.mtime_org = mtime
        # #: datetime: Converted version of :attr:`mtime_org`.
        # self.dt_modified_org = self.mtime_org
        #: str: Unique id of file/directory.
        self.unique = unique
        # #: dict: Additional metadata (set by target.get_dir()).
        # self.meta = None
        #: int: File size at the time of last sync operation
        self.ps_size = None
        #: float: File modification time stamp at the time of last sync operation
        self.ps_mtime = None
        #: float: Time stamp of last sync operation
        self.ps_utime = None
        #: str: (set by synchronizer._classify_entry()).
        self.classification = None
        #: bool: May be set to true by synchronizer
        self.was_deleted = None

    def __str__(self):
        dt_modified = datetime.fromtimestamp(self.mtime)
        path = os.path.join(self.rel_path, self.name)
        if self.is_dir():
            res = "{}([{}])".format(self.__class__.__name__, path)
        else:
            res = "{}('{}', size:{}, modified:{})".format(
                self.__class__.__name__,
                path,
                "{:,}".format(self.size) if self.size is not None else self.size,
                dt_modified,
            )
            # + " ## %s, %s" % (self.mtime, time.asctime(time.gmtime(self.mtime)))
        if self.classification:
            res += " => {}".format(self.classification)
        return res

    def as_string(self, other_resource=None):
        """Return a 'date, size' summary string.

        If `other_resource` is passed, a comparison hint such as
        '(older, smaller)' is appended when mtime and/or size differ.
        """
        # dt = datetime.fromtimestamp(self.get_adjusted_mtime())
        dt = datetime.fromtimestamp(self.mtime)
        res = "{}, {:>8,} bytes".format(dt.strftime("%Y-%m-%d %H:%M:%S"), self.size)
        if other_resource:
            comp = []
            if self.mtime < other_resource.mtime:
                comp.append("older")
            elif self.mtime > other_resource.mtime:
                comp.append("newer")
            if self.size < other_resource.size:
                comp.append("smaller")
            elif self.size > other_resource.size:
                comp.append("larger")
            if comp:
                res += " ({})".format(", ".join(comp))
        return res

    def __eq__(self, other):
        raise NotImplementedError

    def get_rel_path(self):
        """Return the normalized path of this entry relative to the target root.

        Built from the target's current directory (relative to its root
        directory) joined with :attr:`name`, using posixpath semantics.
        """
        path = relpath_url(self.target.cur_dir, self.target.root_dir)
        return normpath_url(join_url(path, self.name))

    def is_file(self):
        return False

    def is_dir(self):
        return False

    def is_local(self):
        return self.target.is_local()

    def get_sync_info(self, key=None):
        return None

    def set_sync_info(self, local_file):
        raise NotImplementedError

    def classify(self, peer_dir_meta):
        """Classify this entry as 'new', 'unmodified', or 'modified'.

        If no metadata is available, directories are classified as
        'unmodified' and files as 'existing'.

        Args:
            peer_dir_meta (dict): metadata of the directory (mapping entry
                name to entry metadata), or a falsy value if no metadata is
                available
        Returns:
            str: the classification (also stored in :attr:`classification`)
        """
        assert self.classification is None, "{}, {}".format(self, peer_dir_meta)
        peer_entry_meta = None
        if peer_dir_meta:
            # Metadata is generally available, so we can detect 'new' or 'modified'
            peer_entry_meta = peer_dir_meta.get(self.name, False)

            if self.is_dir():
                # Directories are considered 'unmodified' (would require deep
                # traversal to check otherwise)
                if peer_entry_meta:
                    self.classification = "unmodified"
                else:
                    self.classification = "new"
            elif peer_entry_meta:
                # File entries can be classified as modified/unmodified
                self.ps_size = peer_entry_meta.get("s")
                self.ps_mtime = peer_entry_meta.get("m")
                self.ps_utime = peer_entry_meta.get("u")
                if (
                    self.size == self.ps_size
                    and FileEntry._eps_compare(self.mtime, self.ps_mtime) == 0
                ):
                    self.classification = "unmodified"
                else:
                    self.classification = "modified"
            else:
                # A new file entry
                self.classification = "new"
        else:
            # No metadata available:
            if self.is_dir():
                # Directories are considered 'unmodified' (would require deep
                # traversal to check otherwise)
                self.classification = "unmodified"
            else:
                # That's all we know, but EntryPair.classify() may adjust this
                self.classification = "existing"

        if "classify" in DEBUG_FLAGS:
            write("Classified {}, meta={}".format(self, peer_entry_meta), debug=True)
        assert self.classification in ENTRY_CLASSIFICATIONS
        return self.classification


# ===============================================================================
# FileEntry
# ===============================================================================
class FileEntry(_Resource):
    """A file resource; modification times are compared with a tolerance."""

    # 2 seconds difference is considered equal.
    # mtime stamp resolution depends on filesystem: FAT32. 2 seconds, NTFS ms, OSX. 1 sec.
    EPS_TIME = 2.01
    # EPS_TIME = 0.1

    def __init__(self, target, rel_path, name, size, mtime, unique):
        super().__init__(target, rel_path, name, size, mtime, unique)

    @staticmethod
    def _eps_compare(date_1, date_2):
        """Compare two time stamps using a tolerance of EPS_TIME seconds."""
        return eps_compare(date_1, date_2, FileEntry.EPS_TIME)

    def is_file(self):
        return True

    def __eq__(self, other):
        """Return True if `other` is the same class with equal name, size, and mtime.

        Modification times are compared with the EPS_TIME tolerance.
        Comparing with None or an object of another class yields False.
        """
        # Check the operand *before* touching its attributes, so that
        # `entry == None` is False instead of raising AttributeError.
        if not other or other.__class__ != self.__class__:
            return False
        return (
            other.name == self.name
            and other.size == self.size
            and self._eps_compare(self.mtime, other.mtime) == 0
        )

    def __gt__(self, other):
        """Return True if `other` is the same class and name, and self is newer.

        Modification times are compared with the EPS_TIME tolerance.
        Comparing with None or an object of another class yields False.
        """
        # Check the operand *before* touching its attributes (see __eq__).
        if not other or other.__class__ != self.__class__:
            return False
        return (
            other.name == self.name
            and self._eps_compare(self.mtime, other.mtime) > 0
        )

    def get_sync_info(self, key=None):
        """Get mtime/size when this resource was last synchronized with remote."""
        return self.target.get_sync_info(self.name, key)

    def was_modified_since_last_sync(self):
        """Return True if this resource was modified since last sync.

        None is returned if we don't know (because of missing meta data).
        """
        info = self.get_sync_info()
        if not info:
            return None
        if self.size != info["s"]:
            return True
        if self.mtime > info["m"]:
            return True
        return False


# ===============================================================================
# DirectoryEntry
# ===============================================================================
class DirectoryEntry(_Resource):
    """A directory resource."""

    def __init__(self, target, rel_path, name, size, mtime, unique):
        super().__init__(target, rel_path, name, size, mtime, unique)
        # Directories don't have a size (that we could reasonably use for classification)
        self.size = 0

    def is_dir(self):
        # NOTE(review): the listing is truncated at this point ("..."), so the
        # real body is not visible here. Presumably it returns True - confirm
        # against the complete source before relying on this method.
        ...
google.py
Source:google.py
# ... (the beginning of this listing is not shown)
# Deployment - Interface
#
# Files that are copied to, and executed on, every machine of the cluster.
config_files = ["update_script.sh"]
# Host names as passed to scp/ssh/git.
machines_in_cluster = ['google_compute']


def copy_remote():
    """
    Copies a script needed on the server for executing the update of the repo!

    Every file in `config_files` is copied via `scp` to every machine in
    `machines_in_cluster`. Failures are reported on stdout and do not abort
    the remaining copies.
    """
    source = os.path.join(util.OWN_PATH, 'cloud_specific_files', 'google', 'compute')
    for machine in machines_in_cluster:
        for file_0 in config_files:
            # scp update_script.sh google_compute:update_script.sh
            command = ["scp", os.path.join(source, file_0), machine + ":" + file_0]
            try:
                subprocess.call(command)
            except Exception:
                # Best effort: report and continue, but do not swallow
                # KeyboardInterrupt / SystemExit like a bare `except:` would.
                print("> > > Copy of Deploy_script failed...")


def apply_remote():
    """
    Get repository at remote and commits our changes to the repo.

    For every machine in `machines_in_cluster`: clone the remote repository
    into `repo_google`, replace the `.git` folder of the local `repo` folder
    with the cloned one, add and commit everything, push, and finally remove
    both working folders. The exit codes of the individual commands are not
    checked. The previous working directory is restored after every machine,
    even if a step raised.
    """
    source = os.path.join(util.OWN_PATH, 'cloud_specific_files', 'google', 'compute')
    remote_repo = "/home/sys/environments/amos/repository"
    # Save old path
    old_path = os.getcwd()

    for machine in machines_in_cluster:
        try:
            os.chdir(source)

            # pull remote repository
            command = ["git", "clone", machine + ":" + remote_repo, os.path.join(source, "repo_google")]
            subprocess.call(command)

            os.chdir(os.path.join(source, "repo"))

            # clean our copy of repository
            command = ["rm", "-r", ".git"]
            subprocess.call(command)

            # copy configfiles of google_remote to our repo
            command = ["cp", "-f", "-r", os.path.join(source, "repo_google", ".git"), os.path.join(source, "repo", ".git")]
            subprocess.call(command)

            # add everything, I mean really everything in this repo to our next commit
            command = ["git", "add", "*"]
            subprocess.call(command)

            # Commit all changes
            command = ["git", "commit", "-a", "-m\"I am log.\""]
            subprocess.call(command)

            command = ["git", "push"]
            subprocess.call(command)

            os.chdir(source)

            # remove garbage repos
            command = ["rm", "-r", "repo"]
            subprocess.call(command)

            command = ["rm", "-r", "repo_google"]
            subprocess.call(command)
        except Exception:
            # Best effort: report and continue, but do not swallow
            # KeyboardInterrupt / SystemExit like a bare `except:` would.
            print("> > > Deploy_script failed at " + machine + "...")
        finally:
            # Always restore the caller's working directory, also when an
            # exception that is not handled above propagates.
            os.chdir(old_path)


def deploy_remote():
    """
    Executes Remote script, which we copied via copy_remote

    Runs the first entry of `config_files` with `sudo bash` via `ssh` on
    every machine in `machines_in_cluster`. Failures are reported on stdout.
    """
    print('\n> Deploying on systems...')
    for machine in machines_in_cluster:
        # ssh google_compute "sudo bash update_script.sh branch"
        command = ["ssh", machine, "sudo bash " + config_files[0]]
        try:
            subprocess.call(command)
        except Exception:
            # Best effort: report and continue, but do not swallow
            # KeyboardInterrupt / SystemExit like a bare `except:` would.
            print("> > > Deploy of Deploy_script failed...")


# Implement this function
def deploy():
    """Run the complete deployment: apply, copy, and execute on the remotes."""
    # NOTE(review): presumably prepares the local 'repo' folder that
    # apply_remote() works on - confirm in util.
    util.copy_repo_to_specific('google/compute')

    print("Pulling Remote and add changes...")
    apply_remote()

    print("Copy scripts to remote ...")
    copy_remote()

    print("Deploy changes...")
    deploy_remote()


# Implement this function
def all_requirements_available():
    """Check that every file listed in `config_files` exists locally."""
    available = True
    OWN_FOLDER = os.path.join(util.OWN_PATH, 'cloud_specific_files', 'google', 'compute')
    for k in config_files:
        if not os.path.exists(os.path.join(OWN_FOLDER, k)):
            available = False
    # NOTE(review): the listing is truncated here ("..."); the remainder of
    # this function (presumably returning `available`) is not visible.
    # ...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing for FREE!!