Best Python code snippet using yandex-tank
missile.py
Source:missile.py
# ... (this view starts mid-file; the statements below are the tail of a class
# whose header is not visible, so they are kept verbatim as a comment instead
# of being rewritten)
#         '''
#         self.missiles = cycle([(missile_sample.to_s(), None)])
#
#     def __iter__(self):
#         for m in self.missiles:
#             info.status.inc_loop_count()
#             yield m


class UriStyleGenerator(object):
    '''
    Generates GET ammo based on given URI list.
    '''

    def __init__(self, uris, headers, http_ver='1.1'):
        '''
        uris - a list of URIs as strings.
        headers - passed unchanged to HttpAmmo for every URI.
        http_ver - HTTP version string passed to HttpAmmo.
        '''
        self.uri_count = len(uris)
        # All missiles are rendered once up front and then replayed forever.
        self.missiles = cycle([
            (HttpAmmo(uri, headers, http_ver=http_ver).to_s(), None)
            for uri in uris
        ])

    def __iter__(self):
        '''Yield (missile, None) tuples, cycling over the URI list.'''
        for m in self.missiles:
            yield m
            # Floor division keeps the loop counter an integer on Python 3
            # as well (plain "/" would turn it into a float there).
            info.status.loop_count = info.status.ammo_count // self.uri_count


class AmmoFileReader(object):
    '''Read missiles from ammo file

    Each chunk is a header line "<size> [marker]" followed by <size>
    characters of payload obtained with read(size). A zero size or the end of
    the file rewinds the file and starts the next loop.
    '''

    def __init__(self, filename, **kwargs):
        self.filename = filename
        self.log = logging.getLogger(__name__)
        self.log.info("Loading ammo from '%s'" % filename)

    def __iter__(self):
        '''Yield (missile, marker) tuples; marker is None when absent.

        Raises AmmoFileError on a malformed header or a short read.
        '''
        def read_chunk_header(ammo_file):
            # Skip blank lines; an empty result means end of file.
            chunk_header = ''
            while not chunk_header:
                line = ammo_file.readline()
                if not line:
                    return line
                chunk_header = line.strip('\r\n')
            return chunk_header

        opener = resource.get_opener(self.filename)
        with opener() as ammo_file:
            info.status.af_size = opener.data_length
            # An empty header here means the file holds no chunks at all, in
            # which case the loop below is skipped and iteration just ends.
            chunk_header = read_chunk_header(ammo_file)
            while chunk_header:
                try:
                    fields = chunk_header.split()
                    chunk_size = int(fields[0])
                    if chunk_size == 0:
                        # Only announce the rewind during the first loop.
                        if info.status.loop_count == 0:
                            self.log.info(
                                'Zero-sized chunk in ammo file at %s. Starting over.'
                                % ammo_file.tell())
                        ammo_file.seek(0)
                        info.status.inc_loop_count()
                        chunk_header = read_chunk_header(ammo_file)
                        continue
                    marker = fields[1] if len(fields) > 1 else None
                    missile = ammo_file.read(chunk_size)
                    if len(missile) < chunk_size:
                        raise AmmoFileError(
                            "Unexpected end of file: read %s bytes instead of %s"
                            % (len(missile), chunk_size))
                    yield (missile, marker)
                except (IndexError, ValueError) as e:
                    raise AmmoFileError(
                        "Error while reading ammo file. Position: %s, header: '%s', original exception: %s"
                        % (ammo_file.tell(), chunk_header, e))
                chunk_header = read_chunk_header(ammo_file)
                if not chunk_header:
                    # End of file: rewind and count a new loop.
                    ammo_file.seek(0)
                    info.status.inc_loop_count()
                    chunk_header = read_chunk_header(ammo_file)
                info.status.af_position = ammo_file.tell()


class SlowLogReader(object):
    '''Read missiles from SQL slow log. Not usable with Phantom'''

    def __init__(self, filename, **kwargs):
        self.filename = filename

    def __iter__(self):
        '''Yield (request, None) tuples, looping over the file forever.

        Lines starting with '#' delimit requests; the text accumulated since
        the previous delimiter is yielded when the next one is met, so the
        last request of a pass is emitted at the start of the following pass.

        Raises AmmoFileError when the file can never produce a request.
        '''
        opener = resource.get_opener(self.filename)
        with opener() as ammo_file:
            info.status.af_size = opener.data_length
            request = ""
            while True:
                saw_delimiter = False
                saw_payload = False
                for line in ammo_file:
                    info.status.af_position = ammo_file.tell()
                    if line.startswith('#'):
                        saw_delimiter = True
                        if request != "":
                            yield (request, None)
                            request = ""
                    else:
                        saw_payload = True
                        request += line
                if not (saw_delimiter and saw_payload):
                    # Without both a '#' line and a payload line nothing is
                    # ever yielded; fail instead of spinning forever.
                    raise AmmoFileError(
                        "No requests found in slow log '%s'" % self.filename)
                ammo_file.seek(0)
                info.status.af_position = 0
                info.status.inc_loop_count()


class LineReader(object):
    '''One line -- one missile'''

    def __init__(self, filename, **kwargs):
        self.filename = filename

    def __iter__(self):
        '''Yield (line, None) tuples, looping over the file forever.

        Raises AmmoFileError when the file contains no lines.
        '''
        opener = resource.get_opener(self.filename)
        with opener() as ammo_file:
            info.status.af_size = opener.data_length
            while True:
                produced = False
                for line in ammo_file:
                    info.status.af_position = ammo_file.tell()
                    produced = True
                    yield (line.rstrip('\r\n'), None)
                if not produced:
                    # An empty file would otherwise rewind in a busy loop.
                    raise AmmoFileError(
                        "No ammo in file '%s'" % self.filename)
                ammo_file.seek(0)
                info.status.af_position = 0
                info.status.inc_loop_count()


class CaseLineReader(object):
    '''One line -- one missile with case, tab separated'''

    def __init__(self, filename, **kwargs):
        self.filename = filename

    def __iter__(self):
        '''Yield (missile, case) tuples, looping over the file forever.

        The text before the first tab is the case and the rest is the
        missile; a line without a tab is a missile with case None.

        Raises AmmoFileError when the file contains no lines.
        '''
        opener = resource.get_opener(self.filename)
        with opener() as ammo_file:
            info.status.af_size = opener.data_length
            while True:
                produced = False
                for line in ammo_file:
                    info.status.af_position = ammo_file.tell()
                    text = line.rstrip('\r\n')
                    case, tab, missile = text.partition('\t')
                    produced = True
                    if tab:
                        yield (missile, case)
                    else:
                        yield (text, None)
                if not produced:
                    # An empty file would otherwise rewind in a busy loop.
                    raise AmmoFileError(
                        "No ammo in file '%s'" % self.filename)
                ammo_file.seek(0)
                info.status.af_position = 0
                info.status.inc_loop_count()


class AccessLogReader(object):
    '''Missiles from access log'''

    def __init__(self, filename, headers=None, http_ver='1.1', **kwargs):
        '''
        filename - path handed to resource.get_opener.
        headers - iterable of header strings; duplicates are collapsed.
        http_ver - accepted for signature compatibility; the version used
            for each missile is taken from the log line itself.
        '''
        self.filename = filename
        self.warned = False
        # None instead of a shared mutable default list.
        self.headers = set(headers or ())
        self.log = logging.getLogger(__name__)

    def warn(self, message):
        '''Log message at debug level, with a one-time warning before it.'''
        if not self.warned:
            self.warned = True
            self.log.warning(
                "There are some skipped lines. See full log for details.")
        self.log.debug(message)

    def __iter__(self):
        '''Yield (missile, None) for every GET request line, forever.

        The request is the text between the first pair of double quotes and
        must consist of exactly "METHOD URI PROTO/VERSION". Other lines are
        reported through warn() and skipped.

        Raises AmmoFileError when a whole pass produces no missile.
        '''
        opener = resource.get_opener(self.filename)
        with opener() as ammo_file:
            info.status.af_size = opener.data_length
            while True:
                produced = False
                for line in ammo_file:
                    info.status.af_position = ammo_file.tell()
                    try:
                        request = line.split('"')[1]
                        method, uri, proto = request.split()
                        http_ver = proto.split('/')[1]
                        if method == "GET":
                            produced = True
                            yield (
                                HttpAmmo(
                                    uri,
                                    headers=self.headers,
                                    http_ver=http_ver).to_s(), None)
                        else:
                            self.warn(
                                "Skipped line: %s (unsupported method)" % line)
                    except (ValueError, IndexError) as e:
                        self.warn("Skipped line: %s (%s)" % (line, e))
                if not produced:
                    # No usable GET line: rewinding would never yield.
                    raise AmmoFileError(
                        "No ammo in file '%s'" % self.filename)
                ammo_file.seek(0)
                info.status.af_position = 0
                info.status.inc_loop_count()


def _parse_header(header):
    '''Split a "Name: value" string into a one-item {name: value} dict.

    Only the first colon separates name from value; both are stripped.
    Raises ValueError when the string contains no colon.
    '''
    name, colon, value = header.partition(':')
    if not colon:
        raise ValueError(
            "Malformed header, expected 'Name: value': %r" % (header,))
    return {name.strip(): value.strip()}


class UriReader(object):
    '''Read GET missiles from a file of URIs and [Header: value] lines'''

    def __init__(self, filename, headers=None, http_ver='1.1', **kwargs):
        '''
        filename - path handed to resource.get_opener.
        headers - iterable of "Name: value" strings used as initial headers.
        http_ver - HTTP version string passed to HttpAmmo.
        '''
        self.filename = filename
        self.headers = {}
        # None instead of a shared mutable default list.
        for header in headers or ():
            self.headers.update(_parse_header(header))
        self.http_ver = http_ver
        self.log = logging.getLogger(__name__)
        self.log.info("Loading ammo from '%s' using URI format." % filename)

    def __iter__(self):
        '''Yield (missile, marker) tuples, looping over the file forever.

        A line starting with '[' updates the header set used for the
        following missiles. Any other non-blank line is "URI [marker]".

        Raises AmmoFileError when info.status.ammo_count is still zero after
        a full pass over the file.
        '''
        opener = resource.get_opener(self.filename)
        with opener() as ammo_file:
            info.status.af_size = opener.data_length
            while True:
                for line in ammo_file:
                    info.status.af_position = ammo_file.tell()
                    if line.startswith('['):
                        self.headers.update(
                            _parse_header(line.strip('\r\n[]\t ')))
                        continue
                    fields = line.split()
                    if not fields:
                        # Blank or whitespace-only line: nothing to fire.
                        continue
                    uri = fields[0]
                    marker = fields[1] if len(fields) > 1 else None
                    yield (
                        HttpAmmo(
                            uri,
                            headers=[
                                ': '.join(header)
                                for header in self.headers.items()
                            ],
                            http_ver=self.http_ver).to_s(), marker)
                if info.status.ammo_count == 0:
                    self.log.error("No ammo in uri-style file")
                    raise AmmoFileError("No ammo! Cover me!")
                ammo_file.seek(0)
                info.status.af_position = 0
                info.status.inc_loop_count()


class UriPostReader(object):
    '''Read POST missiles from ammo file'''

    def __init__(self, filename, headers=None, http_ver='1.1', **kwargs):
        '''
        filename - path handed to resource.get_opener.
        headers - iterable of "Name: value" strings, or None for no headers.
        http_ver - HTTP version string passed to HttpAmmo.
        '''
        self.filename = filename
        self.headers = {}
        # The default is None, which must not be iterated directly.
        for header in headers or ():
            self.headers.update(_parse_header(header))
        self.http_ver = http_ver
        self.log = logging.getLogger(__name__)
        self.log.info("Loading ammo from '%s' using URI+POST format", filename)

    def __iter__(self):
        '''Yield (missile, marker) tuples built from "<size> <uri> [marker]"
        chunk headers, each followed by <size> characters of POST body.

        Lines starting with '[' update the header set. Raises AmmoFileError
        on a malformed header or a short read.
        '''
        def read_chunk_header(ammo_file):
            # Consume header-definition and blank lines until a chunk header
            # is found; an empty result means end of file.
            chunk_header = ''
            while not chunk_header:
                line = ammo_file.readline()
                if line.startswith('['):
                    self.headers.update(_parse_header(line.strip('\r\n[]\t ')))
                elif not line:
                    return line
                else:
                    chunk_header = line.strip('\r\n')
            return chunk_header

        opener = resource.get_opener(self.filename)
        with opener() as ammo_file:
            info.status.af_size = opener.data_length
            # An empty header here means the file holds no chunks at all, in
            # which case the loop below is skipped.
            chunk_header = read_chunk_header(ammo_file)
            while chunk_header:
                try:
                    fields = chunk_header.split()
                    chunk_size = int(fields[0])
                    if chunk_size == 0:
                        self.log.debug(
                            'Zero-sized chunk in ammo file at %s. Starting over.'
                            % ammo_file.tell())
                        ammo_file.seek(0)
                        info.status.inc_loop_count()
                        chunk_header = read_chunk_header(ammo_file)
                        continue
                    uri = fields[1]
                    marker = fields[2] if len(fields) > 2 else None
                    missile = ammo_file.read(chunk_size)
                    if len(missile) < chunk_size:
                        raise AmmoFileError(
                            "Unexpected end of file: read %s bytes instead of %s"
                            % (len(missile), chunk_size))
                    yield (
                        HttpAmmo(
                            uri=uri,
                            headers=[
                                ': '.join(header)
                                for header in self.headers.items()
                            ],
                            method='POST',
                            body=missile,
                            http_ver=self.http_ver).to_s(), marker)
                except (IndexError, ValueError) as e:
                    raise AmmoFileError(
                        "Error while reading ammo file. Position: %s, header: '%s', original exception: %s"
                        % (ammo_file.tell(), chunk_header, e))
                chunk_header = read_chunk_header(ammo_file)
                if not chunk_header:
                    self.log.debug(
                        'Reached the end of ammo file. Starting over.')
                    ammo_file.seek(0)
                    info.status.inc_loop_count()
                    chunk_header = read_chunk_header(ammo_file)
                # ... (the snippet is truncated here in this view)
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!