Best Python code snippet using lemoncheesecake
test_ncbi.py
Source:test_ncbi.py
1#!/usr/bin/python2__author__ = "tomkinsc@broadinstitute.org"3# built-ins4import unittest5import os6import argparse7import pickle8import shutil9import tempfile10from collections import OrderedDict11import logging12# module-specific13from test import TestCaseWithTmp14import ncbi15import util.file16log = logging.getLogger(__name__)17skip_test = True18@unittest.skipIf(skip_test, "test is marked to be skipped")19class TestNcbiFetch(TestCaseWithTmp):20 def setUp(self):21 super(TestNcbiFetch, self).setUp()22 # these are Orungo accessions23 self.accessions = ["JQ610675.1", "JQ610676.1", "JQ610677.1", "JQ610678.1", "JQ610679.1", "JQ610680.1",24 "JQ610681.1", "JQ610682.1", "JQ610683.1", "JQ610684.1"]25 self.myInputDir = util.file.get_test_input_path(self)26 def perform_download_and_check(self, parser_func, additional_args, expected_files, null_files):27 temp_dir = tempfile.gettempdir()28 args = ["viral-ngs-test@example.com", temp_dir]29 args.extend(self.accessions)30 args.extend(additional_args)31 args = parser_func(argparse.ArgumentParser()).parse_args(args)32 args.func_main(args)33 # check that each file that each expected file was downloaded34 # and that the contents match what they should be35 for fileName in expected_files:36 createdFilePath = os.path.join(temp_dir, fileName)37 log.info("createdFilePath: {}".format(createdFilePath))38 assert os.path.exists(39 createdFilePath), "File that should have been created does not exist: %s" % createdFilePath40 self.assertEqualContents(createdFilePath, os.path.join(self.myInputDir, fileName))41 for fileName in null_files:42 shouldNotExistFilePath = os.path.join(temp_dir, fileName)43 assert not os.path.exists(44 shouldNotExistFilePath), "File exists but it should not: %s" % shouldNotExistFilePath45class TestFastaFetch(TestNcbiFetch):46 def setUp(self):47 super(TestFastaFetch, self).setUp()48 @unittest.skipIf(skip_test, "test is marked to be skipped")49 def test_download(self):50 args = []51 expectedFiles = [a + ".fasta" for a in 
self.accessions]52 null_files = []53 self.perform_download_and_check(ncbi.parser_fetch_fastas,54 additional_args=args,55 expected_files=expectedFiles,56 null_files=null_files)57 @unittest.skipIf(skip_test, "test is marked to be skipped")58 def test_concat(self):59 args = ["--combinedFilePrefix", "orungo"]60 expectedFiles = ["orungo.fasta"]61 null_files = []62 self.perform_download_and_check(ncbi.parser_fetch_fastas,63 additional_args=args,64 expected_files=expectedFiles,65 null_files=null_files)66 @unittest.skipIf(skip_test, "test is marked to be skipped")67 def test_removal_of_intermediates(self):68 args = ["--combinedFilePrefix", "orungo", "--removeSeparateFiles"]69 expectedFiles = ["orungo.fasta"]70 null_files = [a + ".fasta" for a in self.accessions]71 self.perform_download_and_check(ncbi.parser_fetch_fastas,72 additional_args=args,73 expected_files=expectedFiles,74 null_files=null_files)75 @unittest.skipIf(skip_test, "test is marked to be skipped")76 def test_individual_preexistance(self):77 # since the arguments are positional, including an accession here makes a duplicate that should78 # raise an Error79 args = [self.accessions[0]]80 args.extend(["--combinedFilePrefix", "orungo"])81 expectedFiles = ["orungo.fasta"]82 null_files = []83 with self.assertRaises(AssertionError):84 self.perform_download_and_check(ncbi.parser_fetch_fastas,85 additional_args=args,86 expected_files=expectedFiles,87 null_files=null_files)88 @unittest.skipIf(skip_test, "test is marked to be skipped")89 def test_combined_preexistance(self):90 args = ["--combinedFilePrefix", "orungo"]91 expectedFiles = ["orungo.fasta"]92 null_files = []93 # call once to create the combined file94 self.perform_download_and_check(ncbi.parser_fetch_fastas,95 additional_args=args,96 expected_files=expectedFiles,97 null_files=null_files)98 # an error should be raised the second time the call is made99 with self.assertRaises(AssertionError):100 self.perform_download_and_check(ncbi.parser_fetch_fastas,101 
additional_args=args,102 expected_files=expectedFiles,103 null_files=null_files)104 @unittest.skipIf(skip_test, "test is marked to be skipped")105 def test_overwrite(self):106 args = ["--combinedFilePrefix", "orungo", "--forceOverwrite"]107 expectedFiles = ["orungo.fasta"]108 null_files = []109 # call once to create the combined file110 self.perform_download_and_check(ncbi.parser_fetch_fastas,111 additional_args=args,112 expected_files=expectedFiles,113 null_files=null_files)114 # no error should be raised the second time the call is made115 self.perform_download_and_check(ncbi.parser_fetch_fastas,116 additional_args=args,117 expected_files=expectedFiles,118 null_files=null_files)119 @unittest.skipIf(skip_test, "test is marked to be skipped")120 def test_different_file_extension(self):121 args = ["--fileExt", "fa", "--combinedFilePrefix", "orungo"]122 expectedFiles = [a + ".fa" for a in self.accessions]123 expectedFiles.append("orungo.fa")124 null_files = []125 self.perform_download_and_check(ncbi.parser_fetch_fastas,126 additional_args=args,127 expected_files=expectedFiles,128 null_files=null_files)129class TestFeatureTableFetch(TestNcbiFetch):130 def setUp(self):131 super(TestFeatureTableFetch, self).setUp()132 @unittest.skipIf(skip_test, "test is marked to be skipped")133 def test_download(self):134 args = []135 expectedFiles = [a + ".tbl" for a in self.accessions]136 null_files = []137 self.perform_download_and_check(ncbi.parser_fetch_feature_tables,138 additional_args=args,139 expected_files=expectedFiles,140 null_files=null_files)141 @unittest.skipIf(skip_test, "test is marked to be skipped")142 def test_concat(self):143 args = ["--combinedFilePrefix", "orungo"]144 expectedFiles = ["orungo.tbl"]145 null_files = []146 self.perform_download_and_check(ncbi.parser_fetch_feature_tables,147 additional_args=args,148 expected_files=expectedFiles,149 null_files=null_files)150 @unittest.skipIf(skip_test, "test is marked to be skipped")151 def 
test_removal_of_intermediates(self):152 args = ["--combinedFilePrefix", "orungo", "--removeSeparateFiles"]153 expectedFiles = ["orungo.tbl"]154 null_files = [a + ".tbl" for a in self.accessions]155 self.perform_download_and_check(ncbi.parser_fetch_feature_tables,156 additional_args=args,157 expected_files=expectedFiles,158 null_files=null_files)159 @unittest.skipIf(skip_test, "test is marked to be skipped")160 def test_individual_preexistance(self):161 # since the arguments are positional, including an accession here makes a duplicate that should162 # raise an Error163 args = [self.accessions[0]]164 args.extend(["--combinedFilePrefix", "orungo"])165 expectedFiles = ["orungo.tbl"]166 null_files = []167 with self.assertRaises(AssertionError):168 self.perform_download_and_check(ncbi.parser_fetch_feature_tables,169 additional_args=args,170 expected_files=expectedFiles,171 null_files=null_files)172 @unittest.skipIf(skip_test, "test is marked to be skipped")173 def test_combined_preexistance(self):174 args = ["--combinedFilePrefix", "orungo"]175 expectedFiles = ["orungo.tbl"]176 null_files = []177 # call once to create the combined file178 self.perform_download_and_check(ncbi.parser_fetch_feature_tables,179 additional_args=args,180 expected_files=expectedFiles,181 null_files=null_files)182 # an error should be raised the second time the call is made183 with self.assertRaises(AssertionError):184 self.perform_download_and_check(ncbi.parser_fetch_feature_tables,185 additional_args=args,186 expected_files=expectedFiles,187 null_files=null_files)188 @unittest.skipIf(skip_test, "test is marked to be skipped")189 def test_overwrite(self):190 args = ["--combinedFilePrefix", "orungo", "--forceOverwrite"]191 expectedFiles = ["orungo.tbl"]192 null_files = []193 # call once to create the combined file194 self.perform_download_and_check(ncbi.parser_fetch_feature_tables,195 additional_args=args,196 expected_files=expectedFiles,197 null_files=null_files)198 # no error should be raised 
the second time the call is made199 self.perform_download_and_check(ncbi.parser_fetch_feature_tables,200 additional_args=args,201 expected_files=expectedFiles,202 null_files=null_files)203 @unittest.skipIf(skip_test, "test is marked to be skipped")204 def test_different_file_extension(self):205 args = ["--fileExt", "table", "--combinedFilePrefix", "orungo"]206 expectedFiles = [a + ".table" for a in self.accessions]207 expectedFiles.append("orungo.table")208 null_files = []209 self.perform_download_and_check(ncbi.parser_fetch_feature_tables,210 additional_args=args,211 expected_files=expectedFiles,212 null_files=null_files)213class TestGenbankRecordFetch(TestNcbiFetch):214 def setUp(self):215 super(TestGenbankRecordFetch, self).setUp()216 @unittest.skipIf(skip_test, "test is marked to be skipped")217 def test_download(self):218 args = []219 expectedFiles = [a + ".gbk" for a in self.accessions]220 null_files = []221 self.perform_download_and_check(ncbi.parser_fetch_genbank_records,222 additional_args=args,223 expected_files=expectedFiles,224 null_files=null_files)225 @unittest.skipIf(skip_test, "test is marked to be skipped")226 def test_concat(self):227 args = ["--combinedFilePrefix", "orungo"]228 expectedFiles = ["orungo.gbk"]229 null_files = []230 self.perform_download_and_check(ncbi.parser_fetch_genbank_records,231 additional_args=args,232 expected_files=expectedFiles,233 null_files=null_files)234 @unittest.skipIf(skip_test, "test is marked to be skipped")235 def test_removal_of_intermediates(self):236 args = ["--combinedFilePrefix", "orungo", "--removeSeparateFiles"]237 expectedFiles = ["orungo.gbk"]238 null_files = [a + ".gbk" for a in self.accessions]239 self.perform_download_and_check(ncbi.parser_fetch_genbank_records,240 additional_args=args,241 expected_files=expectedFiles,242 null_files=null_files)243 @unittest.skipIf(skip_test, "test is marked to be skipped")244 def test_individual_preexistance(self):245 # since the arguments are positional, including an 
accession here makes a duplicate that should246 # raise an Error247 args = [self.accessions[0]]248 args.extend(["--combinedFilePrefix", "orungo"])249 expectedFiles = ["orungo.gbk"]250 null_files = []251 with self.assertRaises(AssertionError):252 self.perform_download_and_check(ncbi.parser_fetch_genbank_records,253 additional_args=args,254 expected_files=expectedFiles,255 null_files=null_files)256 @unittest.skipIf(skip_test, "test is marked to be skipped")257 def test_combined_preexistance(self):258 args = ["--combinedFilePrefix", "orungo"]259 expectedFiles = ["orungo.gbk"]260 null_files = []261 # call once to create the combined file262 self.perform_download_and_check(ncbi.parser_fetch_genbank_records,263 additional_args=args,264 expected_files=expectedFiles,265 null_files=null_files)266 # an error should be raised the second time the call is made267 with self.assertRaises(AssertionError):268 self.perform_download_and_check(ncbi.parser_fetch_genbank_records,269 additional_args=args,270 expected_files=expectedFiles,271 null_files=null_files)272 @unittest.skipIf(skip_test, "test is marked to be skipped")273 def test_overwrite(self):274 args = ["--combinedFilePrefix", "orungo", "--forceOverwrite"]275 expectedFiles = ["orungo.gbk"]276 null_files = []277 # call once to create the combined file278 self.perform_download_and_check(ncbi.parser_fetch_genbank_records,279 additional_args=args,280 expected_files=expectedFiles,281 null_files=null_files)282 # no error should be raised the second time the call is made283 self.perform_download_and_check(ncbi.parser_fetch_genbank_records,284 additional_args=args,285 expected_files=expectedFiles,286 null_files=null_files)287 @unittest.skipIf(skip_test, "test is marked to be skipped")288 def test_different_file_extension(self):289 args = ["--fileExt", "gb", "--combinedFilePrefix", "orungo"]290 expectedFiles = [a + ".gb" for a in self.accessions]291 expectedFiles.append("orungo.gb")292 null_files = []293 
self.perform_download_and_check(ncbi.parser_fetch_genbank_records,294 additional_args=args,295 expected_files=expectedFiles,...
validate_api.py
Source:validate_api.py
#!/usr/bin/env python3
"""
   @author TELEMAC-MASCARET Consortium
   @brief Function for validation of the Python API
"""
from os import path, chdir, remove, listdir, sep
from filecmp import cmp
import shutil
from argparse import ArgumentParser
from vvytel import copy_file_to_tmp
from vvytel import get_result_file_name
from vvytel import run_telemac_api
from vvytel import run_telemac_normal
from config import add_config_argument, update_config, CFGS

MODULE_HANDLED = ['telemac2d', 'telemac3d', 'artemis', 'tomawac']

# Steering files that are always run with ncsize = 1 (sequential only).
_SEQ_ONLY = {
    'telemac2d': (
        't2d_hydraulic_jump_v1p0.cas',
        't2d_hydraulic_jump_v2p0.cas',
        't2d_wesel.cas',
        't2d_wesel_pos.cas',
        't2d_delwaq.cas',
        't2d_ruptmoui.cas',
        't2d_triangular_shelf.cas',
        't2d_island.cas',
        't2d_tide-jmj_real_gen.cas',
        't2d_tide-jmj_type_gen.cas',
        't2d_dambreak_v1p0.cas',
    ),
    'telemac3d': (
        't3d_delwaq.cas',
        't3d_pluie.cas',
        't3d_tide-jmj_real_gen.cas',
    ),
    'artemis': (
        'none',
    ),
    'tomawac': (
        'tom_turning_wind.cas',
        'tom_manche.cas',
        'tom_manchelim.cas',
    ),
}

# Test-case folders and steering files that are not validated through the API.
_SKIP_TEST = {
    'telemac2d': (
        # Using homere_adj, not handled by the api
        'estimation',
        # Rerunning telemac from homere is not handled by the api
        'convergence',
        # Cases that are not run by validation
        't2d_tide-jmj_type_med.cas',
        't2d_tide-ES_real.cas',
    ),
    'telemac3d': (
        # Non telemac3d cases in the folder
        't2d_canal.cas',
        'p3d_amr.cas',
        'p3d_bump.cas',
        'p3d_canal.cas',
        'p3d_cooper.cas',
        'p3d_depot.cas',
        'p3d_flume_slope.cas',
        'p3d_gouttedo.cas',
        'p3d_lock-hydro.cas',
        'p3d_lock-nonhydro.cas',
        'p3d_nonlinearwave.cas',
        'p3d_piledepon.cas',
        'p3d_piledepon-nonhydro.cas',
        'p3d_pluie.cas',
        'p3d_rouse.cas',
        'p3d_stratification.cas',
        'p3d_tetra.cas',
        'p3d_vent.cas',
        'p3d_V.cas',
        # Coupling test cases
        'depot',
        'heat_exchange',
    ),
    'artemis': (
        # Artemis animated test cases
        'art_bj78_animated.cas',
        'art_creocean_animated.cas',
        'art_creocean_2.cas',
        'art_creocean.cas',
    ),
    'tomawac': (
        # Tomawac coupled test cases
        '3Dcoupling',
    ),
}


def _validate_case(fichier, module, cas, fortran, ncsize):
    """
    Run one steering file through the api and the classical way, then
    compare both result files and write the verdict to the log.

    @param fichier Open log file the verdict is written to
    @param module Name of the telemac module
    @param cas Name of the steering file
    @param fortran User fortran entry returned by copy_file_to_tmp
    @param ncsize Number of processors handed to both runs
    """
    res_file = get_result_file_name.get_result_file_name(module, cas)
    api_res_file = res_file + '_api'

    # Api run first; its result is moved aside so the classical run can
    # write to the same result file name.
    passed_api = run_telemac_api.run_telemac_api(module, cas, ncsize, fortran)
    if passed_api:
        shutil.move(res_file, api_res_file)
    passed_normal = run_telemac_normal.run_telemac_normal(module, cas, ncsize)

    if not passed_normal:
        fichier.write(' Normal run crashed\n')
    if not passed_api:
        fichier.write(' Api run crashed\n')

    passed = bool(passed_api and passed_normal)
    if passed:
        # Report the first missing result file, if any
        for candidate in (res_file, api_res_file):
            if not path.exists(candidate):
                fichier.write(' Missing ' + candidate + "\n")
                passed = False
                break
    if passed:
        passed = cmp(res_file, api_res_file)

    fichier.write("{} {}\n".format(cas, 'PASSED' if passed else 'FAILED'))


def main(modules, example, nncsize, clean):
    """
    Main function

    Runs every selected test case both through the api and the classical
    way and writes the comparison to ValidationTelApy.log in the current
    directory.

    @param modules List of module names to validate (keys of MODULE_HANDLED)
    @param example Name of a single test case to run, '' to run them all
    @param nncsize Number of processors used for cases that are not
                   sequential only
    @param clean If True remove the tmp folder of each test case afterwards
    """
    root_dir = CFGS.get_root()

    # Fresh per-call copies, as the lists are handed to external helpers
    seq_only = {module: list(_SEQ_ONLY[module]) for module in MODULE_HANDLED}
    skip_test = {module: list(_SKIP_TEST[module]) for module in MODULE_HANDLED}

    if path.exists('ValidationTelApy.log'):
        remove('ValidationTelApy.log')
    # The handle is opened before any chdir and closed on exit, even if a
    # run raises.
    with open('ValidationTelApy.log', 'a') as fichier:
        fichier.write("-----Listing Validation telapy-------\n")
        for module in modules:
            fichier.write("-- For module " + module + "\n")
            module_dir = path.join(root_dir, 'examples', module)
            if example != '':
                list_test_case = [example]
            else:
                list_test_case = sorted(listdir(module_dir))

            for i, test_case in enumerate(list_test_case):
                if test_case in skip_test[module]:
                    continue
                case_dir = path.join(module_dir, test_case)
                tmp_dir = path.join(case_dir, 'tmp')
                print("<{}/{}> {}".format(i + 1, len(list_test_case),
                                          test_case))
                fichier.write('Running test case ' + test_case + '\n')
                list_file = copy_file_to_tmp.copy_file_to_tmp(
                    case_dir, tmp_dir,
                    module, root_dir, skip_test[module])
                chdir(tmp_dir)
                for cas, fortran in list_file:
                    if cas in skip_test[module]:
                        continue
                    # Running in sequential mode
                    # if the case does not run in parallel
                    ncsize = 1 if cas in seq_only[module] else nncsize
                    _validate_case(fichier, module, cas, fortran, ncsize)
                if clean:
                    # Leave tmp before deleting it
                    chdir(case_dir)
                    shutil.rmtree(tmp_dir)
        fichier.write('my work is done ' + '\n')


if __name__ == "__main__":
    # <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
    # ~~ Reads config file ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    print('\n\nLoading Options and Configurations\n'+72*'~'+'\n')
    PARSER = ArgumentParser(
        description='Make the validation of Telemac-Mascaret API '
                    'and/or executable using the API')
    PARSER = add_config_argument(PARSER)
    PARSER.add_argument(
        "-m", "--module",
        dest='modules',
        default="telemac2d",
        help="specify the list of folder to validate seprated by ,")
    PARSER.add_argument(
        "--clean",
        action="store_true",
        dest="clean",
        default=False,
        help="Remove tmp folders")
    # type=int so a value given on the command line is not passed on as str
    PARSER.add_argument(
        "-n", "--cnsize",
        dest='ncsize',
        type=int,
        default=4,
        help="specify the number of processor the test case will be run with")
    PARSER.add_argument(
        "-e", "--example",
        dest='example',
        default="",
        help="specify the name of the test case to compute")
    ARGS = PARSER.parse_args()
    update_config(ARGS)
    # NOTE(review): the call to main() is cut off in the available source.
    # It is kept verbatim below instead of guessing the missing arguments.
    # main(ARGS.modules.split(','), \...
ensembler.py
Source:ensembler.py
import sys, gzip, json, os, math, pickle, re, copy
import numpy as np
import multiprocessing as mp
pool_size = int(mp.cpu_count())
from datetime import datetime
from math import exp, log, sqrt
root_path = '/home/marsan/workspace/tnative'
sys.path.append(root_path)
from lib import dgen as dgen
from lib import top as top


#==========================================
#   machine learning flow wrapper
#==========================================
# [container of model unit]
class ml_unit(object):
    """One model of the ensemble: a `top.ml` instance plus its sample ranges.

    `data` maps filter keywords to values; each pair is applied to the
    model's data table as `tbl.filter(keyword=value)`.
    """

    def __init__(self, alg, D, data, tmin, tmax, en_plot, en_fast_data, skip_test, debug):
        self.ml = top.ml(alg=alg, D=D, en_plot=en_plot, en_fast_data=en_fast_data, debug=debug)
        self.data = data
        self.tmin = tmin
        self.tmax = tmax
        self.vrng = abs(tmax - tmin)
        self.vmin = 1 - self.vrng if self.vrng < 0.5 else self.vrng
        self.vmax = 1
        self.y2p = []
        self.pnorm = -1
        self.skip_test = skip_test
        # update data filter (keyword expansion instead of eval on a built string)
        for fea, val in self.data.items():
            self.ml.dgen.tbl = self.ml.dgen.tbl.filter(**{fea: val})

    # for multiprocess
    def train_unit(self):
        """Train this unit and return the learner so a worker can hand it back."""
        self.ml.train(self.tmin, self.tmax, self.vmin, self.vmax, skip_test=self.skip_test)
        return self.ml.learner


class ensembler(object):
    """Trains a group of `ml_unit` models and merges their predictions.

    `segments` is a list of dicts with keys 'alg', 'data', 'tmin' and 'tmax';
    one `ml_unit` is built per segment. The samples in `[1 - vrate, 1]` are
    used by `test()`.
    """

    def __init__(self, segments, vrate, D=2**24, en_fast_data=None, en_plot=False, en_pnorm=False, skip_test=False, debug=False):
        # samples
        self.D = D
        self.vmin = 1 - vrate
        self.vmax = 1

        # ctrl
        self.en_plot = en_plot
        self.en_pnorm = en_pnorm
        self.skip_test = skip_test
        self.en_fast_data = en_fast_data
        self.debug = debug

        # initialize models
        self.dgen = dgen.data_gen(D=self.D, en_fast_data=self.en_fast_data, debug=self.debug)  # samples for final test
        self.ml_group = [
            ml_unit(alg=s['alg'], D=self.D, data=s['data'], tmin=s['tmin'], tmax=s['tmax'],
                    en_plot=self.en_plot, en_fast_data=self.en_fast_data,
                    skip_test=self.skip_test, debug=self.debug)
            for s in segments
        ]

    #-------------------------
    #   numeric helpers
    #-------------------------
    def finitize(self, n, e=35):
        """Clamp `n` into the closed interval [-e, e]."""
        return max(min(n, e), -e)   # make -e <= n <= e

    def merge_sigmoid(self, nlist, e=35):
        """Average probabilities in logit space and map the mean back through a sigmoid.

        Inputs are floored at 1e-35 before inversion so the logarithm stays defined.
        """
        nlist = [-log(max((1/max(n, 1e-35) - 1), 1e-35)) for n in nlist]
        nmean = np.mean(nlist)
        nsig = 1. / (1. + exp(-self.finitize(nmean, e)))
        return nsig

    #-------------------------
    #   train & test
    #-------------------------
    def train_and_test(self, en_multi_threads=False):
        """Train every unit, evaluate the ensemble, save the models.

        Returns `(roc_auc, yr_ens, yp_ens)` as produced by `test()`.
        """
        self.train_all(en_multi_threads)
        roc_auc, yr_ens, yp_ens = self.test()
        self.save(roc_auc)
        return roc_auc, yr_ens, yp_ens

    def train_all(self, en_multi_threads=True):
        """Train all units, in-process or through a process pool.

        Returns `self.ml_group`.
        """
        if not en_multi_threads:
            # [single process for debug]
            for unit in self.ml_group:
                unit.ml.train(unit.tmin, unit.tmax, unit.vmin, unit.vmax, skip_test=self.skip_test)
        else:
            # The pool is only created when it is used, and always released.
            mp_pool = mp.Pool(pool_size)
            try:
                pending = [(unit, mp_pool.apply_async(unit.train_unit, ())) for unit in self.ml_group]
                for unit, result in pending:
                    unit.ml.learner = result.get()
            finally:
                mp_pool.close()
                mp_pool.join()
        print("[Ensembler] models training done @ %s" % datetime.now())
        return self.ml_group

    def test(self):
        """Score the validation range with every unit and merge the predictions per sample id.

        Returns `(roc_auc, yr_ens, yp_ens)`, where the two dicts map a sample id
        to the list of first / second elements of each unit's `y2p` entry.
        """
        print("\n%s\n# [Ensembler] start grader %.2f - %.2f @ %s\n%s" % ("-"*60, 100*self.vmin, 100*self.vmax, datetime.now(), "-"*60))
        yr_ens = {}
        yp_ens = {}
        for s in self.ml_group:
            # shallow copy: only `tbl` is rebound, the shared generator is untouched
            sdgen = copy.copy(self.dgen)
            for fea, val in s.data.items():
                sdgen.tbl = sdgen.tbl.filter(**{fea: val})
            ids = [str(r.id) for r in sdgen.raw_range(self.vmin, self.vmax).only('id')]
            raw = sdgen.gen_data(self.vmin, self.vmax)
            # get y2p
            s.y2p = s.ml.learner.train(raw, training=False, info={'all_cnt': -1})
            if self.en_pnorm:
                s.pnorm = s.ml.grader.find_pnorm(s.y2p)
                s.y2p = [[y, min(1, p/s.pnorm)] for y, p in s.y2p]
            # map to ensembled y2p
            for i, key in enumerate(ids):
                if key not in yp_ens:
                    yr_ens[key] = []
                    yp_ens[key] = []
                yr_ens[key].append(s.y2p[i][0])
                yp_ens[key].append(s.y2p[i][1])
        y2p_ens = [(np.mean(yrs), self.merge_sigmoid(yp_ens[rid])) for rid, yrs in yr_ens.items()]
        grader = self.ml_group[0].ml.grader
        roc_auc = grader.auc_curve(y2p_ens)
        grader.scan_all_threshold(y2p_ens)  # result unused; call kept as in the original
        print("[Ensembler] ensembled ROC: %.3f%% @ %s" % (roc_auc*100, datetime.now()))
        return roc_auc, yr_ens, yp_ens

    #-------------------------
    #   layer-2
    #-------------------------
    def train_layer2(self):
        """Placeholder: not implemented yet."""
        # collect samples
        Xt = []
        Yt = []
        sdgen = copy.copy(self.dgen)
        pass

    def test_layer2(self):
        """Placeholder: not implemented yet."""
        pass

    #-------------------------
    #   model reuse
    #-------------------------
    def save(self, auc):
        """Pickle the trained learners under `<root_path>/models` and return the file path."""
        filepath = "%s/models/m%i_v%i_auc_%i_%s" % (root_path, len(self.ml_group), (self.vmax-self.vmin)*100, auc*1000, datetime.now().strftime("%Y%m%d_%H%M"))
        trained_models = [mlu.ml.learner for mlu in self.ml_group]
        # context manager so the file is flushed and closed deterministically
        with open(filepath, 'wb') as fh:
            pickle.dump(trained_models, fh)
        print("ensemble model saved in %s @ %s" % (filepath, datetime.now()))
        return filepath


#==========================================
#   experiments
#==========================================
def k_fold_ensemble(alg, k, vrate=0.1, en_plot=False, en_fast_data=None, skip_compare=True):
    """Build `k` units of `alg`, one per slice of `[0, 1 - vrate]`, then train and test them."""
    if not skip_compare:
        print("="*5, '[train by single thread]', '='*40)
        top.ml(alg=alg, en_plot=en_plot).train(0, 1-vrate)
    print("[%i_fold_%s_ensemble] start @ %s" % (k, alg, datetime.now()))
    step = (1.0 - vrate)/k
    # NOTE(review): tmin is the upper edge and tmax the lower edge of each slice,
    # exactly as in the original; confirm against top.ml.train how that is read.
    segments = [{
        'alg': alg,
        'tmin': step*(i+1),
        'tmax': step*i,
        'data': {
            'isad__ne': None,
            # 'label__ne': None,
        }
    } for i in range(k)]
    ens = ensembler(segments, vrate=vrate, en_plot=en_plot, en_fast_data=en_fast_data, skip_test=True)  # k-fold MUST skip_test=True!! since we block ens.vmin
    for item in ens.ml_group:   # prevent models from getting test samples
        item.ml.dgen.tbl = item.ml.dgen.tbl.filter(rand__lt=ens.vmin)
    return ens.train_and_test()


def xgboost_sklr(vrate=0.1, en_plot=False):
    """Train and test a two-model ensemble: one 'sklr' unit and one 'xgboost' unit."""
    segments = [
        {
            'alg': alg,
            'tmin': 0,
            'tmax': 1-vrate,
            'data': {
                'isad__ne': None,
                # 'label__ne': None,
            },
        } for alg in ['sklr', 'xgboost']
    ]
    return ensembler(segments, vrate=vrate, en_plot=en_plot).train_and_test()


#==========================================
#   dnq ensemble (divide-and-conquer)
#==========================================
dnq_segments = {
    'status': [('status', 'bad'), ('status', 'normal')],
    'lang': [('meta_lang__icontains', 'en'), ('meta_lang', '')],
    'domain': [('domain__icontains', '.net'), ('domain__icontains', '.com'), ('domain__icontains', '.org'), ('domain__icontains', '.uk')],
}


def dnq_ensemble(alg, segname, vrate=0.1, en_plot=False, skip_compare=True):
    """Train one unit per data filter of `dnq_segments[segname]` (or of every group for 'all').

    A catch-all unit filtered on `fid__ne=-1` is always added first.
    """
    srate = 1-vrate if vrate > 0.5 else vrate
    if not skip_compare:
        print("="*5, '[train by single thread]', '='*40)
        auc_single = top.ml(alg='sklr', en_plot=en_plot).train(0, srate)
        # NOTE(review): indentation was lost in the source; this banner is assumed
        # to belong to the comparison branch - confirm.
        print("="*5, '[train by ensemble]', '='*40)
    if segname == 'all':
        segs = ([('fid__ne', -1)] + [j for k in dnq_segments.values() for j in k])
    else:
        segs = ([('fid__ne', -1)] + dnq_segments[segname])
    segments = [{
        'alg': alg,
        'tmin': 0,
        'tmax': srate,
        'data': {
            'isad__ne': None,
            s: v,
        },
    } for s, v in segs]
    print("[dnq_ensemble] condition: ", segments)
    ens = ensembler(segments, vrate=vrate, en_plot=en_plot)
    return ens.train_and_test()


#==========================================
#   verify
#==========================================
if __name__ == '__main__':
    cmd = str(sys.argv[1])
    vrate = float(sys.argv[2])
    # Always bound, so a missing third argument gives a clear message below
    # instead of a NameError.
    cmd2 = str(sys.argv[3]) if len(sys.argv) >= 4 else None
    #
    if cmd == '5_fold_xgboost':
        k_fold_ensemble('xgboost', k=5, vrate=vrate, en_plot=False)
    elif cmd == '5_fold_sklr':
        k_fold_ensemble('sklr', k=5, vrate=vrate, en_plot=False, en_fast_data='D_20_tfidf_cnts')
    elif cmd == 'xgboost_sklr':
        xgboost_sklr(vrate=vrate, en_plot=False)
    elif cmd == 'dnq_ensemble':
        if cmd2 is None:
            sys.exit("dnq_ensemble needs a segment name as third argument")
        dnq_ensemble('sklr', cmd2, vrate=vrate, en_plot=False)
    elif cmd == 'dnq_all':
        # remainder of the script is truncated in the available source
        ...
test_carrito_compras.py
Source:test_carrito_compras.py
1import pytest2skip_test = pytest.mark.skipif(False, reason="skip")3#34@skip_test5def test_create_carrito_quantity_is_cero():6 from module.producto import Producto7 from module.detalleproducto import DetalleProducto8 from module.carrito import Carrito9 name = "producto 1"10 stock= 511 description= "description"12 producto_valido = Producto.create_product(13 name=name, 14 stock=stock, 15 description=description)16 17 assert producto_valido["name"] == name18 assert producto_valido["stock"] == stock19 assert producto_valido["description"] == description20 cantidad = 021 with pytest.raises(ValueError, match="cantidad can not be 0"):22 detalle_producto = DetalleProducto.crear_detalle(23 cantidad=cantidad,24 producto= producto_valido25 )26#527@skip_test28def test_create_carrito():29 from module.producto import Producto30 from module.detalleproducto import DetalleProducto31 from module.carrito import Carrito32 name = "producto 1"33 stock=1034 description= "description"35 producto_valido = Producto.create_product(36 name=name, 37 stock=stock, 38 description=description)39 40 assert producto_valido["name"] == name41 assert producto_valido["stock"] == stock42 assert producto_valido["description"] == description43 producto_valido_2 = Producto.create_product(44 name=name, 45 stock=stock, 46 description=description)47 48 assert producto_valido_2["name"] == name49 assert producto_valido_2["stock"] == stock50 assert producto_valido_2["description"] == description51 cantidad = 152 detalle_producto = DetalleProducto.crear_detalle(53 cantidad=cantidad,54 producto= producto_valido55 )56 assert detalle_producto["cantidad"] == cantidad57 detalle_producto_2 = DetalleProducto.crear_detalle(58 cantidad=cantidad,59 producto= producto_valido60 )61 assert detalle_producto_2["cantidad"] == cantidad62 lista_detalles =[detalle_producto, detalle_producto_2]63 carrito = Carrito.agregar_producto(lista_productos=lista_detalles)64 assert len(carrito) > 065#666@skip_test67def 
test_nombre_producto_distinto_null():68 from module.producto import Producto69 with pytest.raises(ValueError, match="none is not an allowed value"):70 crear_producto = Producto.create_product(71 name=None, 72 stock=1, 73 description="descripcion")74#775@skip_test76def test_nombre_producto_numerico():77 from module.producto import Producto78 with pytest.raises(ValueError, match="name can not be an integer"):79 crear_producto = Producto.create_product(80 name=11111, 81 stock=1, 82 description="descripcion")83#984@skip_test85def test_descripcion_producto_distinto_null():86 from module.producto import Producto87 with pytest.raises(ValueError, match="none is not an allowed value"):88 crear_producto = Producto.create_product(89 name="Juan", 90 stock=1, 91 description=None)92#1293@skip_test94def test_stock_producto_no_negativo():95 from module.producto import Producto96 with pytest.raises(ValueError, match="ensure this value is greater than -1"):97 crear_producto = Producto.create_product(98 name="juan", 99 stock=-1, 100 description="descripcion")101#8102@skip_test103def test_nombre_producto_name_too_long():104 from module.producto import Producto105 from random import choice106 from string import ascii_uppercase107 name = ''.join(choice(ascii_uppercase) for i in range(11))108 with pytest.raises(ValueError, match="ensure this value has at most 10 characters"):109 crear_producto = Producto.create_product(110 name=name, 111 stock=1, 112 description="descripcion")113#11114@skip_test115def test_create_product():116 from module.producto import Producto117 name = "juan"118 stock=10119 description= "description"120 crear_producto = Producto.create_product(121 name=name, 122 stock=stock, 123 description=description)124 125 assert crear_producto["name"] == name126 assert crear_producto["stock"] == stock127 assert crear_producto["description"] == description128#13129@skip_test130def test_stock_producto_puede_ser_cero():131 from module.producto import Producto132 crear_producto = 
Producto.create_product(133 name="juan", 134 stock=0, 135 description="descripcion")136 137 assert crear_producto["stock"] == 0138#15139@skip_test140def test_stock_producto_distinto_null():141 from module.producto import Producto142 with pytest.raises(ValueError, match="none is not an allowed value"):143 crear_producto = Producto.create_product(144 name="juan", 145 stock=None, 146 description="descripcion")147#17148@skip_test149def test_stock_valido_name_nulo_descripcion_nula():150 from module.producto import Producto151 with pytest.raises(ValueError, match="none is not an allowed value"):152 crear_producto = Producto.create_product(153 name=None, 154 stock=0, 155 description=None)156#18157@skip_test158def test_descripcion_valido_name_nulo_stock_nulo():159 from module.producto import Producto160 with pytest.raises(ValueError, match="none is not an allowed value"):161 crear_producto = Producto.create_product(162 name=None, 163 stock=None, 164 description="descripcion")165#19166@skip_test167def test_create_detalle_producto():168 from module.producto import Producto169 from module.detalleproducto import DetalleProducto170 name = "producto 1"171 stock=10172 description= "description"173 producto_valido = Producto.create_product(174 name=name, 175 stock=stock, 176 description=description)177 178 assert producto_valido["name"] == name179 assert producto_valido["stock"] == stock180 assert producto_valido["description"] == description181 cantidad = 1182 detalle_producto = DetalleProducto.crear_detalle(183 cantidad=cantidad,184 producto= producto_valido185 )186 assert detalle_producto["cantidad"] == cantidad187 assert detalle_producto["producto"]["stock"] == stock-cantidad188#20189@skip_test190def test_create_detalle_producto_cantidad_negativa():191 from module.producto import Producto192 from module.detalleproducto import DetalleProducto193 name = "producto 1"194 stock=10195 description= "description"196 producto_valido = Producto.create_product(197 name=name, 198 
stock=stock, 199 description=description)200 201 assert producto_valido["name"] == name202 assert producto_valido["stock"] == stock203 assert producto_valido["description"] == description204 cantidad = 0205 with pytest.raises(ValueError, match="cantidad can not be 0"):206 detalle_producto = DetalleProducto.crear_detalle(207 cantidad=cantidad,208 producto=producto_valido...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!