How to use push_events_data method in yandex-tank

Best Python code snippet using yandex-tank

plugin.py

Source:plugin.py Github

copy

Full Screen

...79 @abstractmethod80 def push_monitoring_data(self):81 """method for pushing monitoring data"""82 @abstractmethod83 def push_events_data(self):84 """method for pushing event data"""85 @abstractmethod86 def lock_target(self):87 """method for locking target during..."""88 @abstractmethod89 def set_imbalance_and_dsc(self):90 """method for imbalance detection"""91 @abstractmethod92 def is_target_locked(self):93 """method for checking target lock"""94def chop(data_list, chunk_size):95 if sys.getsizeof(str(data_list)) <= chunk_size:96 return [data_list]97 elif len(data_list) == 1:98 logger.info("Too large piece of Telegraf data. Might experience upload problems.")99 return [data_list]100 else:101 mid = len(data_list) / 2102 return chop(data_list[:mid], chunk_size) + chop(data_list[mid:], chunk_size)103class Plugin(AbstractPlugin, AggregateResultListener,104 MonitoringDataListener):105 RC_STOP_FROM_WEB = 8106 VERSION = '3.0'107 SECTION = 'uploader'108 def __init__(self, core, cfg, name):109 AbstractPlugin.__init__(self, core, cfg, name)110 self.data_queue = Queue()111 self.monitoring_queue = Queue()112 if self.core.error_log:113 self.events_queue = Queue()114 self.events_reader = EventsReader(self.core.error_log)115 self.events_processing = Drain(self.events_reader, self.events_queue)116 self.add_cleanup(self.stop_events_processing)117 self.events_processing.start()118 self.events = threading.Thread(target=self.__events_uploader)119 self.events.daemon = True120 self.retcode = -1121 self._target = None122 self.task_name = ''123 self.token_file = None124 self.version_tested = None125 self.send_status_period = 10126 self.status_sender = threading.Thread(target=self.__send_status)127 self.status_sender.daemon = True128 self.upload = threading.Thread(target=self.__data_uploader)129 self.upload.daemon = True130 self.monitoring = threading.Thread(target=self.__monitoring_uploader)131 self.monitoring.daemon = True132 self._is_telegraf = None133 self.backend_type = BackendTypes.identify_backend(self.cfg['api_address'], self.cfg_section_name)134 self._task = None135 self._api_token = ''136 self._lp_job = None137 self._lock_duration = None138 self._info = None139 self.locked_targets = []140 self.web_link = None141 self.finished = False142 def set_option(self, option, value):143 self.cfg.setdefault('meta', {})[option] = value144 self.core.publish(self.SECTION, 'meta.{}'.format(option), value)145 @staticmethod146 def get_key():147 return __file__148 @property149 def lock_duration(self):150 if self._lock_duration is None:151 info = self.get_generator_info()152 self._lock_duration = info.duration if info.duration else \153 expand_to_seconds(self.get_option("target_lock_duration"))154 return self._lock_duration155 def get_available_options(self):156 opts = [157 "api_address",158 "writer_endpoint",159 "task",160 "job_name",161 "job_dsc",162 "notify",163 "ver", "component",164 "operator",165 "jobno_file",166 "ignore_target_lock",167 "target_lock_duration",168 "lock_targets",169 "jobno",170 "upload_token",171 'connection_timeout',172 'network_attempts',173 'api_attempts',174 'maintenance_attempts',175 'network_timeout',176 'api_timeout',177 'maintenance_timeout',178 'strict_lock',179 'send_status_period',180 'log_data_requests',181 'log_monitoring_requests',182 'log_status_requests',183 'log_other_requests',184 'threads_timeout',185 'chunk_size'186 ]187 return opts188 def configure(self):189 self.core.publish(self.SECTION, 'component', self.get_option('component'))190 self.core.publish(self.SECTION, 'task', self.get_option('task'))191 self.core.publish(self.SECTION, 'job_name', self.get_option('job_name'))192 def check_task_is_open(self):193 if self.backend_type != BackendTypes.LUNAPARK:194 return195 TASK_TIP = 'The task should be connected to Lunapark.' \196 'Open startrek task page, click "actions" -> "load testing".'197 logger.debug("Check if task %s is open", self.task)198 try:199 task_data = self.lp_job.get_task_data(self.task)[0]200 try:201 task_status = task_data['status']202 if task_status == 'Open':203 logger.info("Task %s is ok", self.task)204 self.task_name = str(task_data['name'])205 else:206 logger.info("Task %s:" % self.task)207 logger.info(task_data)208 raise RuntimeError("Task is not open")209 except KeyError:210 try:211 error = task_data['error']212 raise RuntimeError(213 "Task %s error: %s\n%s" %214 (self.task, error, TASK_TIP))215 except KeyError:216 raise RuntimeError(217 'Unknown task data format:\n{}'.format(task_data))218 except requests.exceptions.HTTPError as ex:219 logger.error(220 "Failed to check task status for '%s': %s", self.task, ex)221 if ex.response.status_code == 404:222 raise RuntimeError("Task not found: %s\n%s" % (self.task, TASK_TIP))223 elif ex.response.status_code == 500 or ex.response.status_code == 400:224 raise RuntimeError(225 "Unable to check task staus, id: %s, error code: %s" %226 (self.task, ex.response.status_code))227 raise ex228 @staticmethod229 def search_task_from_cwd(cwd):230 issue = re.compile("^([A-Za-z]+-[0-9]+)(-.*)?")231 while cwd:232 logger.debug("Checking if dir is named like JIRA issue: %s", cwd)233 if issue.match(os.path.basename(cwd)):234 res = re.search(issue, os.path.basename(cwd))235 return res.group(1).upper()236 newdir = os.path.abspath(os.path.join(cwd, os.path.pardir))237 if newdir == cwd:238 break239 else:240 cwd = newdir241 raise RuntimeError(242 "task=dir requested, but no JIRA issue name in cwd: %s" %243 os.getcwd())244 def prepare_test(self):245 info = self.get_generator_info()246 port = info.port247 instances = info.instances248 if info.ammo_file is not None:249 if info.ammo_file.startswith("http://") or info.ammo_file.startswith("https://"):250 ammo_path = info.ammo_file251 else:252 ammo_path = os.path.realpath(info.ammo_file)253 else:254 logger.warning('Failed to get info about ammo path')255 ammo_path = 'Undefined'256 loop_count = int(info.loop_count)257 try:258 lp_job = self.lp_job259 self.add_cleanup(self.unlock_targets)260 self.locked_targets = self.check_and_lock_targets(strict=self.get_option('strict_lock'),261 ignore=self.get_option('ignore_target_lock'))262 if lp_job._number:263 self.make_symlink(lp_job._number)264 self.check_task_is_open()265 else:266 self.check_task_is_open()267 lp_job.create()268 self.make_symlink(lp_job.number)269 self.publish('job_no', lp_job.number)270 except (APIClient.JobNotCreated, APIClient.NotAvailable, APIClient.NetworkError) as e:271 logger.error(e)272 logger.error(273 'Failed to connect to Lunapark, disabling DataUploader')274 self.start_test = lambda *a, **kw: None275 self.post_process = lambda *a, **kw: None276 self.on_aggregated_data = lambda *a, **kw: None277 self.monitoring_data = lambda *a, **kw: None278 return279 cmdline = ' '.join(sys.argv)280 lp_job.edit_metainfo(281 instances=instances,282 ammo_path=ammo_path,283 loop_count=loop_count,284 regression_component=self.get_option("component"),285 cmdline=cmdline,286 )287 self.core.job.subscribe_plugin(self)288 try:289 console = self.core.get_plugin_of_type(ConsolePlugin)290 except KeyError as ex:291 logger.debug(ex)292 console = None293 if console:294 console.add_info_widget(JobInfoWidget(self))295 self.set_option('target_host', self.target)296 self.set_option('target_port', port)297 self.set_option('cmdline', cmdline)298 self.set_option('ammo_path', ammo_path)299 self.set_option('loop_count', loop_count)300 self.__save_conf()301 def start_test(self):302 self.add_cleanup(self.join_threads)303 self.status_sender.start()304 self.upload.start()305 self.monitoring.start()306 if self.core.error_log:307 self.events.start()308 self.web_link = urljoin(self.lp_job.api_client.base_url, str(self.lp_job.number))309 logger.info("Web link: %s", self.web_link)310 self.publish("jobno", self.lp_job.number)311 self.publish("web_link", self.web_link)312 jobno_file = self.get_option("jobno_file", '')313 if jobno_file:314 logger.debug("Saving jobno to: %s", jobno_file)315 with open(jobno_file, 'w') as fdes:316 fdes.write(str(self.lp_job.number))317 self.core.add_artifact_file(jobno_file)318 self.__save_conf()319 def is_test_finished(self):320 return self.retcode321 def end_test(self, retcode):322 if retcode != 0:323 self.lp_job.interrupted.set()324 self.__save_conf()325 self.unlock_targets()326 return retcode327 def close_job(self):328 self.lp_job.close(self.retcode)329 def join_threads(self):330 self.lp_job.interrupted.set()331 if self.monitoring.is_alive():332 self.monitoring.join()333 if self.upload.is_alive():334 self.upload.join()335 def stop_events_processing(self):336 self.events_queue.put(None)337 self.events_reader.close()338 self.events_processing.close()339 if self.events_processing.is_alive():340 self.events_processing.join()341 if self.events.is_alive():342 self.lp_job.interrupted.set()343 self.events.join()344 def post_process(self, rc):345 self.retcode = rc346 self.monitoring_queue.put(None)347 self.data_queue.put(None)348 if self.core.error_log:349 self.events_queue.put(None)350 self.events_reader.close()351 self.events_processing.close()352 self.events.join()353 logger.info("Waiting for sender threads to join.")354 if self.monitoring.is_alive():355 self.monitoring.join()356 if self.upload.is_alive():357 self.upload.join()358 self.finished = True359 logger.info(360 "Web link: %s", self.web_link)361 autostop = None362 try:363 autostop = self.core.get_plugin_of_type(AutostopPlugin)364 except KeyError as ex:365 logger.debug(ex)366 if autostop and autostop.cause_criterion:367 self.lp_job.set_imbalance_and_dsc(368 autostop.imbalance_rps, autostop.cause_criterion.explain())369 else:370 logger.debug("No autostop cause detected")371 self.__save_conf()372 return rc373 def on_aggregated_data(self, data, stats):374 """375 @data: aggregated data376 @stats: stats about gun377 """378 if not self.lp_job.interrupted.is_set():379 self.data_queue.put((data, stats))380 def monitoring_data(self, data_list):381 if not self.lp_job.interrupted.is_set():382 if len(data_list) > 0:383 [self.monitoring_queue.put(chunk) for chunk in chop(data_list, self.get_option("chunk_size"))]384 def __send_status(self):385 logger.info('Status sender thread started')386 lp_job = self.lp_job387 while not lp_job.interrupted.is_set():388 try:389 self.lp_job.send_status(self.core.info.get_info_dict())390 time.sleep(self.get_option('send_status_period'))391 except (APIClient.NetworkError, APIClient.NotAvailable) as e:392 logger.warn('Failed to send status')393 logger.debug(e)394 break395 except APIClient.StoppedFromOnline:396 logger.info("Test stopped from Lunapark")397 self.retcode = self.RC_STOP_FROM_WEB398 break399 if self.finished:400 break401 logger.info("Closed Status sender thread")402 def __uploader(self, queue, sender_method, name='Uploader'):403 logger.info('{} thread started'.format(name))404 while not self.lp_job.interrupted.is_set():405 try:406 entry = queue.get(timeout=1)407 if entry is None:408 logger.info("{} queue returned None".format(name))409 break410 sender_method(entry)411 except Empty:412 continue413 except APIClient.StoppedFromOnline:414 logger.warning("Lunapark is rejecting {} data".format(name))415 break416 except (APIClient.NetworkError, APIClient.NotAvailable, APIClient.UnderMaintenance) as e:417 logger.warn('Failed to push {} data'.format(name))418 logger.warn(e)419 self.lp_job.interrupted.set()420 except Exception:421 exc_type, exc_value, exc_traceback = sys.exc_info()422 logger.error("Mysterious exception:\n%s\n%s\n%s", (exc_type, exc_value, exc_traceback))423 break424 # purge queue425 while not queue.empty():426 if queue.get_nowait() is None:427 break428 logger.info("Closing {} thread".format(name))429 def __data_uploader(self):430 self.__uploader(self.data_queue,431 lambda entry: self.lp_job.push_test_data(*entry),432 'Data Uploader')433 def __monitoring_uploader(self):434 self.__uploader(self.monitoring_queue,435 self.lp_job.push_monitoring_data,436 'Monitoring Uploader')437 def __events_uploader(self):438 self.__uploader(self.events_queue,439 self.lp_job.push_events_data,440 'Events Uploader')441 # TODO: why we do it here? should be in core442 def __save_conf(self):443 for requisites, content in self.core.artifacts_to_send:444 self.lp_job.send_config(requisites, content)445 def parse_lock_targets(self):446 # prepare target lock list447 locks_list_cfg = self.get_option('lock_targets', 'auto')448 def no_target():449 logging.warn("Target lock set to 'auto', but no target info available")450 return {}451 locks_set = {self.target} or no_target() if locks_list_cfg == 'auto' else set(locks_list_cfg)452 targets_to_lock = [host for host in locks_set if host]453 return targets_to_lock454 def lock_targets(self, targets_to_lock, ignore, strict):455 locked_targets = [target for target in targets_to_lock456 if self.lp_job.lock_target(target, self.lock_duration, ignore, strict)]457 return locked_targets458 def unlock_targets(self):459 logger.info("Unlocking targets: %s", self.locked_targets)460 for target in self.locked_targets:461 logger.info(target)462 self.lp_job.api_client.unlock_target(target)463 def check_and_lock_targets(self, strict, ignore):464 targets_list = self.parse_lock_targets()465 logger.info('Locking targets: %s', targets_list)466 locked_targets = self.lock_targets(targets_list, ignore=ignore, strict=strict)467 logger.info('Locked targets: %s', locked_targets)468 return locked_targets469 def make_symlink(self, name):470 PLUGIN_DIR = os.path.join(self.core.artifacts_base_dir, 'lunapark')471 if not os.path.exists(PLUGIN_DIR):472 os.makedirs(PLUGIN_DIR)473 try:474 os.symlink(475 os.path.relpath(476 self.core.artifacts_dir,477 PLUGIN_DIR),478 os.path.join(479 PLUGIN_DIR,480 str(name)))481 # this exception catch for filesystems w/o symlinks482 except OSError:483 logger.warning('Unable to create symlink for artifact: %s', name)484 def _get_user_agent(self):485 plugin_agent = 'Uploader/{}'.format(self.VERSION)486 return ' '.join((plugin_agent,487 self.core.get_user_agent()))488 def __get_operator(self):489 try:490 return self.get_option(491 'operator') or pwd.getpwuid(492 os.geteuid())[0]493 except: # noqa: E722494 logger.error(495 "Couldn't get username from the OS. Please, set the 'meta.operator' option explicitly in your config "496 "file.")497 raise498 def __get_api_client(self):499 logging.info('Using {} backend'.format(self.backend_type))500 self._api_token = None501 if self.backend_type == BackendTypes.LUNAPARK:502 client = APIClient503 elif self.backend_type == BackendTypes.OVERLOAD:504 client = OverloadClient505 self._api_token = self.read_token(self.get_option("token_file"))506 elif self.backend_type == BackendTypes.CLOUD:507 return CloudGRPCClient(core_interrupted=self.interrupted,508 base_url=self.get_option('api_address'),509 api_attempts=self.get_option('api_attempts'),510 connection_timeout=self.get_option('connection_timeout'))511 else:512 raise RuntimeError("Backend type doesn't match any of the expected")513 return client(base_url=self.get_option('api_address'),514 writer_url=self.get_option('writer_endpoint'),515 network_attempts=self.get_option('network_attempts'),516 api_attempts=self.get_option('api_attempts'),517 maintenance_attempts=self.get_option('maintenance_attempts'),518 network_timeout=self.get_option('network_timeout'),519 api_timeout=self.get_option('api_timeout'),520 maintenance_timeout=self.get_option('maintenance_timeout'),521 connection_timeout=self.get_option('connection_timeout'),522 user_agent=self._get_user_agent(),523 api_token=self.api_token,524 core_interrupted=self.interrupted)525 @property526 def lp_job(self):527 """528 :rtype: LPJob529 """530 if self._lp_job is None:531 self._lp_job = self.__get_lp_job()532 self.core.publish(self.SECTION, 'job_no', self._lp_job.number)533 self.core.publish(self.SECTION, 'web_link', self._lp_job.web_link)534 self.core.publish(self.SECTION, 'job_name', self._lp_job.name)535 self.core.publish(self.SECTION, 'job_dsc', self._lp_job.description)536 self.core.publish(self.SECTION, 'person', self._lp_job.person)537 self.core.publish(self.SECTION, 'task', self._lp_job.task)538 self.core.publish(self.SECTION, 'version', self._lp_job.version)539 self.core.publish(self.SECTION, 'component', self.get_option('component'))540 self.core.publish(self.SECTION, 'meta', self.cfg.get('meta', {}))541 return self._lp_job542 def __get_lp_job(self):543 """544 :rtype: LPJob545 """546 api_client = self.__get_api_client()547 info = self.get_generator_info()548 port = info.port549 loadscheme = [] if isinstance(info.rps_schedule, (str, dict)) else info.rps_schedule550 if self.backend_type == BackendTypes.CLOUD:551 lp_job = CloudLoadTestingJob(client=api_client,552 target_host=self.target,553 target_port=port,554 tank_job_id=self.core.test_id,555 storage=self.core.storage,556 name=self.get_option('job_name', 'untitled'),557 description=self.get_option('job_dsc'),558 load_scheme=loadscheme)559 else:560 lp_job = LPJob(client=api_client,561 target_host=self.target,562 target_port=port,563 number=self.cfg.get('jobno'),564 token=self.get_option('upload_token'),565 person=self.__get_operator(),566 task=self.task,567 name=self.get_option('job_name', 'untitled'),568 description=self.get_option('job_dsc'),569 tank=self.core.job.tank,570 notify_list=self.get_option("notify"),571 load_scheme=loadscheme,572 version=self.get_option('ver'),573 log_data_requests=self.get_option('log_data_requests'),574 log_monitoring_requests=self.get_option('log_monitoring_requests'),575 log_status_requests=self.get_option('log_status_requests'),576 log_other_requests=self.get_option('log_other_requests'),577 add_cleanup=lambda: self.add_cleanup(self.close_job))578 lp_job.send_config(LPRequisites.CONFIGINITIAL, yaml.dump(self.core.configinitial))579 return lp_job580 @property581 def task(self):582 if self._task is None:583 task = self.get_option('task')584 if task == 'dir':585 task = self.search_task_from_cwd(os.getcwd())586 self._task = task587 return self._task588 @property589 def api_token(self):590 if self._api_token == '':591 if self.backend_type in [BackendTypes.LUNAPARK, BackendTypes.CLOUD]:592 self._api_token = None593 elif self.backend_type == BackendTypes.OVERLOAD:594 self._api_token = self.read_token(self.get_option("token_file", ""))595 else:596 raise RuntimeError("Backend type doesn't match any of the expected")597 return self._api_token598 @staticmethod599 def read_token(filename):600 if filename:601 logger.debug("Trying to read token from %s", filename)602 try:603 with open(filename, 'r') as handle:604 data = handle.read().strip()605 logger.info(606 "Read authentication token from %s, "607 "token length is %d bytes", filename, len(str(data)))608 except IOError:609 logger.error(610 "Failed to read Overload API token from %s", filename)611 logger.info(612 "Get your Overload API token from https://overload.yandex.net and provide it via 'overload.token_file' parameter"613 )614 raise RuntimeError("API token error")615 return data616 else:617 logger.error("Overload API token filename is not defined")618 logger.info(619 "Get your Overload API token from https://overload.yandex.net and provide it via 'overload.token_file' parameter"620 )621 raise RuntimeError("API token error")622 def get_generator_info(self):623 return self.core.job.generator_plugin.get_info()624 @property625 def target(self):626 if self._target is None:627 self._target = self.get_generator_info().address628 logger.info("Detected target: %s", self.target)629 return self._target630class JobInfoWidget(AbstractInfoWidget):631 def __init__(self, sender):632 # type: (Plugin) -> object633 AbstractInfoWidget.__init__(self)634 self.owner = sender635 def get_index(self):636 return 1637 def render(self, screen):638 template = "Author: " + screen.markup.RED + "%s" + \639 screen.markup.RESET + \640 "%s\n Job: %s %s\n Task: %s %s\n Web: %s%s"641 data = (self.owner.lp_job.person[:1], self.owner.lp_job.person[1:],642 self.owner.lp_job.number, self.owner.lp_job.name, self.owner.lp_job.task,643 # todo: task_name from api_client.get_task_data()644 self.owner.lp_job.task, self.owner.lp_job.api_client.base_url,645 self.owner.lp_job.number)646 return template % data647class LPJob(Job):648 def __init__(649 self,650 client,651 target_host,652 target_port,653 person,654 task,655 name,656 description,657 tank,658 log_data_requests=False,659 log_other_requests=False,660 log_status_requests=False,661 log_monitoring_requests=False,662 number=None,663 token=None,664 notify_list=None,665 version=None,666 detailed_time=None,667 load_scheme=None,668 add_cleanup=lambda: None669 ):670 """671 :param client: APIClient672 :param log_data_requests: bool673 :param log_other_request: bool674 :param log_status_requests: bool675 :param log_monitoring_requests: bool676 """677 assert bool(number) == bool(678 token), 'Job number and upload token should come together'679 self.log_other_requests = log_other_requests680 self.log_data_requests = log_data_requests681 self.log_status_requests = log_status_requests682 self.log_monitoring_requests = log_monitoring_requests683 self.name = name684 self.tank = tank685 self.target_host = target_host686 self.target_port = target_port687 self.person = person688 self.task = task689 self.interrupted = threading.Event()690 self._number = number691 self._token = token692 self.api_client = client693 self.notify_list = notify_list694 self.description = description695 self.version = version696 self.detailed_time = detailed_time697 self.load_scheme = load_scheme698 self.is_finished = False699 self.web_link = ''700 self.add_cleanup = add_cleanup701 if self._number:702 self.add_cleanup()703 def push_test_data(self, data, stats):704 if not self.interrupted.is_set():705 try:706 self.api_client.push_test_data(707 self.number, self.token, data, stats, self.interrupted, trace=self.log_data_requests)708 except (APIClient.NotAvailable, APIClient.NetworkError, APIClient.UnderMaintenance):709 logger.warn('Failed to push test data')710 self.interrupted.set()711 def edit_metainfo(712 self,713 instances=0,714 ammo_path=None,715 loop_count=None,716 regression_component=None,717 cmdline=None,718 is_starred=False,719 tank_type=1720 ):721 try:722 self.api_client.edit_job_metainfo(jobno=self.number,723 job_name=self.name,724 job_dsc=self.description,725 instances=instances,726 ammo_path=ammo_path,727 loop_count=loop_count,728 version_tested=self.version,729 component=regression_component,730 cmdline=cmdline,731 is_starred=is_starred,732 tank_type=tank_type,733 trace=self.log_other_requests)734 except (APIClient.NotAvailable, APIClient.StoppedFromOnline, APIClient.NetworkError,735 APIClient.UnderMaintenance) as e:736 logger.warn('Failed to edit job metainfo on Lunapark')737 logger.warn(e)738 @property739 def number(self):740 if not self._number:741 self.create()742 return self._number743 @property744 def token(self):745 if not self._token:746 self.create()747 return self._token748 def close(self, rc):749 if self._number:750 return self.api_client.close_job(self.number, rc, trace=self.log_other_requests)751 else:752 return True753 def create(self):754 self._number, self._token = self.api_client.new_job(task=self.task,755 person=self.person,756 tank=self.tank,757 loadscheme=self.load_scheme,758 target_host=self.target_host,759 target_port=self.target_port,760 detailed_time=self.detailed_time,761 notify_list=self.notify_list,762 trace=self.log_other_requests)763 self.add_cleanup()764 logger.info('Job created: {}'.format(self._number))765 self.web_link = urljoin(self.api_client.base_url, str(self._number))766 def send_status(self, status):767 if self._number and not self.interrupted.is_set():768 self.api_client.send_status(769 self.number,770 self.token,771 status,772 trace=self.log_status_requests)773 def get_task_data(self, task):774 return self.api_client.get_task_data(775 task, trace=self.log_other_requests)776 def send_config(self, lp_requisites, content):777 self.api_client.send_config(self.number, lp_requisites, content, trace=self.log_other_requests)778 def push_monitoring_data(self, data):779 if not self.interrupted.is_set():780 self.api_client.push_monitoring_data(781 self.number, self.token, data, self.interrupted, trace=self.log_monitoring_requests)782 def push_events_data(self, data):783 if not self.interrupted.is_set():784 self.api_client.push_events_data(self.number, self.person, data)785 def lock_target(self, lock_target, lock_target_duration, ignore, strict):786 lock_wait_timeout = 10787 maintenance_timeouts = iter([0]) if ignore else iter(lambda: lock_wait_timeout, 0)788 while True:789 try:790 self.api_client.lock_target(lock_target,791 lock_target_duration,792 trace=self.log_other_requests,793 maintenance_timeouts=maintenance_timeouts,794 maintenance_msg="Target is locked.\nManual unlock link: %s/%s" % (795 self.api_client.base_url,796 self.api_client.get_manual_unlock_link(lock_target)797 ))798 return True799 except (APIClient.NotAvailable, APIClient.StoppedFromOnline) as e:800 logger.info('Target is not locked due to %s', e)801 if ignore:802 logger.info('ignore_target_locks = 1')803 return False804 elif strict:805 raise e806 else:807 logger.info('strict_lock = 0')808 return False809 except APIClient.UnderMaintenance:810 logger.info('Target is locked')811 if ignore:812 logger.info('ignore_target_locks = 1')813 return False814 logger.info("Manual unlock link: %s/%s",815 self.api_client.base_url,816 self.api_client.get_manual_unlock_link(lock_target))817 continue818 def set_imbalance_and_dsc(self, rps, comment):819 return self.api_client.set_imbalance_and_dsc(self.number, rps, comment)820 def is_target_locked(self, host, strict):821 while True:822 try:823 return self.api_client.is_target_locked(824 host, trace=self.log_other_requests)825 except APIClient.UnderMaintenance:826 logger.info('Target is locked, retrying...')827 continue828 except (APIClient.StoppedFromOnline, APIClient.NotAvailable, APIClient.NetworkError):829 logger.info('Can\'t check whether target is locked\n')830 if strict:831 logger.warn('Stopping test due to strict_lock')832 raise833 else:834 logger.warn('strict_lock is False, proceeding')835 return {'status': 'ok'}836class CloudLoadTestingJob(Job):837 def __init__(838 self,839 client,840 target_host,841 target_port,842 name,843 description,844 tank_job_id,845 storage,846 load_scheme=None,847 ):848 self.target_host = target_host849 self.target_port = target_port850 self.tank_job_id = tank_job_id851 self.name = name852 self.description = description853 self._number = None # cloud job id854 self.api_client = client855 self.load_scheme = load_scheme856 self.interrupted = threading.Event()857 self.storage = storage858 # self.create() # FIXME check it out, maybe it is useless859 def push_test_data(self, data, stats):860 if not self.interrupted.is_set():861 try:862 self.api_client.push_test_data(863 self.number, data, stats, self.interrupted)864 except (CloudGRPCClient.NotAvailable, CloudGRPCClient.AgentIdNotFound, RuntimeError):865 logger.warn('Failed to push test data')866 self.interrupted.set()867 def edit_metainfo(self, *args, **kwargs):868 logger.info('Cloud service has already set metainfo')869 # cloud job id870 @property871 def number(self):872 if not self._number:873 raise self.UnknownJobNumber('Job number is unknown')874 return self._number875 def close(self, *args, **kwargs):876 logger.debug('Cannot close job in the cloud mode')877 def create(self):878 cloud_job_id = self.storage.get_cloud_job_id(self.tank_job_id)879 if cloud_job_id is None:880 response = self.api_client.create_test(self.target_host, self.target_port, self.name, self.description, self.load_scheme)881 self.storage.push_job(cloud_job_id, self.core.test_id)882 metadata = test_service_pb2.CreateTestMetadata()883 response.metadata.Unpack(metadata)884 self._number = metadata.id885 logger.info('Job was created: {}'.format(self._number))886 else:887 self._number = cloud_job_id888 def send_status(self, *args, **kwargs):889 logger.debug('Tank client is sending the status')890 def send_config(self, *args, **kwargs):891 logger.debug('Do not send config to the cloud service')892 def push_monitoring_data(self, *args, **kwargs):893 logger.debug('Do not push monitoring data for cloud service')894 def push_events_data(self, *args, **kwargs):895 logger.debug('Do not push event data for cloud service')896 def lock_target(self, *args, **kwargs):897 logger.debug('Target locking is not implemented for cloud')898 def set_imbalance_and_dsc(self, *args, **kwargs):899 logger.debug('Imbalance detection is not implemented for cloud')900 def is_target_locked(self, *args, **kwargs):901 logger.debug('Target locking is not implemented for cloud')902class EventsReader(FileScanner):903 """904 Parse lines and return stats905 """906 def __init__(self, *args, **kwargs):907 super(EventsReader, self).__init__(*args, **kwargs)908 def _read_data(self, lines):...

Full Screen

Full Screen

client.py

Source:client.py Github

copy

Full Screen

...503 time.sleep(timeout)504 continue505 except StopIteration:506 raise e507 def push_events_data(self, jobno, operator, send_data):508 if send_data:509 # logger.info('send data: %s', send_data)510 for key in send_data:511 addr = "/api/job/{jobno}/event.json".format(512 jobno=jobno,513 )514 body = dict(515 operator=operator,516 text=key[1],517 timestamp=key[0]518 )519 api_timeouts = self.api_timeouts()520 while True:521 try:522 # logger.debug('Sending event: %s', body)523 res = self.__post_raw(addr, body)524 logger.debug("API response for events push: %s", res)525 success = res == 'ok'526 return success527 except self.NotAvailable as e:528 try:529 timeout = next(api_timeouts)530 logger.warn("API error, will retry in %ss...", timeout)531 time.sleep(timeout)532 continue533 except StopIteration:534 raise e535 def send_status(self, jobno, upload_token, status, trace=False):536 addr = "api/v2/jobs/%s/?upload_token=%s" % (jobno, upload_token)537 status_line = status.get("core", {}).get("stage", "unknown")538 if "stepper" in status:539 status_line += " %s" % status["stepper"].get("progress")540 api_timeouts = self.api_timeouts()541 while True:542 try:543 self.__patch(addr, {"status": status_line}, trace=trace)544 return545 except self.NotAvailable as e:546 try:547 timeout = next(api_timeouts)548 logger.warn("API error, will retry in %ss...", timeout)549 time.sleep(timeout)550 continue551 except StopIteration:552 raise e553 def is_target_locked(self, target, trace=False):554 addr = "api/server/lock.json?action=check&address=%s" % target555 res = self.__get(addr, trace=trace)556 return res[0]557 def lock_target(self, target, duration, trace=False, maintenance_timeouts=None, maintenance_msg=None):558 addr = "api/server/lock.json?action=lock&" + \559 "address=%s&duration=%s&jobno=None" % \560 (target, int(duration))561 res = self.__get(addr, trace=trace, maintenance_timeouts=maintenance_timeouts, maintenance_msg=maintenance_msg)562 return res[0]563 def unlock_target(self, target):564 addr = self.get_manual_unlock_link(target)565 res = self.__get(addr)566 return res[0]567 def get_virtual_host_info(self, hostname):568 addr = "api/server/virtual_host.json?hostname=%s" % hostname569 res = self.__get(addr)570 try:571 return res[0]572 except KeyError:573 raise Exception(res['error'])574 @staticmethod575 def get_manual_unlock_link(target):576 return "api/server/lock.json?action=unlock&address=%s" % target577 def send_config(self, jobno, lp_requisites, config_content, trace=False):578 endpoint, field_name = lp_requisites579 logger.debug("Sending {} config".format(field_name))580 addr = "/api/job/%s/%s" % (jobno, endpoint)581 self.__post_raw(addr, {field_name: config_content}, trace=trace)582 def link_mobile_job(self, lp_key, mobile_key):583 addr = "/api/job/{jobno}/edit.json".format(jobno=lp_key)584 data = {585 'mobile_key': mobile_key586 }587 response = self.__post(addr, data)588 return response589class LPRequisites():590 CONFIGINFO = ('configinfo.txt', 'configinfo')591 MONITORING = ('jobmonitoringconfig.txt', 'monitoringconfig')592 CONFIGINITIAL = ('configinitial.txt', 'configinitial')593class OverloadClient(APIClient):594 """ mocks below for nonexistent backend methods """595 def send_status(self, jobno, upload_token, status, trace=False):596 return597 def lock_target(self, target, duration, trace=False, **kwargs):598 return599 def unlock_target(self, *args, **kwargs):600 return601 def link_mobile_job(self, lp_key, mobile_key):602 return603 def push_events_data(self, number, token, data):604 return605 def set_imbalance_and_dsc(self, **kwargs):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run yandex-tank automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful