Best Python code snippet using fMBT_python
data.py
Source:data.py
import sys
import math
import readVCF
import fileinput

#Section names recognised in the configuration file, in the order in which
#readConfig returns the corresponding dictionaries.
_CONFIG_SECTIONS = ('CIRCULAR', 'COVERAGE', 'KARYOGRAM', 'HEATMAP', 'COLORS')


def _stripChrPrefix(name):
    """Return name without one leading 'chr', or unchanged if it has none.

    str.strip('chr') is deliberately not used: it removes any of the
    characters 'c', 'h', 'r' from BOTH ends of the string, not a prefix.
    """
    if name.startswith('chr'):
        return name[len('chr'):]
    return name


def _parseCytoLine(line):
    """Split one cytoband line into [chromosome, startPos, endPos, cytoband, stain value]."""
    fields = line.split('\t')
    return [_stripChrPrefix(fields[0]), fields[1], fields[2], fields[3], fields[4].strip('\n')]


def _sectionFromHeader(line, current):
    """Return the section named by a '[SECTION]' line, or current if it is not a known one."""
    for section in _CONFIG_SECTIONS:
        if line.startswith('[' + section + ']'):
            return section
    return current


def readTab(toRead):
    """Read a coverage tab file and build one Chromosome item per chromosome.

    The first line must start with "#CHR". Every following line must consist
    of exactly four tab-separated fields: chrName, start, end, coverage.
    Each coverage value is added to the Chromosome of its line; a chromosome's
    end is taken from the third field of the last line that belongs to it.

    Returns a tuple (chromosomes, coverageNorm, coverageNormLog, totalBP):
    coverageNorm is the mean coverage over all data lines, coverageNormLog is
    the sum of log2(coverage) over the lines with coverage > 0 divided by the
    number of ALL data lines, and totalBP is the sum of all chromosome ends.

    Returns None (after printing a message) if the header is wrong or the
    first data line is missing/malformed, and -1 if a later line is malformed.
    """
    chromosomes = []
    coverageSum = 0
    coverageLogSum = 0
    totalReadLines = 0
    with open(toRead, 'r') as tab:
        #Read the first line in the file, should start with #CHR.
        header = tab.readline()
        if not header.startswith("#CHR"):
            print('TAB file is not in correct format')
            return None
        print('TAB file seems ok, continuing read')

        chrom = None
        curChrName = None
        lastEnd = None
        #All following lines should be formatted as: chrName\tstart\tend\tcoverage
        for line in tab:
            fields = line.split('\t')
            if len(fields) != 4:
                #The first data line (line 2 of the file) is reported differently from later ones
                if chrom is None:
                    print("TAB file not formatted correctly on line 2")
                    return None
                print("TAB file not formatted correctly")
                return -1
            #If we come across a new chromosome, assign end on the current one
            #(contained in the previously read line), then start a new Chromosome.
            if chrom is None or fields[0] != curChrName:
                if chrom is not None:
                    chrom.setEnd(lastEnd)
                curChrName = fields[0]
                chrom = Chromosome(curChrName)
                chromosomes.append(chrom)
            #Every line contains coverage data of interest, for current chromosome
            coverage = float(fields[3])
            chrom.addCoverage(coverage)
            coverageSum += coverage
            if coverage > 0:
                coverageLogSum += math.log(coverage, 2)
            totalReadLines += 1
            lastEnd = fields[2]

        #No data line at all after the header
        if chrom is None:
            print("TAB file not formatted correctly on line 2")
            return None
        chrom.setEnd(lastEnd)

    coverageNorm = coverageSum / totalReadLines
    coverageNormLog = coverageLogSum / totalReadLines
    #sum total read bp
    totalBP = sum(int(chromo.end) for chromo in chromosomes)
    return (chromosomes, coverageNorm, coverageNormLog, totalBP)


def readCytoTab(toRead):
    """Read a cytoband tab file into a list of five-element lists.

    The file has no header; its first line must start with "chr1", otherwise a
    message is printed and None is returned. Each line is expected to hold the
    fields chromosome, startPos, endPos, cytoband, stain value. The leading
    'chr' is removed from the chromosome and the trailing newline from the
    stain value. A line with fewer than five fields raises IndexError.
    """
    cytoTabInfo = []
    with open(toRead, 'r') as tab:
        line = tab.readline()
        if not line.startswith("chr1"):
            print('TAB file is not in correct format')
            return None
        print('CytoBandTAB file seems ok, continuing read')
        cytoTabInfo.append(_parseCytoLine(line))
        for line in tab:
            cytoTabInfo.append(_parseCytoLine(line))
    return cytoTabInfo


def readVCFFile(toRead, chromosomes):
    """Read a VCF file and attach its variants to the matching chromosome items.

    The first line must start with "##fileformat="; all leading "##" lines and
    the header line that follows them are collected. Every remaining line is
    passed to readVCF.readVCFLine, and the resulting variant is added to the
    first item in chromosomes whose name equals the variant's chrA. Variants
    without a matching chromosome are dropped.

    Returns (chromosomes, vcfInfoLines), or None (after printing a message) if
    the file fails the format checks.
    """
    vcfInfoLines = []
    with open(toRead, 'r') as vcf:
        #The first lines should be a number of meta-information lines, prepended by ##.
        line = vcf.readline()
        if not line.startswith("##fileformat="):
            print("VCF file is not in correct format")
            return None
        print("VCF file seems ok, continuing read")
        while line.startswith("##"):
            vcfInfoLines.append(line)
            line = vcf.readline()
        #A header line prepended by # should follow (CHROM, POS, ID, REF, ALT, QUAL, FILTER, INFO, ...).
        #NOTE(review): the check only fails when the line neither starts with '#'
        #nor has exactly 8 fields. It is kept as is, because requiring both would
        #reject headers that carry additional columns.
        fields = line.split('\t')
        if not (line.startswith('#') or len(fields) == 8):
            print("Header columns missing in VCF file")
            return None
        vcfInfoLines.append(line)
        #All following lines are tab-delimited data lines.
        for line in vcf:
            (chrA, posA, chrB, posB, event_type, description, variantFormat) = readVCF.readVCFLine(line)
            #Store variant data in the first chromosome item corresponding to the CHROM field
            target = next((chromo for chromo in chromosomes if chromo.name == chrA), None)
            if target is not None:
                target.addVariant(chrA, posA, chrB, posB, event_type, description, variantFormat)
    return (chromosomes, vcfInfoLines)


#Reads a general tab delimited file (such as a bed file)
def readGeneralTab(toRead):
    """Read a tab-delimited file, skipping its first line, into a list of field lists.

    In the first field every occurrence of "chr", "Chr" and "CHR" is removed;
    the trailing newline is removed from the last field.
    """
    tabLines = []
    with open(toRead, 'r') as tab:
        #Skip first line
        tab.readline()
        for line in tab:
            fields = line.split('\t')
            fields[0] = fields[0].replace("chr", "").replace("Chr", "").replace("CHR", "")
            fields[-1] = fields[-1].strip('\n')
            tabLines.append(fields)
    return tabLines


def readConfig(toRead):
    """Read the sectioned key=value configuration file.

    Lines starting with '#' are ignored. A line starting with '[' switches the
    active section if it names one of CIRCULAR, COVERAGE, KARYOGRAM, HEATMAP
    or COLORS (the active section is CIRCULAR until the first such line). Any
    other line is split at its FIRST '=' into key and value, so values may
    themselves contain '='; lines without '=' (e.g. blank lines) are skipped
    instead of raising IndexError.

    Returns (circularConfig, coverageConfig, karyoConfig, heatmapConfig, colorConfig).
    """
    sections = {name: {} for name in _CONFIG_SECTIONS}
    activeSection = "CIRCULAR"
    with open(toRead, 'r') as config:
        for line in config:
            if line.startswith('#'):
                continue
            if line.startswith('['):
                activeSection = _sectionFromHeader(line, activeSection)
                continue
            key, separator, value = line.partition('=')
            if not separator:
                continue
            sections[activeSection][key] = value.strip('\n')
    return tuple(sections[name] for name in _CONFIG_SECTIONS)


def saveConfig(fileName,circularConfig,coverageConfig,karyoConfig,heatmapConfig,colorConfig):
    """Rewrite the values in an existing configuration file in place.

    Comment lines, section headers, lines without '=' and every line of the
    COLORS section are written back unchanged (colorConfig is accepted but not
    used). Every other key=value line gets the value stored under its key in
    the dictionary of the active section; a key missing from that dictionary
    raises KeyError.

    The line is rebuilt as key + '=' + value rather than patched with
    str.replace, which could also rewrite the key or corrupt a line whose old
    value was empty.
    """
    updatable = {
        'CIRCULAR': circularConfig,
        'COVERAGE': coverageConfig,
        'KARYOGRAM': karyoConfig,
        'HEATMAP': heatmapConfig,
    }
    with open(fileName, 'r+') as config:
        activeSection = "CIRCULAR"
        newData = []
        for line in config.readlines():
            line = line.strip('\n')
            if line.startswith('['):
                activeSection = _sectionFromHeader(line, activeSection)
            elif not line.startswith('#') and activeSection in updatable:
                key, separator, _oldValue = line.partition('=')
                if separator:
                    line = key + '=' + str(updatable[activeSection][key])
            newData.append(line)
        #Overwrite the file with the updated lines
        config.seek(0)
        config.truncate()
        config.writelines(entry + '\n' for entry in newData)


class Chromosome():
    """Coverage values, variants and connections collected for one chromosome."""

    def __init__(self, name):
        self.name = name
        self.coverage = []
        self.coverageLog = []
        self.display = False
        self.variants = []
        self.connections = []
        self.display_connections = False
        self.display_cytoBandNames = False

    def addCoverage(self, coverageValue):
        """Append coverageValue, and its log2 (0 for values <= 0) to the log list."""
        self.coverage.append(coverageValue)
        if coverageValue > 0:
            self.coverageLog.append(math.log(coverageValue, 2))
        else:
            self.coverageLog.append(0)

    def setEnd(self, end):
        """Store the end position; the attribute does not exist before the first call."""
        self.end = end

    def addVariant(self,chrA,posA,chrB,posB,event_type,description,format):
        """Append one variant record to self.variants.

        The record is the list [chrA, posA, chrB, posB, event_type,
        description, format, allGenes, cband, display_variant, rankScore,
        marked], where allGenes, cband and rankScore are taken from the
        "CSQ", "CYTOBAND" and "RankScore" entries of description if present.
        """
        #The variants are by default set to be shown
        display_variant = True
        marked = False
        #For every variant we would like the genes in CSQ, if this exists
        if "CSQ" in description:
            #The CSQ field has several sub-fields, each separated with ','.
            #The fourth '|'-separated element of each sub-field is taken as the gene name;
            #a set removes any duplicates.
            geneSet = {subField.split('|')[3] for subField in description["CSQ"].split(',')}
            allGenes = ', '.join(geneSet)
        else:
            allGenes = ""
        #We would also like the CYTOBAND field, if this exists
        cband = description["CYTOBAND"] if "CYTOBAND" in description else None
        rankScore = description["RankScore"] if "RankScore" in description else None
        #Add the variant data to this chromosome
        variant = [chrA,posA,chrB,posB,event_type,description,format,allGenes,cband,display_variant,rankScore, marked]
        self.variants.append(variant)

    def createConnections(self):
        #These corresponding values for the variant are added to the list: CHRA,CHRB,WINA,WINB,CYTOBAND
        self.connections = []
        for variant in self.variants:
            #variant[9] is the display_variant flag set in addVariant
            if not variant[9]:
                continue
            else:
                description = variant[5]
                if "CYTOBAND" in description:
                    cband = description["CYTOBAND"]
                else:
                    cband = None
                #NOTE(review): 'is not' compares object identity, not string equality;
                #'!=' is probably what is meant here - confirm before changing.
                if variant[0] is not variant[2]:
                    connection = [variant[0],variant[2],description["WINA"],description["WINB"],cband]
                    self.connections.append(connection)
                else:
                    connection = [variant[0], variant[2], str(variant[1]) + "," + str(variant[1]), str(variant[3]) + "," + str(variant[3]), cband]
                    ...  #the listing is truncated at this point in the source
coverage_data.py
Source:coverage_data.py
from __future__ import annotations
from typing import List, Set
from dataclasses import dataclass, field, asdict
import enum
from pathlib import Path
import json
from ros_metrics_reporter.color import Color
import pandas as pd
from datetime import datetime


class CoverageKeys(enum.Enum):
    """The three kinds of coverage figures handled by this module."""

    Lines = "Lines"
    Functions = "Functions"
    Branches = "Branches"


@dataclass
class CoverageValue:
    """Line, function and branch coverage figures stored under one label."""

    label: str = ""
    line: float = 0.0
    function: float = 0.0
    branch: float = 0.0

    def get(self, coverage_key: CoverageKeys):
        """Return the figure selected by coverage_key; raise Exception for any other key."""
        if coverage_key == CoverageKeys.Lines:
            return self.line
        if coverage_key == CoverageKeys.Functions:
            return self.function
        if coverage_key == CoverageKeys.Branches:
            return self.branch
        raise Exception("Unknown coverage key")


@dataclass
class Coverage:
    """The coverage values recorded for one package."""

    package: str = ""
    value: List[CoverageValue] = field(default_factory=list)

    def write(self, file: Path):
        """Dump this object as indented JSON into file."""
        with open(file, "w") as stream:
            as_dict = asdict(self)
            json.dump(as_dict, stream, indent=2)

    def read(self, file: Path) -> Coverage:
        """Load package and value from the JSON in file and return self."""
        with open(file, "r") as stream:
            loaded = json.load(stream)
        self.package = loaded["package"]
        self.value = [CoverageValue(**entry) for entry in loaded["value"]]
        return self

    def get_label_value(self, label: str) -> CoverageValue:
        """Return the first value with the given label, or a fresh default-valued one."""
        matches = (entry for entry in self.value if entry.label == label)
        return next(matches, CoverageValue(label=label))

    def get_labels(self) -> Set[str]:
        """Return the set of labels of all stored values."""
        return set(entry.label for entry in self.value)


@dataclass
class CoverageStamped(Coverage):
    """A Coverage with a date field, set to datetime.now() at creation by default."""

    date: datetime = field(default_factory=datetime.now)


@dataclass
class Threshold:
    """The 'high' and 'med' limits that get_color compares coverage values against."""

    high: float = 0.0
    med: float = 0.0

    def write(self, file: Path):
        """Dump this object as indented JSON into file."""
        with open(file, "w") as stream:
            as_dict = asdict(self)
            json.dump(as_dict, stream, indent=2)


@dataclass
class CoverageData:
    """A list of per-package Coverage objects together with one Threshold."""

    coverage: List[Coverage] = field(default_factory=list)
    threshold: Threshold = field(default_factory=Threshold)

    def add_threshold(self, high, med):
        """Overwrite both limits of the stored threshold."""
        self.threshold.high = high
        self.threshold.med = med

    def add_coverage(self, coverage: Coverage):
        """Merge coverage into the entry of the same package, or append it as a new entry."""
        for existing in self.coverage:
            if existing.package == coverage.package:
                existing.value.extend(coverage.value)
                return
        self.coverage.append(coverage)

    def add_coverages(self, coverages: List[Coverage]):
        """Call add_coverage for every element of coverages, in order."""
        for entry in coverages:
            self.add_coverage(entry)

    def save_coverage(self, output_dir: Path):
        """Write every package's coverage to <output_dir>/<package>/coverage.json."""
        for entry in self.coverage:
            package_dir = output_dir / entry.package
            package_dir.mkdir(parents=True, exist_ok=True)
            entry.write(package_dir / "coverage.json")

    def save_threshold_value(self, output_path: Path):
        """Write the threshold as JSON to output_path."""
        self.threshold.write(output_path)

    def get_coverage(self, package: str) -> Coverage:
        """Return the stored Coverage of package, or a new empty one (which is not stored)."""
        for entry in self.coverage:
            if entry.package == package:
                return entry
        return Coverage(package=package)

    # NOTE: the listing is cut off inside this method in the source; the visible
    # part is reproduced statement for statement and ends at the original "...".
    # NOTE(review): value is annotated as str but is compared with the float thresholds.
    def get_color(self, value: str, coverage_key: CoverageKeys) -> Color:
        if coverage_key == CoverageKeys.Lines:
            if value >= self.threshold.high:
                return Color.GREEN
            elif value >= self.threshold.med:
                return Color.YELLOW
            else:
                return Color.RED
        elif coverage_key == CoverageKeys.Functions:
            if value >= self.threshold.high:
                return Color.GREEN
            elif value >= self.threshold.med:
                return Color.YELLOW
            else:
                return Color.RED
        elif coverage_key == CoverageKeys.Branches:
            if value >= self.threshold.high:
                return Color.GREEN
            elif value >= self.threshold.med:
                return Color.YELLOW
            else:
                return Color.RED
        else:
            ...
makeCoverageDB.py
Source:makeCoverageDB.py
# -*- coding: utf-8 -*-
"""
Created on Fri Oct 23 21:58:09 2015
@author: jiun
"""
import os
import re
import sys
from pymongo import MongoClient


# main function
# get list of txt.gz files, extract, parse and insert into the database
# input main directory
def main():
    """Insert the coverage records of every .gz file in the directory sys.argv[1].

    Only files directly inside the directory are considered. The UID of a file
    is its name with the last two extensions removed. Each file is closed as
    soon as it has been processed.
    """
    dirName = sys.argv[1]
    db = connecttomongo()
    filenames = next(os.walk(dirName))[2]
    for iF in filenames:
        if not iF.endswith(".gz"):
            continue
        fName = os.path.join(dirName, iF)
        uid = os.path.splitext(os.path.splitext(iF)[0])[0]
        # the with-block closes the gzip handle, which was previously leaked
        with unzipCoverageFile(fName) as fObj:
            parselines_insert(db, fObj, uid)


# function to parse coverage text file
def parselines_insert(db, fileObj, uid):
    """Parse every line of fileObj and insert one record per line via insertrecord.

    A line consists of tab-separated fields, used as: chromosome name, gene
    name, start position, comma-separated integer coverage values. The record
    has the keys "UID", "chr", "genename", "startpos", "endpos" (start position
    plus number of coverage values) and "coverage".

    Lines may be str or bytes: bytes are decoded as UTF-8 first, because
    unzipCoverageFile returns a binary handle and splitting bytes with a str
    pattern raises TypeError on Python 3. Blank lines are skipped.
    """
    for line in fileObj:
        if isinstance(line, bytes):
            line = line.decode("utf-8")
        if not line.strip():
            continue
        bits = line.split("\t")
        coverage = [int(i) for i in bits[3].replace("\n", "").split(",")]
        startPos = int(bits[2])
        endPos = startPos + len(coverage)

        content = {"UID": uid, "chr": bits[0], "genename": bits[1], "startpos": startPos, "endpos": endPos, "coverage": coverage}

        insertrecord(db, content)  # insert it


# files are stored in gz format, so unzip
def unzipCoverageFile(fileName):
    """Open the gzip file fileName for binary reading and return the file object.

    The caller is responsible for closing it.
    """
    import gzip
    fileObj = gzip.open(fileName, 'rb')

    return fileObj


def insertrecord(db, r):
    """Insert the document r into the collection db.

    Uses insert_one: Collection.insert was deprecated in PyMongo 3.0 and
    removed in PyMongo 4.0.
    """
    db.insert_one(r)


def connecttomongo(uri="mongodb://146.118.98.44:27017"):
    """Connect to the MongoDB server at uri and return the healthhack.testcoverage collection.

    The default uri is the server that was hard-coded before; pass e.g.
    "mongodb://localhost:27017" to use a local server.
    """
    connection = MongoClient(uri)
    db_test = connection.healthhack.testcoverage
    return db_test


if __name__ == '__main__':
    ...  # the listing is truncated at this point in the source
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!