Best Python code snippet using yandex-tank
plugin.py
Source:plugin.py
...176 if duration:177 self.lock_target_duration = duration178 loop_count = info.loop_count179 lp_job = self.lp_job180 self.locked_targets = self.check_and_lock_targets(strict=bool(181 int(self.get_option('strict_lock', '0'))), ignore=self.ignore_target_lock)182 try:183 if lp_job._number:184 self.make_symlink(lp_job._number)185 self.check_task_is_open()186 else:187 self.check_task_is_open()188 lp_job.create()189 self.make_symlink(lp_job.number)190 self.core.publish(self.SECTION, 'jobno', lp_job.number)191 except (APIClient.JobNotCreated, APIClient.NotAvailable, APIClient.NetworkError) as e:192 logger.error(e.message)193 logger.error(194 'Failed to connect to Lunapark, disabling DataUploader')195 self.start_test = lambda *a, **kw: None196 self.post_process = lambda *a, **kw: None197 self.on_aggregated_data = lambda *a, **kw: None198 self.monitoring_data = lambda *a, **kw: None199 return200 cmdline = ' '.join(sys.argv)201 lp_job.edit_metainfo(202 instances=instances,203 ammo_path=ammo_path,204 loop_count=loop_count,205 is_regression=self.get_option(206 'regress',207 '0'),208 regression_component=self.regression_component,209 cmdline=cmdline,210 ) # todo: tanktype?211 self.core.job.subscribe_plugin(self)212 try:213 console = self.core.get_plugin_of_type(ConsolePlugin)214 except KeyError:215 logger.debug("Console plugin not found", exc_info=True)216 console = None217 if console:218 console.add_info_widget(JobInfoWidget(self))219 self.set_option('target_host', self.target)220 self.set_option('target_port', port)221 self.set_option('cmdline', cmdline)222 self.set_option('ammo_path', ammo_path)223 self.set_option('loop_count', loop_count)224 self.__save_conf()225 def start_test(self):226 self.on_air = True227 status_sender = threading.Thread(target=self.__send_status)228 status_sender.daemon = True229 status_sender.start()230 self.status_sender = status_sender231 upload = threading.Thread(target=self.__data_uploader)232 upload.daemon = True233 upload.start()234 
self.upload = upload235 monitoring = threading.Thread(target=self.__monitoring_uploader)236 monitoring.daemon = True237 monitoring.start()238 self.monitoring = monitoring239 web_link = urljoin(self.lp_job.api_client.base_url, str(self.lp_job.number))240 logger.info("Web link: %s", web_link)241 self.publish("jobno", self.lp_job.number)242 self.publish("web_link", web_link)243 self.set_option("jobno", str(self.lp_job.number))244 if self.jobno_file:245 logger.debug("Saving jobno to: %s", self.jobno_file)246 fdes = open(self.jobno_file, 'w')247 fdes.write(str(self.lp_job.number))248 fdes.close()249 self.__save_conf()250 def is_test_finished(self):251 return self.retcode252 def end_test(self, retcode):253 self.on_air = False254 self.monitoring_queue.put(None)255 self.data_queue.put(None)256 self.__save_conf()257 timeout = int(self.get_option('threads_timeout', '60'))258 logger.info(259 'Waiting for sender threads to join for {} seconds ("meta.threads_timeout" config option)'.format(timeout))260 self.monitoring.join(timeout=timeout)261 if self.monitoring.isAlive():262 logger.error('Monitoring thread joining timed out. Terminating.')263 self.upload.join(timeout=timeout)264 if self.upload.isAlive():265 logger.error('Upload thread joining timed out. 
Terminating.')266 self.unlock_targets(self.locked_targets)267 return retcode268 def post_process(self, rc):269 try:270 self.lp_job.close(rc)271 except Exception: # pylint: disable=W0703272 logger.warning("Failed to close job", exc_info=True)273 logger.info(274 "Web link: %s%s",275 self.lp_job.api_client.base_url,276 self.lp_job.number)277 autostop = None278 try:279 autostop = self.core.get_plugin_of_type(AutostopPlugin)280 except KeyError:281 logger.debug("No autostop plugin loaded", exc_info=True)282 if autostop and autostop.cause_criterion:283 rps = 0284 if autostop.cause_criterion.cause_second:285 rps = autostop.cause_criterion.cause_second[286 1]["metrics"]["reqps"]287 if not rps:288 rps = autostop.cause_criterion.cause_second[0][289 "overall"]["interval_real"]["len"]290 self.lp_job.set_imbalance_and_dsc(291 int(rps), autostop.cause_criterion.explain())292 else:293 logger.debug("No autostop cause detected")294 self.__save_conf()295 return rc296 def on_aggregated_data(self, data, stats):297 """298 @data: aggregated data299 @stats: stats about gun300 """301 if self.lp_job.is_alive:302 self.data_queue.put((data, stats))303 def monitoring_data(self, data_list):304 if self.lp_job.is_alive:305 if len(data_list) > 0:306 if self.is_telegraf:307 # telegraf308 self.monitoring_queue.put(data_list)309 else:310 # monitoring311 [self.monitoring_queue.put(data) for data in data_list]312 @property313 def is_telegraf(self):314 if self._is_telegraf is None:315 self._is_telegraf = 'Telegraf' in self.core.job.monitoring_plugin.__module__316 return self._is_telegraf317 def _core_with_tank_api(self):318 """319 Return True if we are running under Tank API320 """321 api_found = False322 try:323 import yandex_tank_api.worker # pylint: disable=F0401324 except ImportError:325 logger.debug("Attempt to import yandex_tank_api.worker failed")326 else:327 api_found = isinstance(self.core, yandex_tank_api.worker.TankCore)328 logger.debug("We are%s running under API server", '' if api_found 
else ' likely not')329 return api_found330 def __send_status(self):331 logger.info('Status sender thread started')332 lp_job = self.lp_job333 while lp_job.is_alive and self.on_air:334 try:335 self.lp_job.send_status(self.core.status)336 time.sleep(self.send_status_period)337 except (APIClient.NetworkError, APIClient.NotAvailable) as e:338 logger.warn('Failed to send status')339 logger.debug(e.message)340 break341 except APIClient.StoppedFromOnline:342 logger.info("Test stopped from Lunapark")343 lp_job.is_alive = False344 self.retcode = 8345 break346 logger.info("Closing Status sender thread")347 def __data_uploader(self):348 logger.info('Data uploader thread started')349 lp_job = self.lp_job350 queue = self.data_queue351 while lp_job.is_alive:352 try:353 entry = queue.get(timeout=1)354 if entry is not None:355 data, stats = entry356 else:357 logger.info("Data uploader queue returned None")358 break359 lp_job.push_test_data(data, stats)360 except Empty:361 continue362 except APIClient.StoppedFromOnline:363 logger.info("Test stopped from Lunapark")364 lp_job.is_alive = False365 self.retcode = 8366 break367 except Exception as e:368 logger.info("Mysterious exception: %s", e)369 self.retcode = 8370 break371 logger.info("Closing Data uploader thread")372 def __monitoring_uploader(self):373 logger.info('Monitoring uploader thread started')374 lp_job = self.lp_job375 queue = self.monitoring_queue376 while lp_job.is_alive:377 try:378 data = queue.get(timeout=1)379 if data is not None:380 lp_job.push_monitoring_data(data)381 else:382 logger.info('Monitoring queue returned None')383 break384 except Empty:385 continue386 except (APIClient.NetworkError, APIClient.NotAvailable, APIClient.UnderMaintenance) as e:387 logger.warn('Failed to push monitoring data')388 logger.warn(e.message)389 break390 except APIClient.StoppedFromOnline:391 logger.info("Test stopped from Lunapark")392 lp_job.is_alive = False393 self.retcode = 8394 break395 logger.info('Closing Monitoring uploader 
thread')396 def __save_conf(self):397 config_copy = self.get_option('copy_config_to', '')398 if config_copy:399 self.core.config.flush(config_copy)400 config = copy.copy(self.core.config.config)401 try:402 config_filename = self.core.job.monitoring_plugin.config403 if config_filename and config_filename not in ['none', 'auto']:404 with open(config_filename) as config_file:405 config.set(406 self.core.job.monitoring_plugin.SECTION,407 "config_contents",408 config_file.read())409 except Exception: # pylint: disable=W0703410 logger.warning("Can't get monitoring config", exc_info=True)411 output = StringIO()412 config.write(output)413 self.lp_job.send_config_snapshot(output.getvalue())414 with open(os.path.join(self.core.artifacts_dir, 'saved_conf.ini'), 'w') as f:415 config.write(f)416 def parse_lock_targets(self):417 # prepare target lock list418 locks_list_cfg = self.get_option('lock_targets', 'auto').strip()419 def no_target():420 logging.warn("Target lock set to 'auto', but no target info available")421 return ''422 locks_list = (self.target or no_target() if locks_list_cfg.lower() == 'auto' else locks_list_cfg).split('\n')423 targets_to_lock = [host for host in locks_list if host]424 return targets_to_lock425 def lock_targets(self, targets_to_lock, ignore, strict):426 locked_targets = [target for target in targets_to_lock427 if self.lp_job.lock_target(target, self.lock_target_duration, ignore, strict)]428 return locked_targets429 def unlock_targets(self, locked_targets):430 logger.info("Unlocking targets: %s", locked_targets)431 for target in locked_targets:432 logger.info(target)433 self.lp_job.api_client.unlock_target(target)434 def check_and_lock_targets(self, strict, ignore):435 targets_list = self.parse_lock_targets()436 logger.info('Locking targets: %s', targets_list)437 locked_targets = self.lock_targets(targets_list, ignore=ignore, strict=strict)438 logger.info('Locked targets: %s', locked_targets)439 return locked_targets440 def make_symlink(self, 
name):441 PLUGIN_DIR = os.path.join(self.core.artifacts_base_dir, 'lunapark')442 if not os.path.exists(PLUGIN_DIR):443 os.makedirs(PLUGIN_DIR)444 os.symlink(445 os.path.relpath(446 self.core.artifacts_dir,447 PLUGIN_DIR),448 os.path.join(...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!