Best Python code snippet using yandex-tank
plugin.py
Source:plugin.py
...100 return __file__101 @property102 def lock_duration(self):103 if self._lock_duration is None:104 info = self.get_generator_info()105 self._lock_duration = info.duration if info.duration else \106 expand_to_seconds(self.get_option("target_lock_duration"))107 return self._lock_duration108 def get_available_options(self):109 opts = [110 "api_address",111 "writer_endpoint",112 "task",113 "job_name",114 "job_dsc",115 "notify",116 "ver", "component",117 "operator",118 "jobno_file",119 "ignore_target_lock",120 "target_lock_duration",121 "lock_targets",122 "jobno",123 "upload_token",124 'connection_timeout',125 'network_attempts',126 'api_attempts',127 'maintenance_attempts',128 'network_timeout',129 'api_timeout',130 'maintenance_timeout',131 'strict_lock',132 'send_status_period',133 'log_data_requests',134 'log_monitoring_requests',135 'log_status_requests',136 'log_other_requests',137 'threads_timeout',138 'chunk_size'139 ]140 return opts141 def configure(self):142 self.core.publish(self.SECTION, 'component', self.get_option('component'))143 self.core.publish(self.SECTION, 'task', self.get_option('task'))144 self.core.publish(self.SECTION, 'job_name', self.get_option('job_name'))145 def check_task_is_open(self):146 if self.backend_type == BackendTypes.OVERLOAD:147 return148 TASK_TIP = 'The task should be connected to Lunapark.' 
\149 'Open startrek task page, click "actions" -> "load testing".'150 logger.debug("Check if task %s is open", self.task)151 try:152 task_data = self.lp_job.get_task_data(self.task)[0]153 try:154 task_status = task_data['status']155 if task_status == 'Open':156 logger.info("Task %s is ok", self.task)157 self.task_name = str(task_data['name'])158 else:159 logger.info("Task %s:" % self.task)160 logger.info(task_data)161 raise RuntimeError("Task is not open")162 except KeyError:163 try:164 error = task_data['error']165 raise RuntimeError(166 "Task %s error: %s\n%s" %167 (self.task, error, TASK_TIP))168 except KeyError:169 raise RuntimeError(170 'Unknown task data format:\n{}'.format(task_data))171 except requests.exceptions.HTTPError as ex:172 logger.error(173 "Failed to check task status for '%s': %s", self.task, ex)174 if ex.response.status_code == 404:175 raise RuntimeError("Task not found: %s\n%s" % (self.task, TASK_TIP))176 elif ex.response.status_code == 500 or ex.response.status_code == 400:177 raise RuntimeError(178 "Unable to check task staus, id: %s, error code: %s" %179 (self.task, ex.response.status_code))180 raise ex181 @staticmethod182 def search_task_from_cwd(cwd):183 issue = re.compile("^([A-Za-z]+-[0-9]+)(-.*)?")184 while cwd:185 logger.debug("Checking if dir is named like JIRA issue: %s", cwd)186 if issue.match(os.path.basename(cwd)):187 res = re.search(issue, os.path.basename(cwd))188 return res.group(1).upper()189 newdir = os.path.abspath(os.path.join(cwd, os.path.pardir))190 if newdir == cwd:191 break192 else:193 cwd = newdir194 raise RuntimeError(195 "task=dir requested, but no JIRA issue name in cwd: %s" %196 os.getcwd())197 def prepare_test(self):198 info = self.get_generator_info()199 port = info.port200 instances = info.instances201 if info.ammo_file is not None:202 if info.ammo_file.startswith("http://") or info.ammo_file.startswith("https://"):203 ammo_path = info.ammo_file204 else:205 ammo_path = os.path.realpath(info.ammo_file)206 
else:207 logger.warning('Failed to get info about ammo path')208 ammo_path = 'Undefined'209 loop_count = int(info.loop_count)210 try:211 lp_job = self.lp_job212 self.add_cleanup(self.unlock_targets)213 self.locked_targets = self.check_and_lock_targets(strict=self.get_option('strict_lock'),214 ignore=self.get_option('ignore_target_lock'))215 if lp_job._number:216 self.make_symlink(lp_job._number)217 self.check_task_is_open()218 else:219 self.check_task_is_open()220 lp_job.create()221 self.make_symlink(lp_job.number)222 self.publish('job_no', lp_job.number)223 except (APIClient.JobNotCreated, APIClient.NotAvailable, APIClient.NetworkError) as e:224 logger.error(e)225 logger.error(226 'Failed to connect to Lunapark, disabling DataUploader')227 self.start_test = lambda *a, **kw: None228 self.post_process = lambda *a, **kw: None229 self.on_aggregated_data = lambda *a, **kw: None230 self.monitoring_data = lambda *a, **kw: None231 return232 cmdline = ' '.join(sys.argv)233 lp_job.edit_metainfo(234 instances=instances,235 ammo_path=ammo_path,236 loop_count=loop_count,237 regression_component=self.get_option("component"),238 cmdline=cmdline,239 )240 self.core.job.subscribe_plugin(self)241 try:242 console = self.core.get_plugin_of_type(ConsolePlugin)243 except KeyError:244 logger.debug("Console plugin not found", exc_info=True)245 console = None246 if console:247 console.add_info_widget(JobInfoWidget(self))248 self.set_option('target_host', self.target)249 self.set_option('target_port', port)250 self.set_option('cmdline', cmdline)251 self.set_option('ammo_path', ammo_path)252 self.set_option('loop_count', loop_count)253 self.__save_conf()254 def start_test(self):255 self.add_cleanup(self.join_threads)256 self.status_sender.start()257 self.upload.start()258 self.monitoring.start()259 if self.core.error_log:260 self.events.start()261 self.web_link = urljoin(self.lp_job.api_client.base_url, str(self.lp_job.number))262 logger.info("Web link: %s", self.web_link)263 
self.publish("jobno", self.lp_job.number)264 self.publish("web_link", self.web_link)265 jobno_file = self.get_option("jobno_file", '')266 if jobno_file:267 logger.debug("Saving jobno to: %s", jobno_file)268 with open(jobno_file, 'w') as fdes:269 fdes.write(str(self.lp_job.number))270 self.core.add_artifact_file(jobno_file)271 self.__save_conf()272 def is_test_finished(self):273 return self.retcode274 def end_test(self, retcode):275 if retcode != 0:276 self.lp_job.interrupted.set()277 self.__save_conf()278 self.unlock_targets()279 return retcode280 def close_job(self):281 self.lp_job.close(self.retcode)282 def join_threads(self):283 self.lp_job.interrupted.set()284 if self.monitoring.is_alive():285 self.monitoring.join()286 if self.upload.is_alive():287 self.upload.join()288 def stop_events_processing(self):289 self.events_queue.put(None)290 self.events_reader.close()291 self.events_processing.close()292 if self.events_processing.is_alive():293 self.events_processing.join()294 if self.events.is_alive():295 self.lp_job.interrupted.set()296 self.events.join()297 def post_process(self, rc):298 self.retcode = rc299 self.monitoring_queue.put(None)300 self.data_queue.put(None)301 if self.core.error_log:302 self.events_queue.put(None)303 self.events_reader.close()304 self.events_processing.close()305 self.events.join()306 logger.info("Waiting for sender threads to join.")307 if self.monitoring.is_alive():308 self.monitoring.join()309 if self.upload.is_alive():310 self.upload.join()311 self.finished = True312 logger.info(313 "Web link: %s", self.web_link)314 autostop = None315 try:316 autostop = self.core.get_plugin_of_type(AutostopPlugin)317 except KeyError:318 logger.debug("No autostop plugin loaded", exc_info=True)319 if autostop and autostop.cause_criterion:320 self.lp_job.set_imbalance_and_dsc(321 autostop.imbalance_rps, autostop.cause_criterion.explain())322 else:323 logger.debug("No autostop cause detected")324 self.__save_conf()325 return rc326 def 
on_aggregated_data(self, data, stats):327 """328 @data: aggregated data329 @stats: stats about gun330 """331 if not self.lp_job.interrupted.is_set():332 self.data_queue.put((data, stats))333 def monitoring_data(self, data_list):334 if not self.lp_job.interrupted.is_set():335 if len(data_list) > 0:336 [self.monitoring_queue.put(chunk) for chunk in chop(data_list, self.get_option("chunk_size"))]337 def __send_status(self):338 logger.info('Status sender thread started')339 lp_job = self.lp_job340 while not lp_job.interrupted.is_set():341 try:342 self.lp_job.send_status(self.core.info.get_info_dict())343 time.sleep(self.get_option('send_status_period'))344 except (APIClient.NetworkError, APIClient.NotAvailable) as e:345 logger.warn('Failed to send status')346 logger.debug(e)347 break348 except APIClient.StoppedFromOnline:349 logger.info("Test stopped from Lunapark")350 self.retcode = self.RC_STOP_FROM_WEB351 break352 if self.finished:353 break354 logger.info("Closed Status sender thread")355 def __uploader(self, queue, sender_method, name='Uploader'):356 logger.info('{} thread started'.format(name))357 while not self.lp_job.interrupted.is_set():358 try:359 entry = queue.get(timeout=1)360 if entry is None:361 logger.info("{} queue returned None".format(name))362 break363 sender_method(entry)364 except Empty:365 continue366 except APIClient.StoppedFromOnline:367 logger.warning("Lunapark is rejecting {} data".format(name))368 break369 except (APIClient.NetworkError, APIClient.NotAvailable, APIClient.UnderMaintenance) as e:370 logger.warn('Failed to push {} data'.format(name))371 logger.warn(e)372 self.lp_job.interrupted.set()373 except Exception:374 exc_type, exc_value, exc_traceback = sys.exc_info()375 logger.error("Mysterious exception:\n%s\n%s\n%s", (exc_type, exc_value, exc_traceback))376 break377 # purge queue378 while not queue.empty():379 if queue.get_nowait() is None:380 break381 logger.info("Closing {} thread".format(name))382 def __data_uploader(self):383 
self.__uploader(self.data_queue,384 lambda entry: self.lp_job.push_test_data(*entry),385 'Data Uploader')386 def __monitoring_uploader(self):387 self.__uploader(self.monitoring_queue,388 self.lp_job.push_monitoring_data,389 'Monitoring Uploader')390 def __events_uploader(self):391 self.__uploader(self.events_queue,392 self.lp_job.push_events_data,393 'Events Uploader')394 # TODO: why we do it here? should be in core395 def __save_conf(self):396 for requisites, content in self.core.artifacts_to_send:397 self.lp_job.send_config(requisites, content)398 def parse_lock_targets(self):399 # prepare target lock list400 locks_list_cfg = self.get_option('lock_targets', 'auto')401 def no_target():402 logging.warn("Target lock set to 'auto', but no target info available")403 return {}404 locks_set = {self.target} or no_target() if locks_list_cfg == 'auto' else set(locks_list_cfg)405 targets_to_lock = [host for host in locks_set if host]406 return targets_to_lock407 def lock_targets(self, targets_to_lock, ignore, strict):408 locked_targets = [target for target in targets_to_lock409 if self.lp_job.lock_target(target, self.lock_duration, ignore, strict)]410 return locked_targets411 def unlock_targets(self):412 logger.info("Unlocking targets: %s", self.locked_targets)413 for target in self.locked_targets:414 logger.info(target)415 self.lp_job.api_client.unlock_target(target)416 def check_and_lock_targets(self, strict, ignore):417 targets_list = self.parse_lock_targets()418 logger.info('Locking targets: %s', targets_list)419 locked_targets = self.lock_targets(targets_list, ignore=ignore, strict=strict)420 logger.info('Locked targets: %s', locked_targets)421 return locked_targets422 def make_symlink(self, name):423 PLUGIN_DIR = os.path.join(self.core.artifacts_base_dir, 'lunapark')424 if not os.path.exists(PLUGIN_DIR):425 os.makedirs(PLUGIN_DIR)426 try:427 os.symlink(428 os.path.relpath(429 self.core.artifacts_dir,430 PLUGIN_DIR),431 os.path.join(432 PLUGIN_DIR,433 str(name)))434 # 
this exception catch for filesystems w/o symlinks435 except OSError:436 logger.warning('Unable to create symlink for artifact: %s', name)437 def _get_user_agent(self):438 plugin_agent = 'Uploader/{}'.format(self.VERSION)439 return ' '.join((plugin_agent,440 self.core.get_user_agent()))441 def __get_operator(self):442 try:443 return self.get_option(444 'operator') or pwd.getpwuid(445 os.geteuid())[0]446 except: # noqa: E722447 logger.error(448 "Couldn't get username from the OS. Please, set the 'meta.operator' option explicitly in your config "449 "file.")450 raise451 def __get_api_client(self):452 logging.info('Using {} backend'.format(self.backend_type))453 if self.backend_type == BackendTypes.LUNAPARK:454 client = APIClient455 self._api_token = None456 elif self.backend_type == BackendTypes.OVERLOAD:457 client = OverloadClient458 self._api_token = self.read_token(self.get_option("token_file"))459 else:460 raise RuntimeError("Backend type doesn't match any of the expected")461 return client(base_url=self.get_option('api_address'),462 writer_url=self.get_option('writer_endpoint'),463 network_attempts=self.get_option('network_attempts'),464 api_attempts=self.get_option('api_attempts'),465 maintenance_attempts=self.get_option('maintenance_attempts'),466 network_timeout=self.get_option('network_timeout'),467 api_timeout=self.get_option('api_timeout'),468 maintenance_timeout=self.get_option('maintenance_timeout'),469 connection_timeout=self.get_option('connection_timeout'),470 user_agent=self._get_user_agent(),471 api_token=self.api_token,472 core_interrupted=self.interrupted)473 @property474 def lp_job(self):475 """476 :rtype: LPJob477 """478 if self._lp_job is None:479 self._lp_job = self.__get_lp_job()480 self.core.publish(self.SECTION, 'job_no', self._lp_job.number)481 self.core.publish(self.SECTION, 'web_link', self._lp_job.web_link)482 self.core.publish(self.SECTION, 'job_name', self._lp_job.name)483 self.core.publish(self.SECTION, 'job_dsc', 
self._lp_job.description)484 self.core.publish(self.SECTION, 'person', self._lp_job.person)485 self.core.publish(self.SECTION, 'task', self._lp_job.task)486 self.core.publish(self.SECTION, 'version', self._lp_job.version)487 self.core.publish(self.SECTION, 'component', self.get_option('component'))488 self.core.publish(self.SECTION, 'meta', self.cfg.get('meta', {}))489 return self._lp_job490 def __get_lp_job(self):491 """492 :rtype: LPJob493 """494 api_client = self.__get_api_client()495 info = self.get_generator_info()496 port = info.port497 loadscheme = [] if isinstance(info.rps_schedule, (str, dict)) else info.rps_schedule498 lp_job = LPJob(client=api_client,499 target_host=self.target,500 target_port=port,501 number=self.cfg.get('jobno', None),502 token=self.get_option('upload_token'),503 person=self.__get_operator(),504 task=self.task,505 name=self.get_option('job_name', 'untitled'),506 description=self.get_option('job_dsc'),507 tank=self.core.job.tank,508 notify_list=self.get_option("notify"),509 load_scheme=loadscheme,510 version=self.get_option('ver'),511 log_data_requests=self.get_option('log_data_requests'),512 log_monitoring_requests=self.get_option('log_monitoring_requests'),513 log_status_requests=self.get_option('log_status_requests'),514 log_other_requests=self.get_option('log_other_requests'),515 add_cleanup=lambda: self.add_cleanup(self.close_job))516 lp_job.send_config(LPRequisites.CONFIGINITIAL, yaml.dump(self.core.configinitial))517 return lp_job518 @property519 def task(self):520 if self._task is None:521 task = self.get_option('task')522 if task == 'dir':523 task = self.search_task_from_cwd(os.getcwd())524 self._task = task525 return self._task526 @property527 def api_token(self):528 if self._api_token == '':529 if self.backend_type == BackendTypes.LUNAPARK:530 self._api_token = None531 elif self.backend_type == BackendTypes.OVERLOAD:532 self._api_token = self.read_token(self.get_option("token_file", ""))533 else:534 raise 
RuntimeError("Backend type doesn't match any of the expected")535 return self._api_token536 @staticmethod537 def read_token(filename):538 if filename:539 logger.debug("Trying to read token from %s", filename)540 try:541 with open(filename, 'r') as handle:542 data = handle.read().strip()543 logger.info(544 "Read authentication token from %s, "545 "token length is %d bytes", filename, len(str(data)))546 except IOError:547 logger.error(548 "Failed to read Overload API token from %s", filename)549 logger.info(550 "Get your Overload API token from https://overload.yandex.net and provide it via 'overload.token_file' parameter"551 )552 raise RuntimeError("API token error")553 return data554 else:555 logger.error("Overload API token filename is not defined")556 logger.info(557 "Get your Overload API token from https://overload.yandex.net and provide it via 'overload.token_file' parameter"558 )559 raise RuntimeError("API token error")560 def get_generator_info(self):561 return self.core.job.generator_plugin.get_info()562 @property563 def target(self):564 if self._target is None:565 self._target = self.get_generator_info().address566 logger.info("Detected target: %s", self.target)567 return self._target568class JobInfoWidget(AbstractInfoWidget):569 def __init__(self, sender):570 # type: (Plugin) -> object571 AbstractInfoWidget.__init__(self)572 self.owner = sender573 def get_index(self):574 return 1575 def render(self, screen):576 template = "Author: " + screen.markup.RED + "%s" + \577 screen.markup.RESET + \578 "%s\n Job: %s %s\n Task: %s %s\n Web: %s%s"579 data = (self.owner.lp_job.person[:1], self.owner.lp_job.person[1:],...
main.py
Source:main.py
...
import datetime
import itertools


def main():
    """Run the data-generation sequence end to end.

    Creates a ``Faker`` instance, builds a ``Generator`` from the settings
    returned by :func:`get_generator_info`, and then calls the generator's
    customer, product, order-header and order-detail steps in that order.
    """
    faker = Faker()
    settings = get_generator_info()
    generator = Generator(settings, faker)

    generator.generate_customers()
    generator.generator_products()
    generator.generate_order_headers()
    generator.generate_order_details()


# TODO: generator info could be loaded from a config file (refactor).
# TODO: product_list is a list of str; move it out to config or pass it in
#       as a list (refactor).
def get_generator_info() -> GeneratorInfo:
    """Return the hard-coded ``GeneratorInfo`` used by :func:`main`.

    Returns:
        A ``GeneratorInfo`` with both order dates set to 2021-01-01,
        1000 orders, 10 customers, a ten-item product list and a tax rate
        of 0.06.
    """
    return GeneratorInfo(
        order_date_order=datetime.date(2021, 1, 1),
        order_date_ship=datetime.date(2021, 1, 1),
        number_of_orders=1000,
        number_of_customers=10,
        # for use with Faker Provider
        product_list=[
            "Spoon",
            "Fork",
            "Knife",
            "Plate",
            "Bowl",
            "Spork",
            "Cup",
            "Glass",
            "Guzzler",
            "Straw",
        ],
        tax_rate=0.06,
    )


# The excerpt is truncated here; the body of the guard is not visible.
if __name__ == "__main__":...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!