Best Python code snippet using avocado_python
bputilities.py
Source:bputilities.py
# ... (excerpt begins inside a function whose opening lines are not shown.
# The visible tail is kept verbatim below, commented out because it cannot
# stand alone without the missing head.)
#
# fi
# set +v
# qstat
# """
#     scriptfile = make_temp_script(script)
#     try:
#         output = subprocess.check_output([scriptfile])
#         result = []
#         for line in output.split("\n"):
#             fields = [x.strip() for x in line.strip().split(" ")]
#             try:
#                 if len(fields) > 0:
#                     result.append(int(fields[0]))
#             except:
#                 pass
#     finally:
#         os.rmdir(scriptfile)
#     return result
#
# NOTE(review): make_temp_script returns the path of a regular file, so
# os.rmdir(scriptfile) above looks like it should be os.unlink(scriptfile),
# as in the sibling functions below - confirm against the full source.


def _shell_quote(text):
    '''Return text quoted so that /bin/sh reads it as one literal word'''
    # Imported here so the helper works whether the interpreter provides
    # shlex.quote (Python 3.3+) or only pipes.quote (Python 2).
    try:
        from shlex import quote
    except ImportError:
        from pipes import quote
    return quote(text)


def make_temp_script(script):
    '''Write a script to a tempfile

    script - the full text of the script, shebang line included. Text is
             encoded as UTF-8 before it is written; bytes are written
             unchanged.

    returns the path of the new file, which is readable, writable and
    executable by the owner only. The caller is responsible for deleting it.
    '''
    if not isinstance(script, bytes):
        script = script.encode("utf-8")
    host_fd, host_scriptfile = tempfile.mkstemp(suffix=".sh")
    try:
        try:
            os.fchmod(host_fd, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
            os.write(host_fd, script)
        finally:
            # Close on every path so a failed write does not leak the fd.
            os.close(host_fd)
    except Exception:
        # Do not leave a half-written script behind.
        os.unlink(host_scriptfile)
        raise
    return host_scriptfile


def run_on_tgt_os(script,
                  group_name,
                  job_name,
                  queue_name,
                  output,
                  err_output = None,
                  priority = None,
                  cwd=None,
                  deps=None,
                  mail_before = False,
                  mail_error = True,
                  mail_after = True,
                  email_address = None,
                  task_range=None,
                  memory=None):
    '''Run the given script on the target operating system

    script - the script to be run with shebang header line
             (e.g. #!/bin/sh)
    group_name - charge to this group (accepted for interface
                 compatibility; this function does not pass it to qsub)
    job_name - name of the job
    queue_name - run on this queue (None = do not name a queue)
    output - send stdout to this file
    err_output - send stderr to this file (default: output + ".err")
    priority - the priority # for the job
    cwd - change to this directory on remote machine to run script
    deps - a list of job IDs to wait for before starting this one
           (integers or strings; None or an empty list = no dependencies)
    mail_before - true to send email before job starts
    mail_error - true to send email on error
    mail_after - true to send email after job finishes
    email_address - address of email recipient (None = send no mail)
    task_range - for array jobs, a slice giving start / stop / step for
                 task numbering
    memory - value for the m_mem_free resource request, in gigabytes

    returns the job ID printed by "qsub -terse", as an integer

    raises RuntimeError if qsub exits with a non-zero status
    '''
    if err_output is None:
        err_output = output + ".err"

    # Every caller-supplied string is quoted before it is spliced into the
    # generated shell script.
    qsub = ["qsub",
            "-N", _shell_quote(job_name),
            "-e", _shell_quote(err_output),
            "-o", _shell_quote(output),
            "-terse"]
    if deps:
        # Job IDs are integers when they come from this function's own
        # return value, so convert before joining.
        qsub += ["-hold_jid", _shell_quote(",".join(map(str, deps)))]
    if email_address is not None:
        email_events = "".join(
            flag for flag, wanted in (("b", mail_before),
                                      ("e", mail_error),
                                      ("a", mail_after))
            if wanted)
        if email_events:
            qsub += ["-m", email_events, "-M", _shell_quote(email_address)]
    if cwd is not None:
        qsub += ["-wd", _shell_quote(cwd)]
    if queue_name is not None:
        qsub += ["-q", _shell_quote(queue_name)]
    if task_range is not None:
        # The slice's stop is exclusive; the range given to -t is inclusive.
        task_spec = "%d-%d" % (task_range.start, task_range.stop - 1)
        if task_range.step is not None:
            task_spec += ":%d" % task_range.step
        qsub += ["-t", task_spec]
    if memory is not None:
        qsub += ["-l", "m_mem_free=%dg" % memory]
    if priority is not None:
        qsub += ["-p", "%d" % priority]

    tgt_script = make_temp_script(script)
    try:
        qsub.append(_shell_quote(tgt_script))
        host_script = make_temp_script("#!/bin/sh\n%s\n" % " ".join(qsub))
        try:
            p = subprocess.Popen(
                host_script, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            stdout, stderr = p.communicate()
            if p.returncode != 0:
                if not isinstance(stderr, str):
                    stderr = stderr.decode("utf-8", "replace")
                raise RuntimeError("Failed to submit job: %s" % stderr)
            if task_range is not None:
                # Keep only the text before the first "." of the array-job
                # output, i.e. the job ID.
                stdout = stdout.partition(b".")[0]
            return int(stdout.strip())
        finally:
            os.unlink(host_script)
    finally:
        os.unlink(tgt_script)


def _run_batch_command(command):
    '''Run a shell command after sourcing ~/.batchprofiler.sh if it exists

    command - one line of shell text, already safe to embed in a script

    returns a (stdout, stderr) tuple of the command's captured output
    '''
    host_script = make_temp_script("""#!/bin/sh
if [ -e "$HOME/.batchprofiler.sh" ]; then
. "$HOME/.batchprofiler.sh"
fi
%s
""" % command)
    try:
        p = subprocess.Popen(
            host_script, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        return p.communicate()
    finally:
        os.unlink(host_script)


def requeue_job(job_id):
    '''Run "qmod -cj" on a job

    job_id - integer ID of the job

    returns a (stdout, stderr) tuple from the qmod command
    '''
    return _run_batch_command("qmod -cj %d" % job_id)


def kill_job(job_id):
    '''Run "qdel" on a single job

    job_id - integer ID of the job

    returns a (stdout, stderr) tuple from the qdel command
    '''
    return _run_batch_command("qdel %d" % job_id)


def kill_jobs(job_ids):
    '''Run "qdel" on several jobs at once

    job_ids - sequence of job IDs

    returns a (stdout, stderr) tuple from the qdel command
    '''
    return _run_batch_command("qdel %s" % " ".join(map(str, job_ids)))


def kill_tasks(job_id, task_ids):
    '''Run "qdel" on selected tasks of an array job

    job_id - integer ID of the array job
    task_ids - sequence of task numbers to delete

    returns a (stdout, stderr) tuple from the qdel command
    '''
    return _run_batch_command(
        "qdel %d -t %s" % (job_id, ",".join(map(str, task_ids))))

# ... (excerpt ends here)
select.py
Source:select.py
...235 if answ not in inst.list():236 print "Installation %s does not exist"%selection237 continue238 break239 prep.make_temp_script(ADOPT_HOME,answ,ADOPT_SOURCE,ADOPT_SIDE,ADOPT_SUBSYSTEM)240 print "\nCreated file: install_environment.sh\n"241 242 if dryrun: sys.exit()243 if interactive:244 while 1:245 selection=raw_input("\nYour selection? ")246 if not selection: 247 selection=raw_input("Your selection (press return again to exit)? ")248 if not selection: sys.exit()249 250 if selection not in inst.list():251 print "Installation %s does not exist"%selection252 continue253 break...
2pass_dvd_iso_to_xvid
Source:2pass_dvd_iso_to_xvid
...18 os.remove(os.path.join(self.workingdir, file))19 os.rmdir(self.workingdir)20 def __del__(self):21 self.cleanup()22 def make_temp_script(self, cmd):23 ret = os.path.join(self.workingdir, ".tmp-script")24 fp = open(ret, "w")25 fp.write("#!/bin/bash\n" + cmd + "\n")26 fp.close()27 os.chmod(ret, 0777)28 return ret29 def build_fixed_quant_cmd(self):30 ret = "nice -n 19 mencoder dvd://" + str(self.title) + " -dvd-device "\31 + self.inputfile + " -oac mp3lame -lameopts q=5 -ovc xvid "\32 + " -xvidencopts fixed_quant=4 -vf pp=ci -o " + self.outputfile\33 + " &> " + self.outputfile + ".log1"34 return ret35 36 def build_first_pass_cmd(self):37 ret = "nice -n 19 mencoder dvd://" + str(self.title) + " -dvd-device "\38 + self.inputfile + " -oac mp3lame -lameopts q=5 -ovc xvid "\39 + " -xvidencopts pass=1 -vf pp=ci -o " + self.outputfile\40 + " &> " + self.outputfile + ".log1"41 return ret42 def build_second_pass_cmd(self):43 ret = "nice -n 19 mencoder dvd://" + str(self.title) + " -dvd-device "\44 + self.inputfile + " -oac mp3lame -lameopts q=5 -ovc xvid "\45 + " -xvidencopts pass=2:bitrate=" + str(self.bitrate) + " -vf pp=ci -o "\46 + self.outputfile + " &> " + self.outputfile + ".log2"47 return ret48 def do_encode(self):49 ret = 050 if len(self.outputfile) == 0:51 self.outputfile = os.path.splitext(self.inputfile)[0] + ".avi"52 if len(self.workingdir) == 0:53 self.workingdir = os.path.splitext(self.inputfile)[0] + "_working"54 parts = os.path.split(self.workingdir)55 self.workingdir = os.path.join(parts[0], "." 
+ parts[1])56 if not os.path.isdir(self.workingdir):57 os.mkdir(self.workingdir)58 os.chdir(self.workingdir)59 #cmd = self.make_temp_script(self.build_fixed_quant_cmd())60 #ret = os.system(cmd)61 cmd = self.make_temp_script(self.build_first_pass_cmd())62 ret = os.system(cmd)63 if ret:64 return ret65 cmd = self.make_temp_script(self.build_second_pass_cmd())66 return os.system(cmd)67def usage():68 print "Usage: " + os.path.basename(sys.argv[0])\69 + " -i <input iso file> {-o output file} {-w working dir}"70 print " -t {title} -lf {listfile} -j {jobs}"71 print " if not specified, output file name and working dir"72 print " are determined from the input file name"73 sys.exit(0)74def rip_dvd():75 lastarg = ""76 knownargs = ["-i", "-o", "-w", "-h", "-t", "-lf", "-j", "-br"]77 e = Encoder()78 listfile = ""79 jobs = 1...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!