Best Python code snippet using fMBT_python
blast_using_grid.py
Source: blast_using_grid.py
#!/usr/bin/python

__author__ = "Kishori M Konwar"
__copyright__ = "Copyright 2013, MetaPathways"
__credits__ = ["r"]
__version__ = "1.0"
__maintainer__ = "Kishori M Konwar"
__status__ = "Release"

# standard-library imports are kept outside the try block so that the
# except handler below can actually use traceback and sys
import os, sys, re, errno, shutil
import traceback
from optparse import make_option
from os import makedirs, path, listdir, remove
from glob import glob
from datetime import date

try:
    from libs.python_modules.utils.sysutil import getstatusoutput, pathDelim
    from libs.python_modules.parsers.parse import parse_metapaths_parameters
    from libs.python_modules.utils.metapathways_utils import fprintf, printf, WorkflowLogger, generate_log_fp
    from libs.python_modules.utils import *
    from libs.python_modules.grid.BlastService import BlastService
    from libs.python_modules.grid.GridParam import GridParam
    from libs.python_modules.grid.BlastBroker import BlastBroker
    import libs.python_scripts
except:
    traceback.print_exc(10)
    print """ Could not load some user defined module functions"""
    print """ Make sure you typed "source MetaPathwaysrc" """
    sys.exit(3)

PATHDELIM = pathDelim()


def copyFile(src, dst):
    try:
        shutil.copytree(src, dst)
    except OSError as exc:  # python > 2.5
        if exc.errno == errno.ENOTDIR:
            shutil.copy(src, dst)
        else:
            raise


def dry_run_status(commands):
    for command in commands:
        printf("%s", command[0])
        if command[4] == True:
            printf("%s", " Required")
        else:
            printf("%s", " Not Required")
        printf("\n")


def format_db(formatdb_executable, seqType, refdb_sequence_file, algorithm):
    cmd = None
    if algorithm == 'BLAST':
        cmd = '%s -dbtype %s -in %s' % (formatdb_executable, seqType, refdb_sequence_file)
    if algorithm == 'LAST':
        cmd = '%s -p -c %s %s' % (formatdb_executable, refdb_sequence_file, refdb_sequence_file)
    if cmd is None:  # unknown algorithm
        return False

    result = getstatusoutput(cmd)
    return result[0] == 0


# decide if a command should be run: in 'overlay' mode a step is
# skipped when its results are already computed
def shouldRunStep(run_type, expected_output):
    if run_type == 'overlay' and path.exists(expected_output):
        return False
    else:
        return True


def formatted_db_exists(dbname):
    return len(glob(dbname)) > 0


def check_if_db_exists(dbname):
    return path.exists(dbname)


def make_sure_map_file_exists(dbmapfile):
    if not doFilesExist([dbmapfile]):
        print 'WARNING: Creating the database map file'
        fullRefDbName = re.sub(r'-names.txt', '', dbmapfile)
        mapfile = open(dbmapfile, 'w')
        fullRefDbFile = open(fullRefDbName, 'r')
        for line in fullRefDbFile:
            if re.match(r'>', line):
                fprintf(mapfile, "%s\n", line.strip())
        mapfile.close()
        fullRefDbFile.close()


# gets the parameter value from a category as specified in the
# parameter file
def get_parameter(config_params, category, field, default=None):
    if config_params is None:
        return default
    if category in config_params and field in config_params[category]:
        return config_params[category][field]
    return default


# same lookup against the parameter file, but with a False default
def get_make_parameter(config_params, category, field, default=False):
    if category in config_params and field in config_params[category]:
        return config_params[category][field]
    return default


def get_pipeline_steps(steps_log_file):
    try:
        logfile = open(steps_log_file, 'r')
    except IOError:
        print "Did not find " + steps_log_file + "!"
        print "Try running in 'complete' run-type"
    else:
        lines = logfile.readlines()
        logfile.close()
    pipeline_steps = None
    return pipeline_steps


# This function reads the pipeline configuration file and sets the
# paths to the different scripts and executables the pipeline calls
def read_pipeline_configuration(file):
    lines = []
    try:
        configfile = open(file, 'r')
    except IOError:
        print "Did not find pipeline config " + file + "!"
    else:
        lines = configfile.readlines()
        configfile.close()
    config_settings = {}
    for line in lines:
        if not re.match("#", line) and len(line.strip()) > 0:
            line = line.strip()
            line = re.sub(r'\t+', ' ', line)
            line = re.sub(r'\s+', ' ', line)
            line = re.sub(r'\'', '', line)
            line = re.sub(r'\"', '', line)
            fields = re.split(r'\s', line)
            if not len(fields) == 2:
                print """ The following line in your config settings file is not set up yet"""
                print """ Please rerun the pipeline after setting up this line"""
                print """ Error in line: """ + line
                sys.exit(-1)

            # normalize the path separators for the current platform
            if PATHDELIM == '\\':
                config_settings[fields[0]] = re.sub(r'/', r'\\', fields[1])
            else:
                config_settings[fields[0]] = re.sub(r'\\', '/', fields[1])

    config_settings['METAPATHWAYS_PATH'] = config_settings['METAPATHWAYS_PATH'] + PATHDELIM
    config_settings['REFDBS'] = config_settings['REFDBS'] + PATHDELIM

    return config_settings


# has the results to use
def hasResults(expected_output):
    return path.exists(expected_output)


# has the results to use
def hasResults1(dir, expected_outputs):
    return doFilesExist(expected_outputs, dir=dir)


# if the directory is empty then there are no precomputed results
# and so the command should be run
def shouldRunStepOnDirectory(run_type, dirName):
    files = glob(dirName + PATHDELIM + '*')
    return len(files) == 0


# if the command is "redo" then delete all the files
# in the folder and then delete the folder too
def removeDirOnRedo(command_Status, origFolderName):
    if command_Status == 'redo' and path.exists(origFolderName):
        files = glob(origFolderName + PATHDELIM + '*')
        for f in files:
            remove(f)
        if path.exists(origFolderName):
            shutil.rmtree(origFolderName)


# if the command is "redo" then delete the file
def removeFileOnRedo(command_Status, fileName):
    if command_Status == 'redo' and path.exists(fileName):
        remove(fileName)
        return True
    else:
        return False


# remove all the files in the directory on redo
def cleanDirOnRedo(command_Status, folderName):
    if command_Status == 'redo':
        cleanDirectory(folderName)


# remove all the files in the directory
def cleanDirectory(folderName):
    files = glob(folderName + PATHDELIM + '*')
    for f in files:
        remove(f)


# if the folder does not exist then create one
def checkOrCreateFolder(folderName):
    if not path.exists(folderName):
        makedirs(folderName)
        return False
    else:
        return True


# do all of the files exist?
def doFilesExist(fileNames, dir=""):
    for fileName in fileNames:
        file = fileName
        if dir != '':
            file = dir + PATHDELIM + fileName
        if not path.exists(file):
            return False
    return True


# check if all of the metapaths_steps have
# settings from the valid list [ yes, skip, stop, redo ]
def checkParam_values(allcategorychoices, parameters):
    for category in allcategorychoices:
        for choice in allcategorychoices[category]:
            if choice in parameters:
                if not parameters[choice] in allcategorychoices[category][choice]:
                    print "ERROR: Incorrect setting in your parameter file"
                    print "       for step " + choice + " as " + parameters[choice]
                    sys.exit(0)


def checkMetapaths_Steps(config_params):
    choices = {'metapaths_steps': {}, 'annotation': {}, 'INPUT': {}}
    choices['INPUT']['format'] = ['fasta', 'gbk_unannotated', 'gbk_annotated', 'gff_unannotated', 'gff_annotated']
    choices['annotation']['algorithm'] = ['last', 'blast']
    choices['metapaths_steps']['PREPROCESS_FASTA'] = ['yes', 'skip', 'stop', 'redo']
    choices['metapaths_steps']['ORF_PREDICTION'] = ['yes', 'skip', 'stop', 'redo']
    choices['metapaths_steps']['GFF_TO_AMINO'] = ['yes', 'skip', 'stop', 'redo']
    choices['metapaths_steps']['FILTERED_FASTA'] = ['yes', 'skip', 'stop', 'redo']
    choices['metapaths_steps']['COMPUTE_REFSCORE'] = ['yes', 'skip', 'stop', 'redo']
    choices['metapaths_steps']['BLAST_REFDB'] = ['yes', 'skip', 'stop', 'redo', 'grid']
    choices['metapaths_steps']['PARSE_BLAST'] = ['yes', 'skip', 'stop', 'redo']
    choices['metapaths_steps']['SCAN_rRNA'] = ['yes', 'skip', 'stop', 'redo']
    choices['metapaths_steps']['STATS_rRNA'] = ['yes', 'skip', 'stop', 'redo']
    choices['metapaths_steps']['ANNOTATE'] = ['yes', 'skip', 'stop', 'redo']
    choices['metapaths_steps']['PATHOLOGIC_INPUT'] = ['yes', 'skip', 'stop', 'redo']
    choices['metapaths_steps']['GENBANK_FILE'] = ['yes', 'skip', 'stop', 'redo']
    choices['metapaths_steps']['CREATE_SEQUIN_FILE'] = ['yes', 'skip', 'stop', 'redo']
    choices['metapaths_steps']['CREATE_REPORT_FILES'] = ['yes', 'skip', 'stop', 'redo']
    choices['metapaths_steps']['SCAN_tRNA'] = ['yes', 'skip', 'stop', 'redo']
    choices['metapaths_steps']['MLTREEMAP_CALCULATION'] = ['yes', 'skip', 'stop', 'redo']
    choices['metapaths_steps']['MLTREEMAP_IMAGEMAKER'] = ['yes', 'skip', 'stop', 'redo']
    choices['metapaths_steps']['PATHOLOGIC'] = ['yes', 'skip', 'stop', 'redo']

    if config_params['metapaths_steps']:
        checkParam_values(choices, config_params['metapaths_steps'])


def isValidInput(output_dir, samples_and_input, dbs, gridSettings, config_settings, messagelogger):
    if not doesFolderExist(output_dir):
        messagelogger.write("ERROR: Output folder \"%s\" does not exist!\n" % (output_dir))
        return False
    else:
        messagelogger.write("OK: Output folder \"%s\" exists!\n" % (output_dir))

    for sample, inputfile in samples_and_input.iteritems():
        sample_folder = output_dir + PATHDELIM + sample
        if not doesFolderExist(sample_folder):
            messagelogger.write("ERROR: Sample folder \"%s\" for sample \"%s\" not found!\n" % (sample_folder, sample))
            return False
        else:
            messagelogger.write("OK: Sample folder \"%s\" for sample \"%s\" exists!\n" % (sample_folder, sample))
        if not doesFileExist(inputfile):
            messagelogger.write("ERROR: Input file \"%s\" does not exist!\n" % (inputfile))
            return False
        else:
            messagelogger.write("OK: Input file \"%s\" exists!\n" % (inputfile))

    for db in dbs:
        dbfile = config_settings['REFDBS'] + PATHDELIM + 'functional' + PATHDELIM + db
        if not doesFileExist(dbfile):
            messagelogger.write("ERROR: Database file \"%s\" does not exist!\n" % (dbfile))
            return False
        else:
            messagelogger.write("OK: Database file \"%s\" found!\n" % (dbfile))

    for gridsetting in gridSettings:
        if 'server' in gridsetting:
            if not 'user' in gridsetting:
                messagelogger.write("ERROR: User for grid server \"%s\" not specified!\n" % (gridsetting['server']))
                return False
            else:
                messagelogger.write("OK: Specification for Grid \"%s\" with user \"%s\" found!\n" % (gridsetting['server'], gridsetting['user']))
    return True


#################################################################################
########################### BLAST ##############################################
#################################################################################
def blast_in_grid(input_files, output_dir, config_params, metapaths_config, config_file, run_type):
    algorithm = get_parameter(config_params, 'annotation', 'algorithm', default='BLAST').upper()
    messagelogger = WorkflowLogger(generate_log_fp(output_dir, basefile_name='metapathways_messages', suffix='txt'),
                                   open_mode='w')
    command_Status = get_parameter(config_params, 'metapaths_steps', 'BLAST_REFDB')
    config_settings = read_pipeline_configuration(config_file)

    orf_prediction_dir = "orf_prediction"
    output_run_statistics_dir = output_dir + PATHDELIM + "run_statistics" + PATHDELIM
    blast_results_dir = output_dir + PATHDELIM + "blast_results" + PATHDELIM
    output_results = output_dir + PATHDELIM + "results" + PATHDELIM

    # create the sample and input pairs
    samples_and_input = {}
    for input_file in input_files:
        sample_name = re.sub(r'[.][a-zA-Z]*$', '', input_file)
        sample_name = path.basename(sample_name)
        sample_name = re.sub('[.]', '_', sample_name)
        samples_and_input[sample_name] = output_dir + PATHDELIM + sample_name + PATHDELIM + orf_prediction_dir + PATHDELIM + sample_name + ".qced.faa"

    # BLAST the ORFs against the reference databases for functional annotation
    dbstring = get_parameter(config_params, 'annotation', 'dbs', default=None)
    dbs = dbstring.split(",")

    # parse the grid settings from the param file: any category named
    # grid_engine<N> whose 'active' value starts with y/Y/t/T is used
    gridEnginePATTERN = re.compile(r'(grid_engine\d+)')
    trueOrYesPATTERN = re.compile(r'^[yYTt]')
    gridSettings = []
    for key in config_params:
        if gridEnginePATTERN.match(key) is None:
            continue
        if 'active' in config_params[key]:
            if trueOrYesPATTERN.match(config_params[key]['active']):  # this grid is active
                gridSettings.append(config_params[key])

    if not isValidInput(output_dir, samples_and_input, dbs, gridSettings, config_settings=config_settings,
                        messagelogger=messagelogger):
        sys.exit(0)

    blastbroker = BlastBroker(messagelogger)     # set up the broker with a message logger
    blastbroker.setBaseOutputFolder(output_dir)  # set up the output folder
    blastbroker.addSamples(samples_and_input)    # add the samples and the input files

    # add databases against the samples
    for sample in samples_and_input:
        for db in dbs:
            blastbroker.addDatabase(sample, db)
        blastbroker.addAlgorithm(sample, algorithm)  # add the algorithm

    # set up the services and add them to the broker
    for gridsetting in gridSettings:
        gridsetting['messagelogger'] = messagelogger
        gridsetting['MetaPathwaysDir'] = config_settings['METAPATHWAYS_PATH']
        gridsetting['base_output_folder'] = blastbroker.base_output_folder
        gridsetting['blast_db_folder'] = config_settings['REFDBS'] + PATHDELIM + 'functional'
        try:
            blastservice = BlastService(gridsetting)
        except:
            print traceback.format_exc(10)
        blastbroker.addService(blastservice)

    # create the workspace folders
    if blastbroker.are_working_folders_available():
        messagelogger.write("STATUS: Local working folders for Grid found!\n")
    elif blastbroker.create_working_folders():
        messagelogger.write("OK: Successfully created the grid related local working folders!\n")
    else:
        messagelogger.write("ERROR: Cannot create the grid working folders!\n")
        messagelogger.write("ERROR: Exiting blast in grid mode!\n")
        return

    batch_size = int(get_parameter(config_params, 'grid_submission', 'batch_size', default=1000))
    blastbroker.setBatchSize(batch_size)

    # check if the input files are already split; if not, split them
    messagelogger.write("STATUS: Checking if input files are already split!\n")
    for s in blastbroker.getSamples():
        if not blastbroker.doesValidSplitExist(s):
            messagelogger.write("STATUS: Did not find any previously split files for sample \"%s\"!\n" % (s))
            if not blastbroker.splitInput(s):
                print "ERROR: Cannot split the files for some or all of the samples!"
                messagelogger.write("ERROR: Cannot split the files for some or all of the samples!\n")
                sys.exit(0)
            else:
                messagelogger.write("SUCCESS: Successfully split the files for some or all of the samples!\n")
        else:
            messagelogger.write("OK: Found previously split files for sample \"%s\"!\n" % (s))
    messagelogger.write("STATUS: Completed checks for file splits!\n")

    # load the list of splits
    blastbroker.load_list_splits()
    messagelogger.write("SUCCESS: Successfully loaded the list of file splits!\n")

    # create the (split, database) combinations as jobs for each sample
    blastbroker.createJobs(redo=False)
    messagelogger.write("SUCCESS: Successfully created the (split, database) pairs!\n")

    # make sure the latest job lists on file are loaded
    blastbroker.load_job_lists()
    messagelogger.write("SUCCESS: Successfully recovered the old/existing job list!\n")

    # for each sample load the submitted and completed lists
    # and compute the load per server
    blastbroker.load_job_status_lists()
    messagelogger.write("SUCCESS: Successfully loaded the status of the jobs!\n")
    blastbroker.compute_performance()
    try:
        blastbroker.compute_server_loads()
    except:
        print traceback.format_exc(10)

    #blastbroker.launch_AWS_grid()
    blastbroker.setupStatsVariables()
    messagelogger.write("STATUS: Getting ready to submit jobs to the servers!\n")
    blastbroker.Do_Work()
    #blastbroker.stop_AWS_grid()
    blastbroker.Delete_Remote_Directories()

    message = "\n6. Blasting using Grid ORFs against reference database - "
    #################################...
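The parameter-file plumbing above is easiest to see in isolation. Here is a minimal, self-contained sketch (not part of MetaPathways) of how get_parameter and the grid_engine<N> matching behave; the dictionary contents, server name, and user are invented for illustration:

import re

# hypothetical parsed parameter file (values invented for illustration)
config_params = {
    'annotation': {'algorithm': 'last', 'dbs': 'COG_2013,refseq'},
    'metapaths_steps': {'BLAST_REFDB': 'grid'},
    'grid_engine1': {'active': 'yes', 'server': 'grid.example.org', 'user': 'alice'},
}

def get_parameter(config_params, category, field, default=None):
    # same lookup as above: fall back to the default when the
    # category or the field is missing
    if config_params is None:
        return default
    if category in config_params and field in config_params[category]:
        return config_params[category][field]
    return default

algorithm = get_parameter(config_params, 'annotation', 'algorithm', default='BLAST').upper()
dbs = get_parameter(config_params, 'annotation', 'dbs', default='').split(',')

# collect every active grid_engine<N> category, as blast_in_grid does
gridEnginePATTERN = re.compile(r'(grid_engine\d+)')
trueOrYesPATTERN = re.compile(r'^[yYTt]')
gridSettings = [config_params[k] for k in config_params
                if gridEnginePATTERN.match(k)
                and trueOrYesPATTERN.match(config_params[k].get('active', ''))]

print(algorithm)          # LAST
print(dbs)                # ['COG_2013', 'refseq']
print(len(gridSettings))  # 1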
processFileOps.py
Source: processFileOps.py
import json
import logging
import os
import com.dataanalytics.utility.constants as const
import com.dataanalytics.utility.messageLogger as ml
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType


class ProcessFileOps:
    def __init__(self, spark):
        self.spark = spark
        self.messageLogger = ml.MessageLogger(const.getProjectName(__file__), "Logger Started")
        self.messageLogger.logInfo("Processing read and write operations from files")

    def processCSVoutput(self, df, file_name):
        fn_csv = const.processedFile + file_name + "_csv"
        msg = self.createFolder(fn_csv)
        self.messageLogger.logInfo(msg)
        try:
            self.messageLogger.logInfo("starting csv write operation")
            # a pandas fallback would be: df.toPandas().to_csv(fn_csv, index=False)
            df.write.csv(fn_csv, header=True)
            self.messageLogger.logInfo("successfully loaded data into csv file")
        except Exception as e:
            self.messageLogger.logError("!!!Error writing to csv ::: " + str(e.__class__) + " occurred.")

    def processParquetOutput(self, df, file_name, partition_size, partition_col):
        fn_parquet = const.processedFile + file_name + "_parquet"
        msg = self.createFolder(fn_parquet)
        self.messageLogger.logInfo(msg)
        try:
            self.messageLogger.logInfo("starting parquet write operation")
            # a pandas fallback would be: df.toPandas().to_parquet(fn_parquet, compression='gzip')
            df.repartition(partition_size).write.partitionBy(partition_col).parquet(fn_parquet)
            self.messageLogger.logInfo("successfully loaded data into parquet")
        except Exception as e:
            self.messageLogger.logError("!!!Error writing to parquet ::: " + str(e.__class__) + " occurred.")

    def processJsonOutput(self, df, file_name):
        fn_json = const.processedFile + file_name + "_json"
        msg = self.createFolder(fn_json)
        self.messageLogger.logInfo(msg)
        try:
            self.messageLogger.logInfo("starting Json write operation")
            df.write.json(fn_json)
            self.messageLogger.logInfo("successfully loaded data into json")
        except Exception as e:
            self.messageLogger.logError("!!!Error writing to Json ::: " + str(e.__class__) + " occurred.")

    def readCSV(self, file_name):
        df = const.createEmptyDF(self.spark)
        try:
            self.messageLogger.logInfo("Reading from CSV file.")
            df = self.spark.read.csv(file_name, header=True)
            self.messageLogger.logInfo("File reading finished successfully.")
        except Exception as e:
            self.messageLogger.logError("unable to read the file, exception occurred: " + str(e.__class__) + " occurred.")
        return df

    def createFolder(self, fp):
        # delete any stale output folder so Spark can create a fresh one;
        # the original version forgot to return msg, so logInfo(msg) logged None
        try:
            import shutil
            shutil.rmtree(fp)
            msg = "folder successfully deleted"
        except Exception:
            msg = "folder not found"
        return msg

    def writeTextFile(self, file_name, text):
        fp = open(file_name, 'w+')
        fp.write(text)
        fp.close()
...
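Below is a short, hypothetical usage sketch of the class above. The SparkSession setup is standard PySpark, but the input path, base name, and 'region' partition column are invented for illustration, and the com.dataanalytics.utility modules referenced by the class must be importable for the constructor to work:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ProcessFileOpsDemo").getOrCreate()

ops = ProcessFileOps(spark)
df = ops.readCSV("input/sales.csv")                 # hypothetical input path
ops.processCSVoutput(df, "sales")                   # writes <processedFile>/sales_csv
ops.processParquetOutput(df, "sales", 4, "region")  # 4 partitions, keyed on a hypothetical 'region' column
ops.processJsonOutput(df, "sales")                  # writes <processedFile>/sales_json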