How to use seq_generator method in pyresttest

Best Python code snippet using pyresttest_python

randread.py

Source:randread.py Github

copy

Full Screen

#!/usr/bin/python3
"""
Generate random FASTQ reads.
"""

import argparse
import random
import sys
from abc import ABCMeta, abstractmethod


class QualityGenerator(metaclass=ABCMeta):
    """Abstract base class for objects that produce quality strings."""

    @abstractmethod
    def get_quality_string(self, length):
        """Return a quality string containing ``length`` characters."""


class UniformQualityGenerator(QualityGenerator):
    """Quality generator that draws every score uniformly from a range."""

    def __init__(self, params):
        """
        Parse the score range.

        :param params: Either ``"low:high"`` or a single ``"score"``, which
            is used as both the lower and the upper bound.
        :raises ValueError: If more than two values are given, a value is
            not an integer, or the lower bound exceeds the upper bound.
        """
        tok = params.split(':')

        if len(tok) > 2:
            raise ValueError('Uniform generator: Received more than 2 parameter: {}'.format(params))

        if len(tok) == 1:
            # A single score means a fixed quality: use it for both bounds.
            tok.append(tok[0])

        # Scores are shifted by 33 before being turned into characters.
        self.low = int(tok[0]) + 33
        self.high = int(tok[1]) + 33

        if self.low > self.high:
            raise ValueError('Uniform generator: Lower bound exceeds upper bound: {}'.format(params))

    def get_quality_string(self, length):
        """
        Return ``length`` quality characters.

        Both bounds are inclusive, so a range such as ``0:93`` can produce
        the top score and a single-score range yields a constant string.
        """
        return ''.join(chr(random.randint(self.low, self.high)) for _ in range(int(length)))


class FastaGenerator:
    """Format generated sequences as FASTA records."""

    def __init__(self, seq_generator):
        self.seq_generator = seq_generator

    def generate(self, num_sequence, read_len, start_index=1):
        """Yield ``num_sequence`` FASTA records of ``read_len`` bases each."""
        for seq_index in range(start_index, start_index + num_sequence):
            yield '>Seq-{}\n{}'.format(seq_index, self.seq_generator.next_sequence(read_len))


class FastqGenerator:
    """Format generated sequences and qualities as FASTQ records."""

    def __init__(self, seq_generator, quality_generator):
        self.seq_generator = seq_generator
        self.quality_generator = quality_generator

    def generate(self, num_sequence, read_len, start_index=1):
        """Yield ``num_sequence`` FASTQ records of ``read_len`` bases each."""
        for seq_index in range(start_index, start_index + num_sequence):
            yield '@Seq-{}\n{}\n+Seq-{}\n{}'.format(
                seq_index,
                self.seq_generator.next_sequence(read_len),
                seq_index,
                self.quality_generator.get_quality_string(read_len))


class RawGenerator:
    """Emit bare sequences without any record header."""

    def __init__(self, seq_generator):
        self.seq_generator = seq_generator

    def generate(self, num_sequence, read_len, start_index=1):
        """Yield ``num_sequence`` sequences of ``read_len`` bases each."""
        for _ in range(start_index, start_index + num_sequence):
            yield self.seq_generator.next_sequence(read_len)


class SequenceGenerator:
    """Generate random nucleotide sequences."""

    def __init__(self, prob_base=1.0, seq_rna=False, all_iupac=False, mix_case=False):
        """
        Create a new sequence generator.

        :param prob_base: Probability of A, C, G, or T. Values outside
            ``[0.0, 1.0]`` are clamped to that interval.
        :param seq_rna: Generate U in place of T.
        :param all_iupac: Use the full set of ambiguity characters (instead
            of only N) for non-ACGT bases if prob_base is less than 1.0.
        :param mix_case: Also generate lower-case versions of the characters.
        """
        self.prob_base = min(max(prob_base, 0.0), 1.0)
        self.all_iupac = all_iupac

        # Set bases
        self.base = ['A', 'C', 'G', 'T']

        if seq_rna:
            self.base[3] = 'U'

        # Set other characters
        if all_iupac:
            self.other = ['R', 'Y', 'S', 'W', 'K', 'M', 'B', 'D', 'H', 'V', 'N', '.', '-']
        else:
            self.other = ['N']

        if mix_case:
            self.base += [seq_char.lower() for seq_char in self.base]
            # '.' and '-' have no lower-case form, so only letters are added.
            self.other += [seq_char.lower() for seq_char in self.other if seq_char.isalpha()]

    def next_sequence(self, num_bases):
        """Return a string of ``num_bases`` random characters."""
        return ''.join(self.next_base(num_bases))

    def next_base(self, num_bases):
        """
        Generate the next ``num_bases`` bases.

        :param num_bases: Number of bases to generate.
        :return: A generator for ``num_bases`` bases.
        """
        last_base = len(self.base) - 1
        last_other = len(self.other) - 1

        for _ in range(num_bases):
            rand = random.random()

            if rand < self.prob_base:
                index = int((self.prob_base - rand) / self.prob_base * len(self.base))
                # rand == 0.0 maps to len(self.base); clamp to the last entry.
                yield self.base[min(index, last_base)]
            else:
                index = int((rand - self.prob_base) / (1 - self.prob_base) * len(self.other))
                yield self.other[min(index, last_other)]


def main(args=None):
    """
    Command-line entry point: parse arguments and print the generated reads.

    :param args: Argument list; defaults to ``sys.argv[1:]``.
    :raises ValueError: If an argument value is out of range or unrecognized.
    """
    # Check arguments
    if args is None:
        args = sys.argv[1:]

    # Get parsed arguments
    parser = argparse.ArgumentParser(description='Generate random FASTQ reads.', prog='randread')

    parser.add_argument('-d', '--dist', dest='score_dist', type=str, default='u:0:93',
                        help='Distribution of FASTQ quality scores (default = u:0:93).')

    parser.add_argument('-f', '--format', dest='format', type=str, default='fastq',
                        help='Format of the file to generate (default = fastq).')

    parser.add_argument('-l', '--readlen', dest='read_len', type=int, default=100,
                        help='Read length (default = 100).')

    parser.add_argument('-m', '--mixcase', dest='mix_case', default=False, action='store_true',
                        help='Generate sequences with a mix of upper and lower case characters.')

    parser.add_argument('-n', '--numreads', dest='num_reads', type=int, default=1,
                        help='Number of reads to generate (default = 1).')

    parser.add_argument('-p', '--pbase', dest='prob_base', type=float, default=1.0,
                        help='The probability that a base in a sequence is A, C, G, or T (default = 1.0).')

    parser.add_argument('-r', '--rna', dest='seq_rna', default=False, action='store_true',
                        help='Generate U in place of T')

    parser.add_argument('-s', '--seed', dest='seed', type=int, default=None,
                        help='Set random seed, which will result in the same sequence being generated for the '
                             'same seed. By default, generated sequences are random on each run.')

    parser.add_argument('-u', '--iupac', dest='all_iupac', action='store_true', default=False,
                        help='If the probability of generating A, C, G, or T is less than 1.0, the default behavior is '
                             'to output N for all non-ACGT characters. This option will output any IUPAC nucleotide '
                             'character instead of just N for each non-ACGT character.')

    parsed_args = parser.parse_args(args=args)

    # Set base probability
    base_generator = SequenceGenerator(
        parsed_args.prob_base, parsed_args.seq_rna, parsed_args.all_iupac, parsed_args.mix_case)

    # Set random seed
    random.seed(parsed_args.seed)

    # Check parsed arguments
    if parsed_args.num_reads < 1:
        raise ValueError('Cannot generate less than 1 read: {}'.format(parsed_args.num_reads))

    if parsed_args.read_len < 1:
        raise ValueError('Cannot generate reads with less than 1 base: {}'.format(parsed_args.read_len))

    # Determine distribution: "<type>:<params>"; params is '' when no ':' is present
    dist_type, _, dist_params = parsed_args.score_dist.partition(':')
    dist_type = dist_type.strip()
    dist_params = dist_params.strip()

    # Get generator
    if dist_type == 'u':
        quality_generator = UniformQualityGenerator(dist_params)
    else:
        raise ValueError('Cannot find generator for type: {}'.format(dist_type))

    # Get sequence generator
    out_format = parsed_args.format.strip().lower()

    if out_format == 'fasta':
        record_generator = FastaGenerator(base_generator)
        join_str = '\n'
    elif out_format == 'fastq':
        record_generator = FastqGenerator(base_generator, quality_generator)
        join_str = '\n'
    elif out_format == 'raw':
        record_generator = RawGenerator(base_generator)
        join_str = '\n\n'
    else:
        raise ValueError(
            'Unrecognized sequence format: {}: Must be fasta, fastq, or raw'.format(parsed_args.format))

    # Generate sequences
    print(join_str.join(record_generator.generate(parsed_args.num_reads, parsed_args.read_len)))


# NOTE(review): the snippet is truncated at this point on the source page;
# the guard body is not visible (presumably it calls main() -- confirm).
if __name__ == '__main__':
    ...

Full Screen

Full Screen

batch_processor.py

Source:batch_processor.py Github

copy

Full Screen

#------------------------------------------------------------------------------
#
# This file is part of sgidspace.
#
# sgidspace is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# sgidspace is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with sgidspace. If not, see <http://www.gnu.org/licenses/>.
#
#------------------------------------------------------------------------------

import numpy as np
from sgidspace.sequence_generator import SGISequenceGenerator, IUPAC_CODES
from sgidspace.filters import filter_records, get_nested_key
from sgidspace.keywords import extract_keywords
from sgidspace.gene_names import process_gene_name
import keras.backend as K


def make_batch_processor(
    main_datadir,
    subdir,
    batch_size,
    outputs,
    inference=False,
    classflags=None,
    unbounded_iteration=True,
    from_embed=False
):
    """
    Build a BatchProcessor that reads the ``*.json*`` files found under
    ``main_datadir/subdir``.
    """
    seq_generator = SGISequenceGenerator(
        main_datadir + '/' + subdir + '/*.json*',
        unbounded_iteration=unbounded_iteration
    )

    return BatchProcessor(
        seq_generator,
        batch_size,
        outputs,
        inference=inference,
        classflags=classflags,
        from_embed=from_embed
    )


def flatten_ec(ecnum):
    """
    Expand a dot-separated code into the list of its cumulative prefixes,
    skipping every level whose component is ``'-'``.

    For example ``'1.2.-.-'`` becomes ``['1', '1.2']``.

    ``None`` and the empty string are returned unchanged.
    """
    if ecnum is None:
        return None
    elif ecnum == '':
        return ''

    parts = ecnum.split('.')
    return ['.'.join(parts[:i + 1]) for i, part in enumerate(parts) if part != '-']


def transform_records(generator, data_dicts):
    """
    modify records from the form they take on disk to the format used by batch processor

    ``data_dicts`` maps a field name to a lookup dict; for each such field
    present in a record, the looked-up values are stored under
    ``<field>_dict``. Records are modified in place and yielded.
    """
    for data in generator:
        if 'translation_description' in data:
            data['translation_description_keywords'] = extract_keywords(data['translation_description'])

        if 'gene_name' in data:
            data['gene_name'] = [process_gene_name(data['gene_name'])]

        if 'ec_number' in data:
            data['ec_number_ecflat'] = flatten_ec(data['ec_number'])

        if 'gene3d' in data:
            x = []
            for v in data['gene3d']:
                # flatten_ec returns None / '' for missing values; skip those
                x += flatten_ec(v) or []
            data['gene3d_ecflat'] = list(np.unique(x))

        if 'diamond' in data:
            data['diamond_cluster_id'] = [data['diamond']['cluster_id']]

        for field in data_dicts:
            if field in data:
                dtransform = data_dicts[field]
                if isinstance(data[field], list):
                    # union of the looked-up values of every list element
                    s = set()
                    for x in data[field]:
                        s.update(dtransform.get(str(x), []))
                    data[field + '_dict'] = list(s)
                else:
                    data[field + '_dict'] = dtransform.get(str(data[field]), [])

        yield data


class BatchProcessor():
    """
    Generates batches of data for training

    SGISequenceGenerator is used as a starting point which is subsequently
    filtered, transformed, and grouped into batches of size `batch_size`.
    """

    # Number of sequence positions allocated per record in 'sequence_input'.
    max_sequence_length = 2000

    def __iter__(self):
        return self

    def next(self):
        """Return the next batch; raise StopIteration once the source is exhausted."""
        if self.done:
            raise StopIteration()
        return self.fetch_batch()

    # Python 3 iterator protocol; `next` is kept for existing callers.
    __next__ = next

    def __init__(
        self,
        seq_generator,
        batch_size,
        outputs,
        inference=False,
        classflags=None,
        from_embed=False
    ):
        self.batch_size = batch_size
        self.seq_generator = seq_generator
        self.outputs = outputs
        self.inference = inference
        self.done = False
        self.input_symbols = {label: i for i, label in enumerate(IUPAC_CODES)}

        self.from_embed = from_embed
        if from_embed:
            self.esize = 256

        # Per-output label -> column index, and per-field lookup dicts
        self.class_index = {}
        data_dicts = {}
        for o in outputs:
            if 'class_labels' in o:
                self.class_index[o['name']] = {label: i for i, label in enumerate(o['class_labels'])}
            if isinstance(o['datafun'], dict):
                data_dicts[o['field']] = o['datafun']

        self.seq_generator = transform_records(self.seq_generator, data_dicts)
        if not from_embed:
            self.seq_generator = filter_records(self.seq_generator, outputs, classflags=classflags, inference=inference)

    def fetch_records(self):
        """
        Pull up to ``batch_size`` records from the source.

        Sets ``self.done`` when the source runs out, in which case the
        returned list may be shorter than ``batch_size`` (possibly empty).
        """
        records = []
        for _ in range(self.batch_size):
            r = next(self.seq_generator, None)
            if r is None:
                self.done = True
                break
            records.append(r)
        return records

    def fetch_batch(self):
        """
        Fetch one batch of records and encode it as dicts of numpy arrays.

        Returns ``(X, records)`` when ``self.inference`` is set.
        """
        records = self.fetch_records()

        # initialize input
        X = {}
        if self.from_embed:
            X['embedding'] = np.zeros(
                [
                    len(records),
                    self.esize,
                ],
                dtype=K.floatx(),
            )
        else:
            X['sequence_input'] = np.zeros(
                [
                    len(records),
                    self.max_sequence_length,
                    len(IUPAC_CODES),
                ],
                dtype=K.floatx(),
            )

        Y = {}

        # initialize output buffer
        for o in self.outputs:
            dtype = K.floatx()
            shape = [len(records), o['classcount']]
            Y[o['name']] = np.zeros(shape, dtype=dtype)

        # Copy record information
        for i, record in enumerate(records):
            if self.from_embed:
                X['embedding'][i, :] = np.array(record['embedding'], dtype=K.floatx())
            else:
                # input_sequence
                input_sequence = record['protein_sequence']
                for sequence_index, symbol in enumerate(input_sequence):
                    symbol_index = self.input_symbols.get(symbol)
                    if symbol_index is not None:
                        X['sequence_input'][i, sequence_index, symbol_index] = 1

                # mark every position past the end of the sequence with the
                # "*" symbol; using len() keeps this correct for an empty
                # sequence, where the loop variable would be undefined/stale
                aai = self.input_symbols.get("*")
                X['sequence_input'][i, len(input_sequence):, aai] = 1

            # NOTE(review): the nesting of the 'onehot' fallback branches below
            # is reconstructed from a snippet that lost its indentation --
            # confirm against the upstream file.
            for o in self.outputs:
                output_name = o['name']
                r = get_nested_key(record, o['name'])
                if r is not None:
                    if o['type'] == 'numeric':
                        Y[output_name][i] = r
                    elif o['type'] == 'boolean':
                        Y[output_name][i, int(r)] = 1
                    elif o['type'] in ['onehot', 'multihot']:
                        for label in r:
                            index = self.class_index[output_name].get(str(label))
                            if index is not None:
                                Y[output_name][i, index] = 1
                            elif o['type'] == 'onehot':
                                # unknown label: fall back to the last class
                                Y[output_name][i, o['classcount'] - 1] = 1
                else:
                    if o['type'] == 'onehot':
                        # missing value: fall back to the last class
                        Y[output_name][i, o['classcount'] - 1] = 1

        if self.inference:
            return X, records
        else:
            # NOTE(review): the snippet is truncated here on the source page;
            # the body of this branch is not visible.
            ...

Full Screen

Full Screen

binary_distr.py

Source:binary_distr.py Github

copy

Full Screen

from ycsb_zipf import AccurateDistributionGenerator


def generate_sequence(N, flip_to = -1):
    """
    Build a deterministic item sequence by repeated doubling.

    Starting from ``[0]``, the sequence is doubled for each new item
    ``1 .. N-1`` and its last element is replaced by that item, so
    ``N = 4`` gives ``[0, 1, 0, 2, 0, 1, 0, 3]``.

    If ``flip_to >= 0``, every occurrence of ``flip_to`` after the first is
    replaced by a fresh item id (``N``, ``N + 1``, ...).
    """
    current_sequence = [0]

    cur_i = 1
    while cur_i < N:
        current_sequence = current_sequence + current_sequence
        current_sequence[-1] = cur_i
        cur_i += 1

    if flip_to >= 0:
        old_seq = current_sequence
        first = True
        current_sequence = []
        for i in old_seq:
            if i == flip_to:
                if first:
                    # keep the first occurrence as is
                    first = False
                    current_sequence.append(i)
                else:
                    # later occurrences each become a new, unique item
                    current_sequence.append(cur_i)
                    cur_i += 1
            else:
                current_sequence.append(i)

    return current_sequence


def generate_freqs(N, flip_to = -1):
    """
    Return the per-item frequencies matching ``generate_sequence(N, flip_to)``.

    Item ``i`` gets ``1 / 2**(i + 1)``, with the last item sharing the
    frequency of the one before it. With ``flip_to >= 0`` the flipped item
    and each item created by the flip get ``1 / 2**(N - 1)``.
    """
    lambdas = [1.0 / 2**i for i in range(1, N)]
    lambdas.append(lambdas[-1])

    if flip_to >= 0:
        seq_size = 2**(N - 1)
        item_freq = 1.0 / seq_size
        lambdas[flip_to] = item_freq
        original_cnt = 2**(N - flip_to - 2)
        lambdas += [item_freq] * (original_cnt - 1)

    return lambdas


def check_freqs(N, f):
    """
    Return the summed absolute difference between the frequencies measured
    on ``generate_sequence(N, f)`` and those from ``generate_freqs(N, f)``.
    """
    seq = generate_sequence(N, f)
    freqs = generate_freqs(N, f)

    counts = [0 for i in freqs]
    for i in seq:
        counts[i] += 1

    csum = len(seq)
    freqs_m = [float(c) / csum for c in counts]
    return sum([abs(a - b) for (a, b) in zip(freqs_m, freqs)])


def seq_generator_number_of_counts(N, flip_to):
    """ tells you how big the universe generated by those parameters will be """
    return ((2 ** (N - flip_to - 2)) + N - 1)


class DetermBinaryGenerator(object):
    """Replays the sequence from ``generate_sequence`` in a fixed cycle."""

    def __init__(self, count, theta):
        """
        :param count: Required universe size; must equal
            ``seq_generator_number_of_counts(n, 0)`` for some ``n``.
        :param theta: Not used by this generator.
        :raises Exception: If ``count`` is not an achievable universe size.
        """
        self.N = count

        flip_to = 0
        seq_generator = 1
        cur_universe = seq_generator_number_of_counts(seq_generator, flip_to)
        # grow the sequence order until the universe reaches ``count``
        while cur_universe < count:
            seq_generator += 1
            cur_universe = seq_generator_number_of_counts(seq_generator, flip_to)

        if cur_universe != count:
            raise Exception("You gave me an unachievable count. Enjoy the exception.")

        self.sequence = generate_sequence(seq_generator, flip_to)
        self.continuation = 0
        self.lambdas = generate_freqs(seq_generator, flip_to)

    def get_popularity(self, item):
        """Return the frequency assigned to ``item``."""
        return self.lambdas[item]

    def get_next(self, rand_float):
        """
        Return ``(item, popularity)`` for the next position in the cycle.

        ``rand_float`` is not used.
        """
        item = self.sequence[self.continuation]
        # Wrap on the sequence length, not on the universe size: the sequence
        # can be longer than the universe (count=7 gives 8 elements), and
        # wrapping on ``self.N`` would never emit its tail.
        self.continuation = (self.continuation + 1) % len(self.sequence)
        return (item, self.get_popularity(item))


class RandBinaryGenerator(AccurateDistributionGenerator):
    def __init__(self, count, theta):
        flip_to = 0
        seq_generator = 1
        cur_universe = seq_generator_number_of_counts(seq_generator, flip_to)
        while cur_universe < count:
            seq_generator += 1
            cur_universe = seq_generator_number_of_counts(seq_generator, flip_to)

        if cur_universe != count:
            raise Exception("You gave me an unachievable count. Enjoy the exception.")
        # NOTE(review): the snippet is truncated here on the source page; the
        # rest of this constructor and class is not visible.

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run pyresttest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful