Best Python code snippet using autotest_python
s3.py
Source:s3.py
...72 def clear_state(self, s_app_dir, s_exchange_basename, s_symbol, s_odf_basename):73 (s_odf_bucket, s_odf_remote_path) = self.get_odf_remote_path(s_exchange_basename, 74 s_odf_basename)7576 if self.remote_path_exists(s_odf_bucket, s_odf_remote_path):77 self.remote_remove(s_odf_remote_path)7879 self.clear_fce_state(s_app_dir, s_exchange_basename, s_symbol, s_odf_basename)8081 def clear_fce_state(self, s_app_dir, s_exchange_basename, s_symbol, s_odf_basename):82 # Delete tmp path, fce output directory on s3 and chunk output directory on s383 fce_pathspec = self.get_fce_pathspec(s_app_dir, s_exchange_basename, s_symbol, s_odf_basename)8485 (s_fce_local_dir, s_fce_local_path) = self.get_fce_local_path(self, fce_pathspec, fce_pathspec.s_fce_header_filename)8687 if self.local_path_exists(s_fce_local_path):88 self.local_remove(s_fce_local_path)89 90 (s_fce_bucket, s_fce_remote_path) = self.get_fce_remote_path(self, fce_pathspec, fce_pathspec.s_fce_header_filename)91 92 if self.remote_path_exists(s_fce_bucket, s_fce_remote_path):93 self.remote_remove(s_fce_bucket, s_fce_remote_path)94 95 def get_odf_remote_path(self, s_exchange_basename, s_odf_basename):9697 s_odf_remote_path = self.remote_make_path(self.s_bucket, s_exchange_basename, "odf", s_odf_basename)98 s_odf_remote_path = '.'.join([s_odf_remote_path, "rs3"])99 s_odf_bucket = self.remote_bucket(s_odf_remote_path)100101 if not self.remote_bucket_exists(s_odf_bucket):102 self.remote_make_bucket(s_odf_bucket)103104 return (s_odf_bucket, s_odf_remote_path)105106 def save_odf(self, s_exchange_basename, s_odf_basename, odf_obj):107 108 (s_odf_local_dir, s_odf_local_path) = self.get_odf_local_path(s_exchange_basename, s_odf_basename)109 110 if not self.local_path_exists(s_odf_local_dir):111 self.local_make_dirs(s_odf_local_dir)112113 odf_obj.to_bin_file(s_odf_local_path)114 115 (s_odf_bucket, s_odf_remote_path) = self.get_odf_remote_path(s_exchange_basename, s_odf_basename)116 117 self.upload_file(s_odf_local_path, 
s_odf_bucket, s_odf_remote_path)118 119 return True 120121 '''122 def get_fce_pathspec(self, s_app_dir,123 s_exchange_basename,124 s_symbol,125 s_odf_basename):126 return context.FCEContext(s_app_dir,127 s_exchange_basename,128 s_symbol,129 s_odf_basename)130 '''131132 def get_fce_local_path(self, ctx, s_fce_header_filename):133 s_fce_local_path = self.lsep.join([ctx.s_fce_local_dir, "18", 134 str(ctx.odf_jsunnoon), s_fce_header_filename])135 136 s_fce_local_dir = self.local_dirname(s_fce_local_path)137 return (s_fce_local_dir, s_fce_local_path)138 139 def get_fce_remote_path(self, ctx, s_fce_header_filename):140 141 s_fce_remote_path = self.remote_make_path(self.s_bucket, ctx.s_fce_remote_prefix, "18", 142 str(ctx.odf_jsunnoon), s_fce_header_filename)143 144 s_fce_bucket = self.remote_bucket(s_fce_remote_path)145 #log.debug(s_fce_bucket)146 #log.debug(s_fce_remote_path)147 return (s_fce_bucket, s_fce_remote_path)148149 def open_fce(self, ctx, s_fce_header_filename, key=None):150151 (s_fce_local_dir, s_fce_local_path) = self.get_fce_local_path(ctx, s_fce_header_filename)152153 if not self.local_path_exists(s_fce_local_path):154 (s_fce_bucket, s_fce_remote_path) = self.get_fce_remote_path(ctx, s_fce_header_filename) 155 156 if self.remote_path_exists(s_fce_bucket, s_fce_remote_path):157 if not self.local_path_exists(s_fce_local_dir):158 self.local_make_dirs(s_fce_local_dir)159 self.download_file(s_fce_bucket, s_fce_remote_path, s_fce_local_path)160 else:161 return None162 163 fp_bin = open(s_fce_local_path, "rb")164165 fce_obj = fce.FCE()166167 fce_obj.read_bin_stream(fp_bin, key)168 169 fp_bin.close()170171 return fce_obj172173 def fce_exists(self, ctx, s_fce_header_filename):174 175 (s_fce_local_dir, s_fce_local_path) = self.get_fce_local_path(ctx, s_fce_header_filename)176 177 if self.local_path_exists(s_fce_local_path):178 return True179180 (s_fce_bucket, s_fce_remote_path) = self.get_fce_remote_path(ctx, s_fce_header_filename)181 182 if 
self.remote_path_exists(s_fce_bucket, s_fce_remote_path):183 if not self.local_path_exists(s_fce_local_dir):184 self.local_make_dirs(s_fce_local_dir)185 self.download_file(s_fce_bucket, s_fce_remote_path, s_fce_local_path)186 return True187188 return False189190 def save_fce(self, ctx, s_fce_header_filename, fce_obj, key=None, b_save_csv=False):191 192 # First save the fce in the tmp directory193 (s_fce_local_dir, s_fce_local_path) = self.get_fce_local_path(ctx, s_fce_header_filename)194195 if not self.local_path_exists(s_fce_local_dir):196 self.local_make_dirs(s_fce_local_dir)197198 fce_obj.to_bin_file(s_fce_local_path, key)199 200 if b_save_csv:201 fce_obj.to_csv_file(s_fce_local_path + ".csv")202 203 # copy the tmp file to S3 directory204 (s_fce_bucket, s_fce_remote_path) = self.get_fce_remote_path(ctx, s_fce_header_filename)205 206 #log.debug(s_fce_bucket)207 #log.debug(s_fce_remote_path)208 self.upload_file(s_fce_local_path, s_fce_bucket, s_fce_remote_path)209 210211 def get_chunk_file_local_path(self, ctx, L_no, fce_jsunnoon, s_chunk_file_name):212 213 s_chunk_file_local_path = self.lsep.join([ctx.s_fce_local_dir, str(L_no), str(fce_jsunnoon), s_chunk_file_name])214 215 s_chunk_local_dir = self.local_dirname(s_chunk_file_local_path)216 217 if not self.local_path_exists(s_chunk_local_dir):218 self.local_make_dirs(s_chunk_local_dir)219 220 return (s_chunk_local_dir, s_chunk_file_local_path)221222 def get_chunk_file_remote_path(self, ctx, L_no, fce_jsunnoon, s_chunk_file_name):223 224 s_chunk_file_remote_path = self.remote_make_path(self.s_bucket, ctx.s_fce_remote_prefix, str(L_no), str(fce_jsunnoon), s_chunk_file_name)225 226 s_chunk_file_bucket = self.remote_bucket(s_chunk_file_remote_path)227 228 if not self.remote_bucket_exists(s_chunk_file_bucket):229 try:230 self.remote_make_bucket(s_chunk_file_bucket)231 except:232 log.error(s_chunk_file_bucket)233 log.error(s_chunk_file_remote_path)234 raise235 236 return (s_chunk_file_bucket, 
s_chunk_file_remote_path)237238 def chunk_file_exists(self, ctx, L_no, fce_jsunnoon, s_chunk_file_name):239 240 (s_chunk_file_local_dir, s_chunk_file_local_path) = self.get_chunk_file_local_path(ctx, L_no, fce_jsunnoon, s_chunk_file_name)241 242 if self.local_path_exists(s_chunk_file_local_path):243 return True244245 (s_chunk_file_bucket, s_chunk_file_remote_path) = self.get_chunk_file_remote_path(ctx, L_no, fce_jsunnoon, s_chunk_file_name)246 247 if self.remote_path_exists(s_chunk_file_bucket, s_chunk_file_remote_path):248 if not self.local_path_exists(s_chunk_file_local_dir):249 self.local_make_dirs(s_chunk_file_local_dir)250 self.download_file(s_chunk_file_bucket, s_chunk_file_remote_path, s_chunk_file_local_path)251 return True252253 return False254255 def open_chunk_file(self, ctx, L_no, fce_jsunnoon, s_chunk_file_name, chunk_size, key):256 257 (s_chunk_file_local_dir, s_chunk_file_local_path) = self.get_chunk_file_local_path(ctx, L_no, fce_jsunnoon, s_chunk_file_name)258 259 chunk_arr_short = chunk.read_short_chunk_array(s_chunk_file_local_path, s_chunk_file_name, chunk_size, key)260 261 return chunk_arr_short262 263 def save_chunk_file(self, ctx, L_no, fce_jsunnoon, chunk_array, key=None, b_do_csv_chunk=False):264 265 s_chunk_file_name = chunk_array.get_name()266 267 # First save the fce in the tmp directory268 (s_chunk_file_local_dir, s_chunk_file_local_path) = self.get_chunk_file_local_path(ctx, L_no, fce_jsunnoon, s_chunk_file_name)269270 if not self.local_path_exists(s_chunk_file_local_dir):271 self.local_make_dirs(s_chunk_file_local_dir)272273 chunk_array.to_bin_file_short(s_chunk_file_local_path, key)274 275 if b_do_csv_chunk:276 s_chunk_csv_local_path = '.'.join([s_chunk_file_local_path, "csv"])277 #log.debug("Saving chunk CSV: %s..." 
% (s_chunk_csv_local_path))278 chunk_array.save_csv(s_chunk_csv_local_path)279 280 # copy the tmp file to S3 directory281 (s_chunk_file_bucket, s_chunk_file_remote_path) = self.get_chunk_file_remote_path(ctx, L_no, fce_jsunnoon, s_chunk_file_name)282 283 self.upload_file(s_chunk_file_local_path, s_chunk_file_bucket, s_chunk_file_remote_path)284 285 def download_file(self, s_bucket, s_remote_file_path, s_local_file_path):286 287 s_local_dir = self.local_dirname(s_local_file_path)288 289 if not self.local_path_exists(s_local_dir):290 self.local_make_dirs(s_local_dir)291 292 self.download(s_bucket, s_remote_file_path, s_local_file_path)293294 def upload_file(self, s_local_file_path, s_bucket, s_remote_file_path):295 296 if not self.remote_bucket_exists(s_bucket):297 self.remote_make_bucket(s_bucket)298 299 self.upload(s_local_file_path, s_bucket, s_remote_file_path)300301 def move_up(self, s_local_file_path, s_bucket, s_remote_file_path):302 303 self.upload_file(s_local_file_path, s_bucket, s_remote_file_path)304 305 self.local_remove(s_local_file_path)306307 def move_down(self, s_bucket, s_remote_file_path, s_local_file_path):308 309 self.download_file(s_bucket, s_remote_file_path, s_local_file_path)310 311 self.remote_remove(s_bucket, s_remote_file_path)312313314class S3Store(AbstractFileStore):315 316 rsep = "/"317318 def __init__(self, s_bucket, s_aws_access_id, s_aws_secret_access_key, s_aws_region):319 super(S3Store, self).__init__()320 self.connection = boto.s3.connection.S3Connection(s_aws_access_id,321 s_aws_secret_access_key)322 self.s_bucket = s_bucket323324 def get_key(self, s_bucket, s_path):325 bucket = boto.s3.bucket.Bucket(connection=self.connection, name=s_bucket)326 key = boto.s3.key.Key(bucket)327 key.key = s_path328 return key329 330 def remote_make_path(self, s_bucket, *kargs):331 return self.rsep.join(kargs)332 333 def remote_abspath(self, s_bucket):334 return s_bucket335 336 def remote_path_exists(self, s_bucket, s_path):337 key = 
self.get_key(s_bucket, s_path)338 return key.exists()339 340 def remote_rmtree(self, s_bucket, s_path):341 self.remote_remove(s_bucket, s_path)342343 def remote_remove(self, s_bucket, s_path):344 key = self.get_key(s_bucket, s_path)345 key.delete()346 347 def remote_bucket(self, s_path):348 return self.s_bucket349 350 def remote_bucket_exists(self, s_bucket):351 bucket = boto.s3.bucket.Bucket(connection=self.connection, name=s_bucket)352 if not bucket:353 return False354 return True355 356 def remote_make_bucket(self, s_path):357 pass358 359 def upload(self, s_local_file_path, s_bucket, s_remote_file_path):360 key = self.get_key(s_bucket, s_remote_file_path)361 key.set_contents_from_filename(s_local_file_path)362 return key363364 def download(self, s_bucket, s_remote_file_path, s_local_file_path):365 key = self.get_key(s_bucket, s_remote_file_path)366 key.get_contents_to_filename(s_local_file_path)367 return key368369 370class LocalS3Store(AbstractFileStore):371 372 rsep = os.sep373 374 def __init__(self, s_remote_root):375 self.s_bucket = self.remote_abspath(s_remote_root)376 super(LocalS3Store, self).__init__()377378 def remote_make_path(self, s_bucket, *kargs):379 ls_args = [s_bucket]380 ls_args = ls_args + list(kargs)381 return self.rsep.join(ls_args)382 383 def remote_abspath(self, s_bucket):384 return os.path.abspath(s_bucket)385 386 def remote_path_exists(self, s_bucket, s_path):387 return os.path.exists(s_path)388 389 def remote_rmtree(self, s_path):390 return shutil.rmtree(s_path)391392 def remote_remove(self, s_bucket, s_path):393 return os.remove(s_path)394 395 def remote_bucket(self, s_path):396 return os.path.dirname(s_path)397 398 def remote_bucket_exists(self, s_bucket):399 return os.path.exists(s_bucket)400
...
path.py
Source:path.py
...35 if os.path.exists(s):36 return s37 else:38 raise argparse.ArgumentTypeError(f"readable_dir:{s} is not a valid path")39 def remote_path_exists(self, s):40 re_path = "(/[a-zA-Z0-9_\s]+)*" # e.g. /home/user/Documents41 re_rem = "([a-zA-Z0-9]+:)" # e.g. GoogleDrive:42 if re.fullmatch('{}{}'.format(re_rem, re_path), s) == None:43 raise argparse.ArgumentTypeError(f'{s} is not a valid remote path')44 remote = s.split(':')[0]45 output = os.popen("rclone listremotes --long")46 remote_list = output.read().split()47 if remote + ":" in remote_list[::2]:48 pos = remote_list.index(remote + ":")49 self.remote_type = remote_list[pos+1]50 if self.remote_type != "drive":51 raise argparse.ArgumentError(f"The type of the given remote {remote} is not 'drive', and therefore currently not supported")52 else: raise argparse.ArgumentTypeError(f'{remote} is not a valid remote')53 return s...
hpcutils.py
Source:hpcutils.py
# ...
def Generate2mass(field_name):
    # NOTE(review): no body is visible for this function in the excerpt; the
    # bare `def` line was a syntax error. Fail loudly until it is written.
    raise NotImplementedError("Generate2mass is not implemented")


def ListLCDir(remote_path, stfp):
    """Change to remote_path on the SFTP client and return its listing."""
    # The parameter is spelled `stfp`; the old `sftp.chdir` was an undefined name.
    stfp.chdir(remote_path)
    return stfp.listdir()


def remote_path_exists(path, stfp):
    """Return True if stfp.stat(path) succeeds, False if it raises IOError."""
    try:
        stfp.stat(path)
        return True
    except IOError:
        return False


def LoadRemoteKeylist(field_name, stfp):
    """Download the keylist of a field and load it with numpy.

    Returns None when the remote keylist file does not exist.
    """
    fname = remote_keylist_fname(field_name)

    if not remote_path_exists(fname, stfp):
        return None

    local_fname = '%s/keylist_field%s.txt' % (keylist_dir, field_name)
    stfp.get(fname, local_fname)
    # Load the copy that was just downloaded; `fname` is the remote path.
    return np.loadtxt(local_fname, dtype=keylist_dt)


def FindLightcurvePathOnDella(ID, MPI_COMM):
    """Return the first remote "<field path>/<ID>.tfalc" file that exists.

    NOTE(review): MPI_COMM is unused in the visible part of this function,
    and the function continues past the end of this excerpt.
    """
    with closing(SSHClient()) as ssh:
        ssh.load_system_host_keys()  # NOTE: no AutoAddPolicy()
        ssh.connect(d['hostname'], username=d.get('user'))
        with closing(ssh.open_sftp()) as sftp:
            for field in field_info:
                lcpath = "%s/%s.tfalc" % (field_info[field]['path'], ID)
                # The client opened above is `sftp`; `stfp` was undefined here.
                if remote_path_exists(lcpath, sftp):
                    return lcpath
# ...
Learn to run automation tests from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!