Best Python code snippet using hypothesis
OutputTestResultInMatrix.py
Source:OutputTestResultInMatrix.py
1#!/usr/bin/env python2"""3Examples:4 OutputTestResultInMatrix.py -l 6 -o /tmp/rank_test_call_method_6_m40000.tsv -m 400005 6 OutputTestResultInMatrix.py -l 6,7 -o /tmp/rank_test_call_method_6_7_a.tsv -a7 8 #check TopSNP test results9 OutputTestResultInMatrix.py -l 17 -x /tmp/top_snp_test_call_method_17_g_f200.png -g -y 2 -f 20010 11Description:12 Output Test result in data matrix. either CandidateGeneRankSumTestResult or CandidateGeneTopSNPTest13 In the outputted matrix, -1 = NA. -2 = (pvalue=0).14 15"""16import sys, os, math17#bit_number = math.log(sys.maxint)/math.log(2)18#if bit_number>40: #64bit19sys.path.insert(0, os.path.expanduser('~/lib/python'))20sys.path.insert(0, os.path.join(os.path.expanduser('~/script')))21import getopt, csv, math22import Numeric, cPickle23from pymodule import PassingData, importNumericArray, write_data_matrix, SNPData24import Stock_250kDB25from Stock_250kDB import Snps, SnpsContext, ResultsMethod, GeneList, GeneListType, \26 CandidateGeneTopSNPTest, CandidateGeneRankSumTestResult, AnalysisMethod, PhenotypeMethod, ResultsByGene27from sets import Set28from pymodule.DrawMatrix import drawMatrix, drawLegend, drawContinousLegend, get_font, combineTwoImages, Value2Color29num = importNumericArray()30class OutputTestResultInMatrix(object):31 __doc__ = __doc__32 option_default_dict = {('drivername', 1,):['mysql', 'v', 1, 'which type of database? mysql or postgres', ],\33 ('hostname', 1, ): ['papaya.usc.edu', 'z', 1, 'hostname of the db server', ],\34 ('dbname', 1, ): ['stock_250k', 'd', 1, 'database name', ],\35 ('schema', 0, ): [None, 'k', 1, 'database schema name', ],\36 ('db_user', 1, ): [None, 'u', 1, 'database username', ],\37 ('db_passwd', 1, ): [None, 'p', 1, 'database password', ],\38 ("min_distance", 1, int): [20000, 'm', 1, 'minimum distance allowed from the SNP to gene'],\39 ("get_closest", 0, int): [0, 'g', 0, 'only get genes closest to the SNP within that distance'],\40 ('min_MAF', 1, float): [0.1, 'n', 1, 'minimum Minor Allele Frequency.'],\41 ('max_pvalue_per_gene', 0, int): [0, 'a', 0, 'take the most significant among all SNPs associated with one gene'],\42 ('min_sample_size', 0, int): [5, 'i', 1, 'minimum size for both candidate and non-candidate sets to do wilcox.test'],\43 ('no_of_top_snps', 1, int): [100, 'f', 1, 'For test_result_type=2. how many number of top snps based on score or -log10(pvalue).'],\44 ('call_method_id_ls', 1, ):[None, 'l', 1, 'Restrict results based on these call_methods, coma-separated list of ids.'],\45 ('analysis_method_id_ls', 0, ):[None, 'j', 1, 'Restrict results based on these analysis, coma-separated. Default is no such restriction.'],\46 ("test_result_type", 1, int): [1, 'y', 1, 'Which test result to output. 1: CandidateGeneRankSumTestResult, 2: CandidateGeneTopSNPTest, 3: CandidateGeneRankSumTestResultMethod'],\47 ("super_type_id", 0, int): [1, 'q', 1, 'Super Type ID for candidate gene lists'],\48 ('font_path', 1, ):['/usr/share/fonts/truetype/freefont/FreeSerif.ttf', 'e', 1, 'path of the font used to draw labels'],\49 ('font_size', 1, int):[20, 's', 1, 'size of font, which determines the size of the whole figure.'],\50 ("output_fname", 0, ): [None, 'o', 1, 'Filename to store data matrix'],\51 ("fig_fname", 1, ): [None, 'x', 1, 'File name for the figure'],\52 ("no_of_ticks", 1, int): [15, 't', 1, 'Number of ticks on the legend'],\53 ("test_type", 1, int): [0, 'w', 1, 'which type of test in each result_type. column test_type. Check GeneListRankTest.py or TopSNPTest.py for info.'],\54 ('commit', 0, int):[0, 'c', 0, 'commit the db operation. this commit happens after every db operation, not wait till the end.'],\55 ('debug', 0, int):[0, 'b', 0, 'toggle debug mode'],\56 ('report', 0, int):[0, 'r', 0, 'toggle report, more verbose stdout/stderr.']}57 58 def __init__(self, **keywords):59 """60 2008-07-2461 split results_method_id_ls if it exists, to accomodate MpiGeneListRankTest which removed this option62 2008-07-1063 """64 from pymodule import ProcessOptions65 self.ad = ProcessOptions.process_function_arguments(keywords, self.option_default_dict, error_doc=self.__doc__, class_to_have_attr=self)66 67 def getListTypeInfo(self, db, where_condition):68 """69 2008-08-2970 add -1 as a separator into list_type_id_ls71 """72 sys.stderr.write("Getting list type info ...")73 rows = db.metadata.bind.execute("select distinct c.list_type_id, g.biology_category_id from %s order by g.biology_category_id, list_type_id"\74 %(where_condition))75 list_type_id_ls = []76 prev_biology_category_id = None77 for row in rows:78 if prev_biology_category_id == None:79 prev_biology_category_id = row.biology_category_id80 elif row.biology_category_id!=prev_biology_category_id:81 prev_biology_category_id = row.biology_category_id82 list_type_id_ls.append(-1)83 list_type_id_ls.append(row.list_type_id)84 sys.stderr.write("Done.\n")85 return list_type_id_ls86 87 88 def getAnalysisMethodInfo(self, db, where_condition):89 sys.stderr.write("Getting analysis method info ...")90 rows = db.metadata.bind.execute("select distinct r.analysis_method_id from %s order by analysis_method_id"\91 %(where_condition))92 analysis_method_id_ls = []93 for row in rows:94 analysis_method_id_ls.append(row.analysis_method_id)95 sys.stderr.write("Done.\n")96 return analysis_method_id_ls97 98 def getPhenotypeInfo(self, db, where_condition):99 """100 2008-08-29101 add -1 as a separator into phenotype_method_id_ls and others102 """103 sys.stderr.write("Getting phenotype method info ...")104 rows = db.metadata.bind.execute("select distinct r.phenotype_method_id, p.biology_category_id from %s p, %s and p.id=r.phenotype_method_id order by p.biology_category_id, r.phenotype_method_id"\105 %(PhenotypeMethod.table.name, where_condition))106 phenotype_method_id_ls = []107 phenotype_method_id2index = {}108 phenotype_method_label_ls = []109 prev_biology_category_id = None110 no_of_separators = 0111 for row in rows:112 if prev_biology_category_id == None:113 prev_biology_category_id = row.biology_category_id114 elif row.biology_category_id!=prev_biology_category_id:115 prev_biology_category_id = row.biology_category_id116 #add a blank phenotype id as separator117 no_of_separators += 1118 phenotype_method_id2index[-no_of_separators] = len(phenotype_method_id_ls)119 phenotype_method_id_ls.append(-no_of_separators)120 phenotype_method_label_ls.append('')121 phenotype_method_id2index[row.phenotype_method_id] = len(phenotype_method_id_ls)122 phenotype_method_id_ls.append(row.phenotype_method_id)123 pm = PhenotypeMethod.get(row.phenotype_method_id)124 phenotype_method_label_ls.append('%s_%s'%(pm.id, pm.short_name))125 phenotype_info = PassingData()126 phenotype_info.phenotype_method_id2index = phenotype_method_id2index127 phenotype_info.phenotype_method_id_ls = phenotype_method_id_ls128 phenotype_info.phenotype_method_label_ls = phenotype_method_label_ls129 sys.stderr.write("Done.\n")130 return phenotype_info131 132 def orderListTypeAnalysisMethodID(self, list_type_id_ls, analysis_method_id_ls):133 """134 2008-08-29135 deal with separator (list_type_id=-1) in list_type_id_ls136 """137 sys.stderr.write("Orderinig list type id and analysis_method id ... ")138 list_type_id_analysis_method_id_ls = []139 list_type_id_analysis_method_id2index = {}140 list_type_analysis_method_label_ls = []141 no_of_separators = 0142 for list_type_id in list_type_id_ls:143 if list_type_id==-1: #separator144 no_of_separators += 1145 tup = (-no_of_separators,-1)146 list_type_id_analysis_method_id2index[tup] = len(list_type_id_analysis_method_id_ls)147 list_type_id_analysis_method_id_ls.append(tup)148 list_type_analysis_method_label_ls.append('')149 continue150 list_type_short_name = GeneListType.get(list_type_id).short_name151 for analysis_method_id in analysis_method_id_ls:152 analysis_method_short_name = AnalysisMethod.get(analysis_method_id).short_name153 tup = (list_type_id, analysis_method_id)154 list_type_id_analysis_method_id2index[tup] = len(list_type_id_analysis_method_id_ls)155 list_type_id_analysis_method_id_ls.append(tup)156 list_type_analysis_method_label_ls.append('%s_%s_%s'%(analysis_method_short_name, list_type_short_name, list_type_id))157 return_data = PassingData()158 return_data.list_type_id_analysis_method_id_ls = list_type_id_analysis_method_id_ls159 return_data.list_type_id_analysis_method_id2index = list_type_id_analysis_method_id2index160 return_data.list_type_analysis_method_label_ls = list_type_analysis_method_label_ls161 sys.stderr.write("Done.\n")162 return return_data163 164 def get_data_matrix(self, db, phenotype_info, list_type_analysis_method_info, where_condition):165 sys.stderr.write("Getting data matrix ...")166 data_matrix = num.zeros([len(list_type_analysis_method_info.list_type_id_analysis_method_id2index), len(phenotype_info.phenotype_method_id2index)], num.float)167 data_matrix[:] = -1168 i = 0169 rows = db.metadata.bind.execute("select r.analysis_method_id, r.phenotype_method_id, c.* from %s order by analysis_method_id"\170 %(where_condition))171 min_value = None172 max_value = None173 for row in rows:174 tup = (row.list_type_id, row.analysis_method_id)175 row_index = list_type_analysis_method_info.list_type_id_analysis_method_id2index[tup]176 col_index = phenotype_info.phenotype_method_id2index[row.phenotype_method_id]177 if row.pvalue>0:178 data_value = -math.log10(row.pvalue)179 if min_value==None:180 min_value = data_value181 elif data_value<min_value:182 min_value = data_value183 184 if max_value==None:185 max_value=data_value186 elif data_value>max_value:187 max_value =data_value188 else:189 data_value = -2 #0 pvalue190 data_matrix[row_index, col_index] = data_value191 sys.stderr.write("Done.\n")192 return_data = PassingData()193 return_data.data_matrix = data_matrix194 return_data.min_value = min_value195 return_data.max_value = max_value196 return return_data197 198 def markDataMatrixBoundary(self, data_matrix, phenotype_info, list_type_analysis_method_info):199 """200 2008-09-01201 mark boundary between different groups of rows and columns202 all those separators have id <0 and mark them with a value different from NA_value, -3.203 """204 sys.stderr.write("Marking data matrix boundaries ...")205 for row_id in list_type_analysis_method_info.list_type_id_analysis_method_id2index:206 if row_id[0]<0: #row_id is a tuple207 row_index = list_type_analysis_method_info.list_type_id_analysis_method_id2index[row_id]208 data_matrix[row_index,:] = -3209 for col_id in phenotype_info.phenotype_method_id2index:210 if col_id<0:211 col_index = phenotype_info.phenotype_method_id2index[col_id]212 data_matrix[:, col_index] = -3213 sys.stderr.write("Done.\n")214 return data_matrix215 216 def run(self): 217 if self.debug:218 import pdb219 pdb.set_trace()220 db = Stock_250kDB.Stock_250kDB(drivername=self.drivername, username=self.db_user,221 password=self.db_passwd, hostname=self.hostname, database=self.dbname, schema=self.schema)222 db.setup()223 session = db.session224 225 if self.test_result_type==1:226 test_result_class_table = CandidateGeneRankSumTestResult.table.name227 test_result_class_table = 'candidate_gene_rank_sum_test_result_2008_09_15'228 elif self.test_result_type==2:229 test_result_class_table = CandidateGeneTopSNPTest.table.name230 elif self.test_result_type==3:231 test_result_class_table = Stock_250kDB.CandidateGeneRankSumTestResultMethod.table.name232 else:233 sys.stderr.write(" test_result_type %s not supported.\n"%(self.test_result_type))234 sys.exit(2)235 #the condition for min_MAF is tricky because of the floating precision.236 if self.test_result_type==1:237 where_condition = "%s r, %s c, %s g where g.id=c.list_type_id and r.analysis_method_id is not null \238 and c.results_id=r.id and c.get_closest=%s and c.min_distance=%s and abs(c.min_MAF-%s)<0.00001"\239 %(ResultsMethod.table.name, test_result_class_table, GeneListType.table.name, self.get_closest, self.min_distance, self.min_MAF)240 elif self.test_result_type==2:241 where_condition = "%s r, %s rg, %s c, %s g where g.id=c.list_type_id and r.analysis_method_id is not null and r.id=rg.results_method_id \242 and c.results_id=rg.id and c.get_closest=%s and c.min_distance=%s and abs(c.min_MAF-%s)<0.00001"\243 %(ResultsMethod.table.name, ResultsByGene.table.name, test_result_class_table, GeneListType.table.name, self.get_closest, self.min_distance, self.min_MAF)244 elif self.test_result_type==3:245 where_condition = "%s r, %s c, %s g where g.id=c.list_type_id and r.analysis_method_id is not null \246 and c.results_id=r.id and c.get_closest=%s and c.min_distance=%s and abs(c.min_MAF-%s)<0.00001"\247 %(ResultsMethod.table.name, test_result_class_table, GeneListType.table.name, self.get_closest, self.min_distance, self.min_MAF)248 if self.call_method_id_ls:249 where_condition += " and r.call_method_id in (%s)"%self.call_method_id_ls250 251 if self.analysis_method_id_ls:252 where_condition += " and r.analysis_method_id in (%s)"%self.analysis_method_id_ls253 if self.super_type_id:254 where_condition += " and g.super_type_id=%s"%self.super_type_id255 256 if self.test_type:257 where_condition += " and c.test_type=%s"%self.test_type258 259 if self.test_result_type==1:260 pass261 where_condition += " and c.max_pvalue_per_gene=%s"%(self.max_pvalue_per_gene)262 elif self.test_result_type==2:263 where_condition += " and c.no_of_top_snps=%s"%(self.no_of_top_snps) 264 265 list_type_id_ls = self.getListTypeInfo(db, where_condition)266 analysis_method_id_ls = self.getAnalysisMethodInfo(db, where_condition)267 list_type_analysis_method_info = self.orderListTypeAnalysisMethodID(list_type_id_ls, analysis_method_id_ls)268 phenotype_info = self.getPhenotypeInfo(db, where_condition)269 rdata = self.get_data_matrix(db, phenotype_info, list_type_analysis_method_info, where_condition)270 271 rdata.data_matrix = self.markDataMatrixBoundary(rdata.data_matrix, phenotype_info, list_type_analysis_method_info)272 273 header = ['list_type_analysis_method', ''] + phenotype_info.phenotype_method_label_ls274 strain_acc_list = list_type_analysis_method_info.list_type_analysis_method_label_ls275 category_list = list_type_analysis_method_info.list_type_id_analysis_method_id_ls276 if SNPData.isDataMatrixEmpty(rdata.data_matrix):277 sys.stderr.write("Nothing fetched from database.\n")278 sys.exit(3)279 if self.output_fname:280 write_data_matrix(rdata.data_matrix, self.output_fname, header, strain_acc_list, category_list)281 282 if self.fig_fname:283 font = get_font(self.font_path, font_size=self.font_size) #2008-08-01284 value2color_func = lambda x: Value2Color.value2HSLcolor(x, rdata.min_value, rdata.max_value)285 im_legend = drawContinousLegend(rdata.min_value, rdata.max_value, self.no_of_ticks, value2color_func, font)286 #im.save('%s_legend.png'%self.fig_fname_prefix)287 im = drawMatrix(rdata.data_matrix, value2color_func, list_type_analysis_method_info.list_type_analysis_method_label_ls,\288 phenotype_info.phenotype_method_label_ls, with_grid=1, font=font)289 im = combineTwoImages(im, im_legend, font=font)290 im.save(self.fig_fname)291 292if __name__ == '__main__':293 from pymodule import ProcessOptions294 main_class = OutputTestResultInMatrix295 po = ProcessOptions(sys.argv, main_class.option_default_dict, error_doc=main_class.__doc__)296 297 instance = main_class(**po.long_option2value)...
test.py
Source:test.py
1import argparse2import collections3import itertools4import json5import os6import re7import subprocess as sp8import sys9DEFAULT_TIME_LIMIT = 210TEST_RESULT_TYPE = {11 'pass': 'OK ',12 'file_not_found': " ? ",13 'compile_error': 'CE ',14 'wrong_answer': 'WA ',15 'runtime_error': 'RE ',16 'time_limit_exceed': 'TLE',17 'memory_limit_exceed': 'MLE',18}19MEMORY_LIMIT = 51220COMPILE_OUT = 'a.out'21RUN_OUT = 'b.out'22ERROR_OUT = 'e.out'23CPP_COMPILE_CMD = ['g++', '-O2', '-static', '-o', COMPILE_OUT, '']24RUN_CMD = {25 'cpp': ['/usr/bin/time', '--verbose', './' + COMPILE_OUT],26 'py': ['/usr/bin/time', '--verbose', 'python3', '', '<', '']27}28COMPARE_CMD = ['diff', '-Z', RUN_OUT, '']29TEST_HARNESS_CMD = ['python3', '', '', RUN_OUT, '']30FAIL_SCRIPT = 'Test Failed.\n Error Code: {}\n'31class _TestEntry:32 __slots__ = {'setid', 'caseid', 'fin', 'fout', 'rtype', 'time', 'memory'}33 def __init__(self, setid, caseid):34 self.setid = setid35 self.caseid = caseid36 self.fin = self.fout = self.rtype = self.time = self.memory = None37 38 def __lt__(self, other):39 if self.setid != other.setid:40 return self.setid < other.setid41 42 return self.caseid < other.caseid43 44 def __repr__(self):45 rep = {46 'setid': self.setid,47 'caseid': self.caseid,48 'fin': self.fin,49 'fout': self.fout,50 'rtype': self.rtype,51 'time': self.time, 52 'memory': self.memory53 }54 return json.dumps(rep, separators=(',', ':'), indent=2)55def find_test_cases(folder):56 files = os.listdir(folder)57 entry_dict = {}58 for f in files:59 name = f60 if f.endswith('.in'):61 out = False62 f = f[:-3]63 elif f.endswith('.out'):64 out = True65 f = f[:-4]66 else:67 continue68 69 if f in entry_dict:70 if not entry_dict[f].fout:71 entry_dict[f].fout = os.path.join(folder, name)72 else:73 entry_dict[f].fin = os.path.join(folder, name)74 else:75 i = f.find('.')76 f = f[i+1:]77 if '.' in f:78 i = f.find('.')79 setid, caseid = f[:i], f[i+1:]80 elif '-' in f:81 i = f.find('-')82 setid, caseid = f[:i], f[i+1:]83 elif f.startswith('sample'):84 setid, caseid = 'sample', f[6:]85 if not caseid:86 caseid = 187 elif f.startswith('samp'):88 setid, caseid = 'sample', f[4:]89 if not caseid:90 caseid = 191 elif re.match(r'(\d+)([a-z])', f):92 setid, caseid = f[:-1], ord(f[-1]) - ord('a') + 193 else:94 setid, caseid = f, 195 96 setid = int(setid) if setid != 'sample' else 097 caseid = int(caseid) if caseid != 'sample' else 198 entry = _TestEntry(setid, caseid)99 if out:100 entry.fout = os.path.join(folder, name)101 else:102 entry.fin = os.path.join(folder, name)103 entry_dict[name[:name.rfind('.')]] = entry104 105 entries = sorted(list(entry_dict.values()))106 group, setid = [], ''107 for entry in entries:108 if entry.setid != setid:109 group.append({110 'setid': entry.setid,111 'cases': []112 })113 setid = entry.setid114 115 group[-1]['cases'].append(entry)116 117 return group118def compile(src):119 if src.endswith('.py'):120 RUN_CMD['py'][-3] = src121 return True, []122 123 CPP_COMPILE_CMD[-1] = src124 proc = sp.Popen(CPP_COMPILE_CMD, stderr=sp.PIPE)125 err = [line.decode('utf-8').strip() for line in proc.stderr]126 success = os.path.isfile(COMPILE_OUT)127 return success, err128def run(case_group, lang, limit):129 total = passed = 0130 cmd = RUN_CMD[lang]131 for group in case_group:132 setid = 'sample' if group['setid'] == 0 else group['setid']133 print(f'Test Set {setid}')134 success = True135 for case in group['cases']:136 if args.early_stop and not success:137 print(f'Case #{case.caseid:02d}: -- | Runtime: --, Memory: --')138 continue139 stats = {}140 proc = None141 with open(case.fin, 'r') as fin, open(RUN_OUT, 'w') as fout, open(ERROR_OUT, 'w') as ferr:142 try:143 if lang == 'py':144 cmd[-1] = case.fin145 proc = sp.run(cmd, stdin=fin, stdout=fout, stderr=ferr, timeout=limit+1)146 except sp.TimeoutExpired:147 stats['time'] = limit+1148 stats['memory'] = -1149 150 151 with open(ERROR_OUT, 'r') as f:152 for line in f:153 line = line.strip()154 if line.startswith('User time'):155 stats['time'] = float(line[line.rfind(':')+1:])156 elif line.startswith('Maximum resident set size'):157 stats['memory'] = float(line[line.rfind(':')+1:]) / 1000158 159 case.time = stats['time']160 case.memory = stats['memory']161 if proc is not None and proc.returncode != 0:162 if proc.stderr:163 print(proc.stderr.decode('utf-8'))164 if proc.stdout:165 print(proc.stderr.decode('utf-8'))166 case.rtype = TEST_RESULT_TYPE['runtime_error']167 elif stats['time'] > limit:168 case.rtype = TEST_RESULT_TYPE['time_limit_exceed']169 elif stats['memory'] > MEMORY_LIMIT:170 case.rtype = TEST_RESULT_TYPE['memory_limit_exceed']171 else:172 ok = test(case)173 case.rtype = TEST_RESULT_TYPE['pass'] if ok else TEST_RESULT_TYPE['wrong_answer']174 175 ok = case.rtype == TEST_RESULT_TYPE['pass']176 success &= ok177 178 if case.rtype == TEST_RESULT_TYPE['time_limit_exceed']:179 time = f'{limit:.2f}+'180 memory = ' -- '181 else:182 time = f'{case.time:.2f}s'183 memory = f'{case.memory:.3f}'184 print(f'Case #{case.caseid:02d}: {case.rtype} | Runtime: {time}, Memory: {memory}MB')185 print()186 187 group['status'] = 'Pass' if success else 'Fail'188 if setid != 'sample':189 total += 1190 if success:191 passed += 1192 print(f'Total {passed}/{total} test sets passed.')193 194def test(case):195 fout = case.fout196 if args.tol != -1:197 return _test_num_close(RUN_OUT, fout)198 elif args.harness:199 return _test_harness(args.harness, case.fin, RUN_OUT, fout)200 else:201 return _test_diff(fout)202def _test_diff(sol):203 COMPARE_CMD[-1] = sol204 proc = sp.run(COMPARE_CMD, stdout=sp.PIPE, stderr=sp.PIPE)205 if proc.stdout:206 print(proc.stdout.decode('utf-8'))207 if proc.stderr:208 print(proc.stderr.decode('utf-8'))209 210 return not proc.stderr and not proc.stdout211def _test_num_close(ans, sol):212 with open(ans, 'r') as fans, open(sol, 'r') as fsol:213 for la, ls in itertools.zip_longest(iter(fans), iter(fsol), fillvalue=''):214 215 if not la or not ls:216 return False217 try:218 la, ls = float(la), float(ls)219 if abs(la - ls) > args.tol:220 return False221 except Exception:222 return False223 return True224def _test_harness(test_file, input_file, ans, sol):225 TEST_HARNESS_CMD[1] = test_file226 TEST_HARNESS_CMD[2] = input_file227 TEST_HARNESS_CMD[-1] = sol228 proc = sp.run(TEST_HARNESS_CMD, stdout=sp.PIPE, stderr=sp.PIPE)229 if proc.stdout:230 print(proc.stdout.decode('utf-8'))231 if proc.stderr:232 print(proc.stderr.decode('utf-8'))233 return not proc.stderr and not proc.stdout234def print_result(case_group):235 # not used236 total = ok = 0237 for group in case_group:238 setid = 'Sample' if group['setid'] == 0 else group['setid']239 print(f'Test Set {setid}: {group["status"]}')240 if setid != 'Sample':241 total += 1242 if (group['status'] == 'Pass'):243 ok += 1244 for case in group['cases']:245 print(f'Case #{case.caseid}: {case.rtype} | Runtime: {case.time:.2f}s, Memory: {case.memory:.3f}MB')246 print()247 248 print(f'Total {ok}/{total} test sets passed.')249def cleanup(compile_file=True):250 if compile_file and os.path.exists(COMPILE_OUT):251 os.remove(COMPILE_OUT)252 if os.path.exists(RUN_OUT):253 os.remove(RUN_OUT)254 if os.path.exists(ERROR_OUT):255 os.remove(ERROR_OUT)256def check(src, test_folder, lang, limit):257 # cleanup old files258 cleanup()259 # file src file260 if not os.path.isfile(src):261 print(f"File {os.path.abspath(src)} does not exist.")262 263 # list test cases264 if not os.path.isdir(test_folder):265 print(f"Path {os.path.abspath(test_folder)} does not exist or is not a folder.")266 case_group = find_test_cases(test_folder)267 # compile src if needed268 success, error = compile(src)269 if error:270 for line in error:271 print(line)272 if not success:273 print(FAIL_SCRIPT.format(TEST_RESULT_TYPE['compile_error']))274 sys.exit(1)275 # run src on test cases276 run(case_group, lang, limit)277 # cleanup, keep compile files in case manual debugging is needed278 cleanup(args.cleanup)279if __name__ == "__main__":280 parser = argparse.ArgumentParser()281 parser.add_argument('--src', '-s', type=str, help='source program file', required=True)282 parser.add_argument('--data', '-d', type=str, help='test folder', required=True)283 parser.add_argument('--lang', '-l', type=str, help='language used. Now support cpp and py', required=True)284 parser.add_argument('--tol', '-t', type=float, help='Threshold to numeric answers.', required=False, default=-1)285 parser.add_argument('--harness', '-H', type=str, help='program to test correctness', required=False)286 parser.add_argument('--timeout', '-T', type=float, help='timeout limit on each test case.', required=False, default=2)287 parser.add_argument('--cleanup', '-c', type=str, help='delete compiled file after finishing the testing', required=False, default='')288 parser.add_argument('--early-stop', '-e', type=str, help='stop current test set when one case if failed', default='')289 args = parser.parse_args()290 if args.lang not in {'cpp', 'py'}:291 print('Language only support cpp or py now.')292 sys.exit(1)293 if args.tol != -1 and args.harness:294 print('tol and harness cannot be set together.')295 sys.exit(1)...
test__mysql.py
Source:test__mysql.py
1import pytest2from MySQLdb import _mysql3def test_result_type():4 with pytest.raises(TypeError):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!