Best Python code snippet using yandex-tank
client.py
Source:client.py
...239 response_callback=lambda r: json.loads(r.content.decode('utf8')),240 maintenance_timeouts=maintenance_timeouts,241 maintenance_msg=maintenance_msg242 )243 def __post_raw(self, addr, txt_data, trace=False, interrupted_event=None):244 return self.__make_api_request(245 'POST', addr, txt_data, lambda r: r.content, trace=trace, interrupted_event=interrupted_event)246 def __post(self, addr, data, interrupted_event=None, trace=False):247 return self.__make_api_request(248 'POST',249 addr,250 json=data,251 response_callback=lambda r: r.json(),252 interrupted_event=interrupted_event,253 trace=trace)254 def __put(self, addr, data, trace=False):255 return self.__make_api_request(256 'PUT',257 addr,258 json=data,259 response_callback=lambda r: r.text,260 trace=trace)261 def __patch(self, addr, data, trace=False):262 return self.__make_api_request(263 'PATCH',264 addr,265 json=data,266 response_callback=lambda r: r.text,267 trace=trace)268 def get_task_data(self, task, trace=False):269 return self.__get("api/task/" + task + "/summary.json", trace=trace)270 def new_job(271 self,272 task,273 person,274 tank,275 target_host,276 target_port,277 loadscheme=None,278 detailed_time=None,279 notify_list=None,280 trace=False):281 """282 :return: job_nr, upload_token283 :rtype: tuple284 """285 if not notify_list:286 notify_list = []287 data = {288 'task': task,289 'person': person,290 'tank': tank,291 'host': target_host,292 'port': target_port,293 'loadscheme': loadscheme,294 'detailed_time': detailed_time,295 'notify': notify_list296 }297 logger.debug("Job create request: %s", data)298 api_timeouts = self.api_timeouts()299 while True:300 try:301 response = self.__post(302 "api/job/create.json", data, trace=trace)[0]303 # [{"upload_token": "1864a3b2547d40f19b5012eb038be6f6", "job": 904317}]304 return response['job'], response['upload_token']305 except (self.NotAvailable, self.StoppedFromOnline) as e:306 try:307 timeout = next(api_timeouts)308 logger.warn("API error, will retry in %ss..." % timeout)309 time.sleep(timeout)310 continue311 except StopIteration:312 logger.warn('Failed to create job on lunapark')313 raise self.JobNotCreated(e.message)314 except requests.HTTPError as e:315 raise self.JobNotCreated('Failed to create job on lunapark\n{}'.format(e.response.content))316 except Exception as e:317 logger.warn('Failed to create job on lunapark')318 logger.warn(repr(e), )319 raise self.JobNotCreated()320 def get_job_summary(self, jobno):321 result = self.__get('api/job/' + str(jobno) + '/summary.json')322 return result[0]323 def close_job(self, jobno, retcode, trace=False):324 params = {'exitcode': str(retcode)}325 result = self.__get('api/job/' + str(jobno) + '/close.json?'326 + urllib.parse.urlencode(params), trace=trace)327 return result[0]['success']328 def edit_job_metainfo(329 self,330 jobno,331 job_name,332 job_dsc,333 instances,334 ammo_path,335 loop_count,336 version_tested,337 component,338 cmdline,339 is_starred,340 tank_type=0,341 trace=False):342 data = {343 'name': job_name,344 'description': job_dsc,345 'instances': str(instances),346 'ammo': ammo_path,347 'loop': loop_count,348 'version': version_tested,349 'component': component,350 'tank_type': int(tank_type),351 'command_line': cmdline,352 'starred': int(is_starred)353 }354 response = self.__post(355 'api/job/' + str(jobno) + '/edit.json',356 data,357 trace=trace)358 return response359 def set_imbalance_and_dsc(self, jobno, rps, comment):360 data = {}361 if rps:362 data['imbalance'] = rps363 if comment:364 res = self.get_job_summary(jobno)365 data['description'] = (res['dsc'] + "\n" + comment).strip()366 response = self.__post('api/job/' + str(jobno) + '/edit.json', data)367 return response368 def second_data_to_push_item(self, data, stat, timestamp, overall, case):369 """370 @data: SecondAggregateDataItem371 """372 api_data = {373 'overall': overall,374 'case': case,375 'net_codes': [],376 'http_codes': [],377 'time_intervals': [],378 'trail': {379 'time': str(timestamp),380 'reqps': stat["metrics"]["reqps"],381 'resps': data["interval_real"]["len"],382 'expect': data["interval_real"]["total"] / 1000.0 / data["interval_real"]["len"],383 'disper': 0,384 'self_load':385 0, # TODO abs(round(100 - float(data.selfload), 2)),386 'input': data["size_in"]["total"],387 'output': data["size_out"]["total"],388 'connect_time': data["connect_time"]["total"] / 1000.0 / data["connect_time"]["len"],389 'send_time':390 data["send_time"]["total"] / 1000.0 / data["send_time"]["len"],391 'latency':392 data["latency"]["total"] / 1000.0 / data["latency"]["len"],393 'receive_time': data["receive_time"]["total"] / 1000.0 / data["receive_time"]["len"],394 'threads': stat["metrics"]["instances"], # TODO395 }396 }397 for q, value in zip(data["interval_real"]["q"]["q"],398 data["interval_real"]["q"]["value"]):399 api_data['trail']['q' + str(q)] = value / 1000.0400 for code, cnt in data["net_code"]["count"].items():401 api_data['net_codes'].append({'code': int(code),402 'count': int(cnt)})403 for code, cnt in data["proto_code"]["count"].items():404 api_data['http_codes'].append({'code': int(code),405 'count': int(cnt)})406 api_data['time_intervals'] = self.convert_hist(data["interval_real"][407 "hist"])408 return api_data409 def convert_hist(self, hist):410 data = hist['data']411 bins = hist['bins']412 return [413 {414 "from": 0, # deprecated415 "to": b / 1000.0,416 "count": count,417 } for b, count in zip(bins, data)418 ]419 def push_test_data(420 self,421 jobno,422 upload_token,423 data_item,424 stat_item,425 interrupted_event,426 trace=False):427 items = []428 uri = 'api/job/{0}/push_data.json?upload_token={1}'.format(429 jobno, upload_token)430 ts = data_item["ts"]431 for case_name, case_data in data_item["tagged"].items():432 if case_name == "":433 case_name = "__NOTAG__"434 push_item = self.second_data_to_push_item(case_data, stat_item, ts,435 0, case_name)436 items.append(push_item)437 overall = self.second_data_to_push_item(data_item["overall"],438 stat_item, ts, 1, '')439 items.append(overall)440 api_timeouts = self.api_timeouts()441 while not interrupted_event.is_set():442 try:443 if self.writer_url:444 res = self.__make_writer_request(445 params={446 "jobno": jobno,447 "upload_token": upload_token,448 },449 json={450 "trail": items,451 },452 trace=trace)453 logger.debug("Writer response: %s", res.text)454 return res.json()["success"]455 else:456 res = self.__post(uri, items, interrupted_event, trace=trace)457 logger.debug("API response: %s", res)458 success = int(res[0]['success'])459 return success460 except self.NotAvailable as e:461 try:462 timeout = next(api_timeouts)463 logger.warn("API error, will retry in %ss...", timeout)464 time.sleep(timeout)465 continue466 except StopIteration:467 raise e468 def push_monitoring_data(469 self,470 jobno,471 upload_token,472 send_data,473 interrupted_event,474 trace=False):475 if send_data:476 addr = "api/monitoring/receiver/push?job_id=%s&upload_token=%s" % (477 jobno, upload_token)478 api_timeouts = self.api_timeouts()479 while not interrupted_event.is_set():480 try:481 if self.writer_url:482 res = self.__make_writer_request(483 params={484 "jobno": jobno,485 "upload_token": upload_token,486 },487 json={488 "monitoring": send_data,489 },490 trace=trace)491 logger.debug("Writer response: %s", res.text)492 return res.json()["success"]493 else:494 res = self.__post_raw(495 addr, json.dumps(send_data), trace=trace, interrupted_event=interrupted_event)496 logger.debug("API response: %s", res)497 success = res == 'ok'498 return success499 except self.NotAvailable as e:500 try:501 timeout = next(api_timeouts)502 logger.warn("API error, will retry in %ss...", timeout)503 time.sleep(timeout)504 continue505 except StopIteration:506 raise e507 def push_events_data(self, jobno, operator, send_data):508 if send_data:509 # logger.info('send data: %s', send_data)510 for key in send_data:511 addr = "/api/job/{jobno}/event.json".format(512 jobno=jobno,513 )514 body = dict(515 operator=operator,516 text=key[1],517 timestamp=key[0]518 )519 api_timeouts = self.api_timeouts()520 while True:521 try:522 # logger.debug('Sending event: %s', body)523 res = self.__post_raw(addr, body)524 logger.debug("API response for events push: %s", res)525 success = res == 'ok'526 return success527 except self.NotAvailable as e:528 try:529 timeout = next(api_timeouts)530 logger.warn("API error, will retry in %ss...", timeout)531 time.sleep(timeout)532 continue533 except StopIteration:534 raise e535 def send_status(self, jobno, upload_token, status, trace=False):536 addr = "api/v2/jobs/%s/?upload_token=%s" % (jobno, upload_token)537 status_line = status.get("core", {}).get("stage", "unknown")538 if "stepper" in status:539 status_line += " %s" % status["stepper"].get("progress")540 api_timeouts = self.api_timeouts()541 while True:542 try:543 self.__patch(addr, {"status": status_line}, trace=trace)544 return545 except self.NotAvailable as e:546 try:547 timeout = next(api_timeouts)548 logger.warn("API error, will retry in %ss...", timeout)549 time.sleep(timeout)550 continue551 except StopIteration:552 raise e553 def is_target_locked(self, target, trace=False):554 addr = "api/server/lock.json?action=check&address=%s" % target555 res = self.__get(addr, trace=trace)556 return res[0]557 def lock_target(self, target, duration, trace=False, maintenance_timeouts=None, maintenance_msg=None):558 addr = "api/server/lock.json?action=lock&" + \559 "address=%s&duration=%s&jobno=None" % \560 (target, int(duration))561 res = self.__get(addr, trace=trace, maintenance_timeouts=maintenance_timeouts, maintenance_msg=maintenance_msg)562 return res[0]563 def unlock_target(self, target):564 addr = self.get_manual_unlock_link(target)565 res = self.__get(addr)566 return res[0]567 def get_virtual_host_info(self, hostname):568 addr = "api/server/virtual_host.json?hostname=%s" % hostname569 res = self.__get(addr)570 try:571 return res[0]572 except KeyError:573 raise Exception(res['error'])574 @staticmethod575 def get_manual_unlock_link(target):576 return "api/server/lock.json?action=unlock&address=%s" % target577 def send_config(self, jobno, lp_requisites, config_content, trace=False):578 endpoint, field_name = lp_requisites579 logger.debug("Sending {} config".format(field_name))580 addr = "/api/job/%s/%s" % (jobno, endpoint)581 self.__post_raw(addr, {field_name: config_content}, trace=trace)582 def link_mobile_job(self, lp_key, mobile_key):583 addr = "/api/job/{jobno}/edit.json".format(jobno=lp_key)584 data = {585 'mobile_key': mobile_key586 }587 response = self.__post(addr, data)588 return response589class LPRequisites():590 CONFIGINFO = ('configinfo.txt', 'configinfo')591 MONITORING = ('jobmonitoringconfig.txt', 'monitoringconfig')592 CONFIGINITIAL = ('configinitial.txt', 'configinitial')593class OverloadClient(APIClient):594 """ mocks below for nonexistent backend methods """595 def send_status(self, jobno, upload_token, status, trace=False):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!