Source: zeus.py
...
# NOTE: username, hostname, tmp_path, conda_env and home are module-level
# globals defined earlier in the file (not shown in this excerpt).
    if var is True:
        return return_code, res
    else:
        return return_code


def _remote_scp(src, dst, way, out=None):
    if way == "get":
        cmd = [
            "scp",
            "-r",
            "{user}@{host}:{src}".format(user=username, host=hostname, src=src),
            "{dst}".format(dst=dst),
        ]
    elif way == "put":
        cmd = [
            "scp",
            "-r",
            "{src}".format(src=src),
            "{user}@{host}:{dst}".format(user=username, host=hostname, dst=dst),
        ]
    else:
        if out:
            out.append_stdout("Scp wrapper not used properly\n")
        else:
            print("Scp wrapper not used properly\n")
        return -1
    popen = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    for stdout_line in iter(popen.stdout.readline, ""):
        if out:
            out.append_stdout(stdout_line.strip() + "\n")
        else:
            print(stdout_line.strip())
    popen.stdout.close()
    return_code = popen.wait()
    if return_code:
        if out:
            for stdout_line in iter(popen.stderr.readline, ""):
                out.append_stdout(stdout_line.strip() + "\n")
        else:
            for stdout_line in iter(popen.stderr.readline, ""):
                print(stdout_line.strip())
    return return_code


def _remote_bsub(script_path, out=None):
    cmd = [
        "ssh",
        "{user}@{host}".format(user=username, host=hostname),
        "bsub < {cmd}".format(cmd=script_path),
    ]
    popen = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    jobid = -1
    for stdout_line in iter(popen.stdout.readline, ""):
        if "is submitted to queue" in stdout_line:
            res = re.match(r"^.*<([0-9]*)>.*$", stdout_line)
            if res:
                jobid = res.group(1)
        if out:
            out.append_stdout(stdout_line.strip() + "\n")
        else:
            print(stdout_line.strip() + "\n")
    popen.stdout.close()
    return_code = popen.wait()
    if return_code:
        for stdout_line in iter(popen.stderr.readline, ""):
            if out:
                out.append_stdout(stdout_line.strip() + "\n")
            else:
                print(stdout_line.strip() + "\n")
    return return_code, jobid


def _remote_bjobs(jobid, out):
    status_str = "bjobs -o stat {jobid} | tail -n 1".format(jobid=jobid)
    cmd = [
        "ssh",
        "{user}@{host}".format(user=username, host=hostname),
        "{cmd}".format(cmd=status_str),
    ]
    popen = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    status = None
    for stdout_line in iter(popen.stdout.readline, ""):
        status = stdout_line
    popen.stdout.close()
    return_code = popen.wait()
    if return_code:
        for stdout_line in iter(popen.stderr.readline, ""):
            out.append_stdout(stdout_line.strip() + "\n")
    return return_code, status


def execute(cmd):
    """Execute a command remotely on Zeus.

    Parameters
    ----------
    cmd : str
        Command to be executed on Zeus.

    Returns
    -------
    None

    Examples
    --------
    >>> zeus.execute('ls ~/')
    """
    _remote_cmd(cmd)
    return None


def get(src, dst):
    """Download a file from Zeus.

    Copy the remote file specified in `src` from Zeus to the local path
    specified in `dst`.

    Parameters
    ----------
    src : str
        Path of the remote file to be downloaded from Zeus.
    dst : str
        Local path/name where to copy the file from Zeus.

    Returns
    -------
    None

    Examples
    --------
    >>> zeus.get('/users_home/user/file.nc', '/home/user/file.nc')
    """
    _remote_scp(src, dst, "get")
    return None


def put(src, dst):
    """Upload a file to Zeus.

    Copy the local file specified in `src` to the remote path specified in
    `dst` on Zeus.

    Parameters
    ----------
    src : str
        Path of the local file to be uploaded to Zeus.
    dst : str
        Path/name where to copy the file on Zeus.

    Returns
    -------
    None

    Examples
    --------
    >>> zeus.put('/home/user/file.nc', '/users_home/user/file.nc')
    """
    _remote_scp(src, dst, "put")
    return None


def process(script_name, data, compress=False, frequency=10):
    """Submit a script on LSF and download the output data.

    This function uploads the local script specified in `script_name`,
    executes the script on LSF (i.e., bsub < script_name) and downloads the
    output specified in `data` once the job is completed.
    Output data can optionally be compressed as tar.gz before downloading.
    The job status checking frequency can be adjusted based on the expected
    job duration.
    The function is non-blocking: other cells in the notebook can be executed
    right after this function is started, without waiting for the job to end.

    Parameters
    ----------
    script_name : str
        Absolute (local) path of the script to be submitted on LSF.
    data : str
        Absolute (remote) path of the data to be downloaded.
    compress : bool, optional
        Whether the output data should be compressed as tar.gz before
        downloading it (default is False).
    frequency : int, optional
        Interval for checking the job status, in seconds (default is 10).

    Returns
    -------
    None

    Examples
    --------
    >>> zeus.process('/home/user/script.lsf', '/users_home/user/output.nc',
    ...              compress=True, frequency=20)
    """
    # Perform initial checks
    if not os.path.isabs(script_name) or not os.path.isfile(script_name):
        print("Please specify the absolute path of the script file")
        return None
    if not os.path.isabs(data):
        print("Please specify the absolute path of the remote data file")
        return None

    def thread_func(script_name, data, compress, out):
        if _remote_scp(script_name, os.path.basename(script_name), "put", out):
            out.append_stdout(
                "Something went wrong while uploading the script on Zeus"
            )
            return None
        ret, jobid = _remote_bsub(os.path.basename(script_name), out)
        if ret or int(jobid) < 0:
            out.append_stdout(
                "Something went wrong while running the script on LSF"
            )
            return None
        old_status = ""
        while True:
            ret, status = _remote_bjobs(jobid, out)
            if ret or not status:
                out.append_stdout(
                    "Something went wrong while checking the job status on LSF"
                )
                return None
            if status and status != old_status:
                out.append_stdout("Job status is: " + status)
                old_status = status
            if "DONE" in status or "EXIT" in status:
                break
            time.sleep(frequency)
        if "EXIT" in status:
            out.append_stdout("Job execution was unsuccessful")
            return None
        if _remote_cmd("ls " + data, out):
            out.append_stdout("Remote data path not found")
            return None
        if compress:
            if _remote_cmd("mkdir -p %s" % tmp_path, out):
                out.append_stdout(
                    "Something went wrong while compressing the file"
                )
                return None
            if _remote_cmd(
                "tar -zcf %s%s.tar.gz %s"
                % (tmp_path, os.path.basename(data), data),
                out,
            ):
                out.append_stdout(
                    "Something went wrong while compressing the file"
                )
                return None
            local_path = os.path.join(
                os.environ["PWD"], os.path.basename(data) + ".tar.gz"
            )
            remote_path = os.path.join(
                tmp_path, os.path.basename(data) + ".tar.gz"
            )
        else:
            local_path = os.path.join(os.environ["PWD"], os.path.basename(data))
            remote_path = data
        if _remote_scp(remote_path, local_path, "get", out):
            out.append_stdout(
                "Something went wrong while getting the output data from Zeus"
            )
            return None
        if compress:
            _remote_cmd(
                "rm %s%s.tar.gz" % (tmp_path, os.path.basename(data)), out
            )
        out.append_stdout("Data has been downloaded in: " + local_path)
        return None

    display("Starting Job execution")
    out = widgets.Output()
    display(out)
    thread = threading.Thread(
        target=thread_func, args=(script_name, data, compress, out)
    )
    thread.start()
    return None


def info(jobid=None):
    """Get status of LSF jobs on Zeus.

    This function returns the list of the jobs the user submitted on LSF. If
    the `jobid` argument is provided, only the status of that particular job
    is shown; otherwise all recent jobs are listed.

    Parameters
    ----------
    jobid : int, optional
        ID of the LSF job.

    Returns
    -------
    None

    Examples
    --------
    >>> zeus.info()
    >>> zeus.info(1234)
    """
    if jobid is not None:
        cmd = "bjobs " + str(jobid)
    else:
        cmd = "bjobs -a"
    _remote_cmd(cmd)
    return None


def start_dask(
    project,
    cores,
    memory,
    name="Dask-Test",
    processes=None,
    queue="p_short",
    local_directory="~/dask-space",
    interface="ib0",
    walltime=None,
    job_extra=None,
    env_extra=None,
    log_directory="~/dask-space",
    death_timeout=60,
    n_workers=1,
):
    """Start a new Dask cluster on Zeus.

    This function starts a new Dask scheduler on the cluster front-end node
    and a set of worker processes on the cluster compute nodes. The function
    returns a ready-to-use Dask client object. The arguments defined in the
    interface are borrowed from the Dask-Jobqueue interface.

    Parameters
    ----------
    project : str
        Accounting string associated with each worker job. Passed to the
        `#BSUB -P` option.
    cores : int
        Number of cores for the worker nodes. Passed to the `#BSUB -n` option.
    memory : str
        Total amount of memory per worker job. Passed to the `#BSUB -M` option.
    name : str, optional
        Name of the Dask workers. By default set to `Dask-Test`.
    processes : int, optional
        Cut the job up into this many processes. Good for GIL workloads or for
        nodes with many cores. By default, processes ~= sqrt(cores) so that the
        number of processes and the number of threads per process are roughly
        the same.
    queue : str, optional
        Destination queue for each worker job. Passed to the `#BSUB -q` option.
        By default the `p_short` queue is used.
    local_directory : str, optional
        Dask worker local directory for file spilling. By default the folder
        `dask-space` in the home directory is used.
    interface : str, optional
        Network interface like `eth0` or `ib0`. This will be used for the Dask
        workers' interface. By default `ib0` is used.
    walltime : str, optional
        Walltime for each worker job in HH:MM. Passed to the `#BSUB -W` option.
        If not specified, the default queue walltime is used.
    job_extra : list, optional
        List of additional LSF options, for example `-x`. Each option will be
        prepended with the `#BSUB` prefix.
    env_extra : list, optional
        Optional commands to add to the script before launching the worker.
    log_directory : str, optional
        Directory to use for job scheduler logs. By default the folder
        `dask-space` in the home directory is used.
    death_timeout : float, optional
        Seconds to wait for a scheduler before closing workers (default is 60).
    n_workers : int, optional
        Number of worker processes to start up, i.e. jobs on LSF (default
        is 1).

    Returns
    -------
    dask.distributed.Client
        A ready-to-use Dask distributed client connected to the scheduler.

    Examples
    --------
    >>> client = zeus.start_dask(
    ...     project="R000",
    ...     cores=36,
    ...     memory="80 GB",
    ...     name="Test",
    ...     processes=12,
    ...     local_directory="~/dask-space",
    ...     interface="ib0",
    ...     walltime="00:30",
    ...     job_extra=["-x"],
    ...     n_workers=1,
    ... )

    Create a new cluster with a single worker on a whole Zeus node, using 12
    processes (3 threads per process) and 80 GB of RAM requested. Note that
    each process will get a maximum of 80/12 GB of memory.
    """
    # default values
    shebang = "#!/bin/bash"
    python = "python3"

    def lsf_format_bytes_ceil(n, lsf_units="mb"):
        """Format bytes as text.

        Convert a memory specification to the megabytes LSF requires.

        Parameters
        ----------
        n : str
            Memory specification as "<number> <unit>", e.g. "80 GB".
        lsf_units : str
            Units for the memory in 2-character shorthand, kb through eb.

        Examples
        --------
        >>> lsf_format_bytes_ceil("1234567890 B")
        '1235'
        """
        # Adapted from dask_jobqueue lsf.py
        units = {
            "B": 1,
            "KB": 10 ** 3,
            "MB": 10 ** 6,
            "GB": 10 ** 9,
            "TB": 10 ** 12,
        }
        number, unit = [string.strip() for string in n.split()]
        lsf_units = lsf_units.lower()[0]
        converter = {"k": 1, "m": 2, "g": 3, "t": 4, "p": 5, "e": 6, "z": 7}
        return "%d" % math.ceil(
            float(number) * units[unit] / (1000 ** converter[lsf_units])
        )

    def create_scheduler_script(
        shebang, python, name, log_directory, env_extra
    ):
        sched_script_lines = []
        sched_script_lines.append("%s" % shebang)
        # The following #BSUB directives are disabled (kept as a bare string
        # literal): the scheduler script is run directly on the front-end node.
        """
        sched_script_lines.append("")
        sched_script_lines.append("#BSUB -J scheduler_%s" % name)
        sched_script_lines.append("#BSUB -e %s/scheduler_%s-%%J.err" % (log_directory, name))
        sched_script_lines.append("#BSUB -o %s/scheduler_%s-%%J.out" % (log_directory, name))
        sched_script_lines.append("#BSUB -q %s" % scheduler_queue)
        sched_script_lines.append("#BSUB -P %s" % project)
        memory_string = lsf_format_bytes_ceil(scheduler_memory)
        sched_script_lines.append("#BSUB -M %s" % memory_string)
        if scheduler_cores > 36:
            scheduler_cores = 36
            print("Worker cores specification for LSF higher than available, initializing it to %s" % scheduler_cores)
        sched_script_lines.append("#BSUB -n %s" % scheduler_cores)
        if scheduler_cores > 1:
            sched_script_lines.append('#BSUB -R "span[hosts=1]"')
        if walltime is not None:
            sched_script_lines.append("#BSUB -W %s" % walltime)
        if job_extra is not None:
            sched_script_lines.extend(["#BSUB %s" % arg for arg in job_extra])
        """
        # Zeus specific lines
        sched_script_lines.append("")
        sched_script_lines.append("module load anaconda/3.7")
        sched_script_lines.append("source activate %s" % conda_env)
        if env_extra is not None:
            sched_script_lines.extend(["%s" % arg for arg in env_extra])
        # Executable lines
        sched_exec = "%s -m distributed.cli.dask_scheduler" % python
        sched_exec += (
            " --port 0 --dashboard-address 0 --scheduler-file %s/connection"
            " --idle-timeout 3600"
            % local_directory
        )
        sched_exec += " --interface ens2f1"
        sched_exec += " >> %s/scheduler_%s.log 2>&1 &" % (log_directory, name)
        sched_script_lines.append(sched_exec)
        sched_script = "\n".join(sched_script_lines)
        return sched_script

    def create_worker_script(
        shebang,
        name,
        log_directory,
        project,
        worker_queue,
        worker_memory,
        worker_cores,
        walltime,
        job_extra,
        env_extra,
        interface,
        processes,
        death_timeout,
        sched_ip,
    ):
        if log_directory[0:1] == "~":
            log_directory = log_directory.replace("~", home)
        worker_script_lines = []
        worker_script_lines.append("%s" % shebang)
        worker_script_lines.append("")
        worker_script_lines.append("#BSUB -J dask_worker_%s" % name)
        worker_script_lines.append(
            "#BSUB -e %s/worker_%s-%%J.err" % (log_directory, name)
        )
        worker_script_lines.append(
            "#BSUB -o %s/worker_%s-%%J.out" % (log_directory, name)
        )
        worker_script_lines.append("#BSUB -q %s" % worker_queue)
        worker_script_lines.append("#BSUB -P %s" % project)
        memory_string = lsf_format_bytes_ceil(worker_memory)
        worker_script_lines.append("#BSUB -M %s" % memory_string)
        if worker_cores > 36:
            worker_cores = 36
            print(
                "Worker cores specification for LSF higher than available, "
                "initializing it to %s" % worker_cores
            )
        worker_script_lines.append("#BSUB -n %s" % worker_cores)
        if worker_cores > 1:
            worker_script_lines.append('#BSUB -R "span[hosts=1]"')
        if walltime is not None:
            worker_script_lines.append("#BSUB -W %s" % walltime)
        if job_extra is not None:
            worker_script_lines.extend(["#BSUB %s" % arg for arg in job_extra])
        # Python env specific lines
        worker_script_lines.append("")
        worker_script_lines.append("module load anaconda/3.7")
        worker_script_lines.append("source activate %s" % conda_env)
        if env_extra is not None:
            worker_script_lines.extend(["%s" % arg for arg in env_extra])
        # Executable lines
        worker_exec = "%s -m distributed.cli.dask_worker %s" % (
            python,
            sched_ip,
        )
        worker_exec += " --local-directory %s" % local_directory
        worker_exec += " --interface %s" % interface
        # Detect memory, processes and threads per each worker
        if processes is None:
            processes = max(math.floor(math.sqrt(worker_cores)), 1)
        threads = max(math.floor(float(worker_cores) / processes), 1)
        mem = float(memory_string) / processes
        worker_exec += (
            " --nthreads %i --nprocs %i --memory-limit %.2fMB --name 0 --nanny"
            " --death-timeout %i" % (threads, processes, mem, death_timeout)
        )
        worker_script_lines.append(worker_exec)
        worker_script = "\n".join(worker_script_lines)
        return worker_script

    def delete_tmp_files(local_path, remote_path):
        local_file = os.path.join(local_path, "scheduler.sh")
        if os.path.exists(local_file):
            os.remove(local_file)
        local_file = os.path.join(local_path, "worker.lsf")
        if os.path.exists(local_file):
            os.remove(local_file)
        local_file = os.path.join(local_path, "connection")
        if os.path.exists(local_file):
            os.remove(local_file)
        _remote_cmd("rm %s{%s,%s}" % (tmp_path, "scheduler.sh", "worker.lsf"))
        return None

    def run_scheduler(local_path, remote_path, local_directory, sched_script):
        timeout = 20
        local_file = os.path.join(local_path, "scheduler.sh")
        remote_file = os.path.join(remote_path, "scheduler.sh")
        with open(local_file, "w") as sched_file:
            sched_file.write(sched_script)
        if _remote_scp(local_file, remote_file, "put"):
            print("Error while copying scripts to Zeus")
            return None
        if _remote_cmd("/bin/bash %s" % remote_file):
            print("Something went wrong while executing Dask scheduler script")
            delete_tmp_files(local_path, remote_path)
            stop_dask()
            return None
        # Check connection file availability
        i = 0
        ret = -1
        while i < timeout:
            time.sleep(1)
            ret = _remote_cmd("ls %s/connection" % local_directory)
            if ret == 0:
                break
            i += 1
        if ret != 0:
            print("Unable to retrieve Dask scheduler address")
            delete_tmp_files(local_path, remote_path)
            stop_dask()
            return None
        local_file = os.path.join(local_path, "connection")
        remote_file = "%s/connection" % local_directory
        if _remote_scp(remote_file, local_file, "get"):
            print("Error while copying files from Zeus")
            delete_tmp_files(local_path, remote_path)
            stop_dask()
            return None
        # Read connection info
        import json

        sched_address = None
        with open(local_file) as f:
            data = json.load(f)
            if "address" in data:
                sched_address = data["address"]
        if sched_address is None:
            print(
                "Something went wrong while retrieving Dask scheduler address"
            )
            delete_tmp_files(local_path, remote_path)
            stop_dask()
            return None
        return sched_address

    def run_workers(local_path, remote_path, n_workers, worker_script):
        local_file = os.path.join(local_path, "worker.lsf")
        remote_file = os.path.join(remote_path, "worker.lsf")
        with open(local_file, "w") as worker_file:
            worker_file.write(worker_script)
        if _remote_scp(local_file, remote_file, "put"):
            print("Error while copying scripts to Zeus")
            delete_tmp_files(local_path, remote_path)
            stop_dask()
            return None
        if n_workers < 1:
            n_workers = 1
        # Run worker scripts
        job_array = []
        job_num = 0
        for i in range(0, n_workers):
            if job_num > 0:
                if _remote_cmd(
                    "sed -i 's/--name %i/--name %i/g' %s"
                    % (job_num - 1, job_num, remote_file)
...
Source: remote.py
...
            time.sleep(2)
    # Timeout expired; try one more time but don't catch exceptions
    return remote_login(client, host, port, username, password, prompt,
                        linesep, log_filename, internal_timeout)


def _remote_scp(session, password_list, transfer_timeout=600, login_timeout=20):
    """
    Transfer file(s) to a remote host (guest) using SCP. Wait for questions
    and provide answers. If login_timeout expires while waiting for output
    from the child (e.g. a password prompt), fail. If transfer_timeout expires
    while waiting for the transfer to complete, fail.

    @brief: Transfer files using SCP, given a command line.

    @param session: An Expect or ShellSession instance to operate on
    @param password_list: Password list to send in reply to the password prompt
    @param transfer_timeout: The time duration (in seconds) to wait for the
            transfer to complete.
    @param login_timeout: The maximal time duration (in seconds) to wait for
            each step of the login procedure (i.e. the "Are you sure" prompt or
            the password prompt)
    @raise SCPAuthenticationError: If authentication fails
    @raise SCPAuthenticationTimeoutError: If authentication times out
    @raise SCPTransferTimeoutError: If the transfer fails to complete in time
    @raise SCPTransferFailedError: If the process terminates with a nonzero
            exit code
    @raise SCPError: If some other error occurs
    """
    password_prompt_count = 0
    timeout = login_timeout
    authentication_done = False
    scp_type = len(password_list)
    while True:
        try:
            match, text = session.read_until_last_line_matches(
                [r"[Aa]re you sure", r"[Pp]assword:\s*$", r"lost connection"],
                timeout=timeout, internal_timeout=0.5)
            if match == 0:  # "Are you sure you want to continue connecting"
                logging.debug("Got 'Are you sure...', sending 'yes'")
                session.sendline("yes")
                continue
            elif match == 1:  # "password:"
                if password_prompt_count == 0:
                    logging.debug("Got password prompt, sending '%s'" %
                                  password_list[password_prompt_count])
                    session.sendline(password_list[password_prompt_count])
                    password_prompt_count += 1
                    timeout = transfer_timeout
                    if scp_type == 1:
                        authentication_done = True
                    continue
                elif password_prompt_count == 1 and scp_type == 2:
                    logging.debug("Got password prompt, sending '%s'" %
                                  password_list[password_prompt_count])
                    session.sendline(password_list[password_prompt_count])
                    password_prompt_count += 1
                    timeout = transfer_timeout
                    authentication_done = True
                    continue
                else:
                    raise SCPAuthenticationError("Got password prompt twice",
                                                 text)
            elif match == 2:  # "lost connection"
                raise SCPError("SCP client said 'lost connection'", text)
        except aexpect.ExpectTimeoutError as e:
            if authentication_done:
                raise SCPTransferTimeoutError(e.output)
            else:
                raise SCPAuthenticationTimeoutError(e.output)
        except aexpect.ExpectProcessTerminatedError as e:
            if e.status == 0:
                logging.debug("SCP process terminated with status 0")
                break
            else:
                raise SCPTransferFailedError(e.status, e.output)


def remote_scp(command, password_list, log_filename=None, transfer_timeout=600,
               login_timeout=20):
    """
    Transfer file(s) to a remote host (guest) using SCP.

    @brief: Transfer files using SCP, given a command line.

    @param command: The command to execute
            (e.g. "scp -r foobar root@localhost:/tmp/").
    @param password_list: Password list to send in reply to a password prompt.
    @param log_filename: If specified, log all output to this file
    @param transfer_timeout: The time duration (in seconds) to wait for the
            transfer to complete.
    @param login_timeout: The maximal time duration (in seconds) to wait for
            each step of the login procedure (i.e. the "Are you sure" prompt
            or the password prompt)
    @raise: Whatever _remote_scp() raises
    """
    logging.debug("Trying to SCP with command '%s', timeout %ss",
                  command, transfer_timeout)
    if log_filename:
        output_func = utils_misc.log_line
        output_params = (log_filename,)
    else:
        output_func = None
        output_params = ()
    session = aexpect.Expect(command,
                             output_func=output_func,
                             output_params=output_params)
    try:
        _remote_scp(session, password_list, transfer_timeout, login_timeout)
    finally:
        session.close()


def scp_to_remote(host, port, username, password, local_path, remote_path,
                  limit="", log_filename=None, timeout=600):
    """
    Copy files to a remote host (guest) through scp.

    @param host: Hostname or IP address
    @param username: Username (if required)
    @param password: Password (if required)
    @param local_path: Path on the local machine where we are copying from
    @param remote_path: Path on the remote machine where we are copying to
    @param limit: Speed limit of file transfer.
    @param log_filename: If specified, log all output to this file
    @param timeout: The time duration (in seconds) to wait for the transfer
...