Best Python code snippet using yandex-tank
tankcore.py
Source:tankcore.py
...108 self.lock_dir = self.get_option(self.SECTION, 'lock_dir')109 with open(os.path.join(self.artifacts_dir, CONFIGINITIAL), 'w') as f:110 yaml.dump(self.configinitial, f)111 self.add_artifact_file(error_output)112 self.add_artifact_to_send(LPRequisites.CONFIGINITIAL, yaml.dump(self.configinitial))113 configinfo = self.config.validated.copy()114 configinfo.setdefault(self.SECTION, {})115 configinfo[self.SECTION][self.API_JOBNO] = self.test_id116 self.add_artifact_to_send(LPRequisites.CONFIGINFO, yaml.dump(configinfo))117 with open(os.path.join(self.artifacts_dir, VALIDATED_CONF), 'w') as f:118 yaml.dump(configinfo, f)119 logger.info('New test id %s' % self.test_id)120 @property121 def cfg_snapshot(self):122 if not self._cfg_snapshot:123 self._cfg_snapshot = str(self.config)124 return self._cfg_snapshot125 @staticmethod126 def get_available_options():127 # todo: should take this from schema128 return [129 "artifacts_base_dir", "artifacts_dir",130 "taskset_path", "affinity"131 ]132 @property133 def plugins(self):134 """135 :returns: {plugin_name: plugin_class, ...}136 :rtype: dict137 """138 if self._plugins is None:139 self.load_plugins()140 if self._plugins is None:141 self._plugins = {}142 return self._plugins143 @property144 def artifacts_base_dir(self):145 if not self._artifacts_base_dir:146 try:147 artifacts_base_dir = os.path.abspath(self.get_option(self.SECTION, "artifacts_base_dir"))148 except ValidationError:149 artifacts_base_dir = os.path.abspath('logs')150 if not os.path.exists(artifacts_base_dir):151 os.makedirs(artifacts_base_dir)152 os.chmod(self.artifacts_base_dir, 0o755)153 self._artifacts_base_dir = artifacts_base_dir154 return self._artifacts_base_dir155 def load_plugins(self):156 """157 Tells core to take plugin options and instantiate plugin classes158 """159 logger.info("Loading plugins...")160 for (plugin_name, plugin_path, plugin_cfg) in self.config.plugins:161 logger.debug("Loading plugin %s from %s", plugin_name, plugin_path)162 if 
plugin_path == "yandextank.plugins.Overload":163 logger.warning(164 "Deprecated plugin name: 'yandextank.plugins.Overload'\n"165 "There is a new generic plugin now.\n"166 "Correcting to 'yandextank.plugins.DataUploader overload'")167 plugin_path = "yandextank.plugins.DataUploader overload"168 try:169 logger.info("Trying to import module: {}".format(plugin_name))170 logger.info("Path: {}".format(plugin_path))171 plugin = il.import_module(plugin_path)172 except ImportError:173 logger.warning('Plugin name %s path %s import error', plugin_name, plugin_path)174 logger.debug('Plugin name %s path %s import error', plugin_name, plugin_path, exc_info=True)175 raise176 try:177 instance = getattr(plugin, 'Plugin')(self, cfg=plugin_cfg, name=plugin_name)178 except AttributeError:179 logger.warning('Plugin %s classname should be `Plugin`', plugin_name)180 raise181 else:182 self.register_plugin(self.PLUGIN_PREFIX + plugin_name, instance)183 logger.debug("Plugin instances: %s", self._plugins)184 @property185 def job(self):186 if not self._job:187 # monitoring plugin188 monitorings = [plugin for plugin in self.plugins.values() if isinstance(plugin, MonitoringPlugin)]189 # generator plugin190 try:191 gen = self.get_plugin_of_type(GeneratorPlugin)192 except KeyError:193 logger.warning("Load generator not found")194 gen = GeneratorPlugin(self, {}, 'generator dummy')195 # aggregator196 aggregator = TankAggregator(gen)197 self._job = Job(monitoring_plugins=monitorings,198 generator_plugin=gen,199 aggregator=aggregator,200 tank=socket.getfqdn())201 return self._job202 def plugins_configure(self):203 """ Call configure() on all plugins """204 self.publish("core", "stage", "configure")205 logger.info("Configuring plugins...")206 self.taskset_affinity = self.get_option(self.SECTION, 'affinity')207 if self.taskset_affinity:208 self.__setup_taskset(self.taskset_affinity, pid=os.getpid())209 for plugin in self.plugins.values():210 if not self.interrupted.is_set():211 logger.debug("Configuring 
%s", plugin)212 plugin.configure()213 if isinstance(plugin, MonitoringDataListener):214 self.monitoring_data_listeners.append(plugin)215 def plugins_prepare_test(self):216 """ Call prepare_test() on all plugins """217 logger.info("Preparing test...")218 self.publish("core", "stage", "prepare")219 for plugin in self.plugins.values():220 if not self.interrupted.is_set():221 logger.debug("Preparing %s", plugin)222 plugin.prepare_test()223 def plugins_start_test(self):224 """ Call start_test() on all plugins """225 if not self.interrupted.is_set():226 logger.info("Starting test...")227 self.publish("core", "stage", "start")228 self.job.aggregator.start_test()229 for plugin_name, plugin in self.plugins.items():230 logger.debug("Starting %s", plugin)231 start_time = time.time()232 plugin.start_test()233 logger.info("Plugin {0:s} required {1:f} seconds to start".format(plugin_name,234 time.time() - start_time))235 self.publish('generator', 'test_start', self.job.generator_plugin.start_time)236 def wait_for_finish(self):237 """238 Call is_test_finished() on all plugins 'till one of them initiates exit239 """240 if not self.interrupted.is_set():241 logger.info("Waiting for test to finish...")242 logger.info('Artifacts dir: {dir}'.format(dir=self.artifacts_dir))243 self.publish("core", "stage", "shoot")244 if not self.plugins:245 raise RuntimeError("It's strange: we have no plugins loaded...")246 while not self.interrupted.is_set():247 begin_time = time.time()248 aggr_retcode = self.job.aggregator.is_test_finished()249 if aggr_retcode >= 0:250 return aggr_retcode251 for plugin_name, plugin in self.plugins.items():252 logger.debug("Polling %s", plugin)253 try:254 retcode = plugin.is_test_finished()255 if retcode >= 0:256 return retcode257 except Exception:258 logger.warning('Plugin {} failed:'.format(plugin_name), exc_info=True)259 if isinstance(plugin, GeneratorPlugin):260 return RetCode.ERROR261 else:262 logger.warning('Disabling plugin {}'.format(plugin_name))263 
plugin.is_test_finished = lambda: RetCode.CONTINUE264 end_time = time.time()265 diff = end_time - begin_time266 logger.debug("Polling took %s", diff)267 logger.debug("Tank status: %s", json.dumps(self.info.get_info_dict()))268 # screen refresh every 0.5 s269 if diff < 0.5:270 time.sleep(0.5 - diff)271 return 1272 def plugins_end_test(self, retcode):273 """ Call end_test() on all plugins """274 logger.info("Finishing test...")275 self.publish("core", "stage", "end")276 self.publish('generator', 'test_end', time.time())277 logger.info("Stopping load generator and aggregator")278 retcode = self.job.aggregator.end_test(retcode)279 logger.debug("RC after: %s", retcode)280 logger.info('Stopping monitoring')281 for plugin in self.job.monitoring_plugins:282 logger.info('Stopping %s', plugin)283 retcode = plugin.end_test(retcode) or retcode284 logger.info('RC after: %s', retcode)285 for plugin in [p for p in self.plugins.values() if286 p is not self.job.generator_plugin and p not in self.job.monitoring_plugins]:287 logger.debug("Finalize %s", plugin)288 try:289 logger.debug("RC before: %s", retcode)290 retcode = plugin.end_test(retcode)291 logger.debug("RC after: %s", retcode)292 except Exception: # FIXME too broad exception clause293 logger.error("Failed finishing plugin %s", plugin, exc_info=True)294 if not retcode:295 retcode = 1296 return retcode297 def plugins_post_process(self, retcode):298 """299 Call post_process() on all plugins300 """301 logger.info("Post-processing test...")302 self.publish("core", "stage", "post_process")303 for plugin in self.plugins.values():304 logger.debug("Post-process %s", plugin)305 try:306 logger.debug("RC before: %s", retcode)307 retcode = plugin.post_process(retcode)308 logger.debug("RC after: %s", retcode)309 except Exception: # FIXME too broad exception clause310 logger.error("Failed post-processing plugin %s", plugin, exc_info=True)311 if not retcode:312 retcode = 1313 return retcode314 def publish_monitoring_data(self, data):315 
"""sends pending data set to listeners"""316 for plugin in self.monitoring_data_listeners:317 # deep copy to ensure each listener gets it's own copy318 try:319 plugin.monitoring_data(copy.deepcopy(data))320 except Exception:321 logger.error("Plugin failed to process monitoring data", exc_info=True)322 def __setup_taskset(self, affinity, pid=None, args=None):323 """ if pid specified: set process w/ pid `pid` CPU affinity to specified `affinity` core(s)324 if args specified: modify list of args for Popen to start w/ taskset w/ affinity `affinity`325 """326 self.taskset_path = self.get_option(self.SECTION, 'taskset_path')327 if args:328 return [self.taskset_path, '-c', affinity] + args329 if pid:330 args = "%s -pc %s %s" % (self.taskset_path, affinity, pid)331 retcode, stdout, stderr = execute(args, shell=True, poll_period=0.1, catch_out=True)332 logger.debug('taskset for pid %s stdout: %s', pid, stdout)333 if retcode == 0:334 logger.info("Enabled taskset for pid %s with affinity %s", str(pid), affinity)335 else:336 logger.debug('Taskset setup failed w/ retcode :%s', retcode)337 raise KeyError(stderr)338 def _collect_artifacts(self, validation_failed=False):339 logger.debug("Collecting artifacts")340 logger.info("Artifacts dir: %s", self.artifacts_dir)341 for filename, keep in self.artifact_files.items():342 try:343 self.__collect_file(filename, keep)344 except Exception as ex:345 logger.warn("Failed to collect file %s: %s", filename, ex)346 def get_option(self, section, option, default=None):347 return self.config.get_option(section, option, default)348 def set_option(self, section, option, value):349 """350 Set an option in storage351 """352 raise NotImplementedError353 def set_exitcode(self, code):354 self.output['core']['exitcode'] = code355 def get_plugin_of_type(self, plugin_class):356 """357 Retrieve a plugin of desired class, KeyError raised otherwise358 """359 logger.debug("Searching for plugin: %s", plugin_class)360 matches = [plugin for plugin in 
self.plugins.values() if isinstance(plugin, plugin_class)]361 if matches:362 if len(matches) > 1:363 logger.debug(364 "More then one plugin of type %s found. Using first one.",365 plugin_class)366 return matches[-1]367 else:368 raise KeyError("Requested plugin type not found: %s" % plugin_class)369 def get_plugins_of_type(self, plugin_class):370 """371 Retrieve a list of plugins of desired class, KeyError raised otherwise372 """373 logger.debug("Searching for plugins: %s", plugin_class)374 matches = [plugin for plugin in self.plugins.values() if isinstance(plugin, plugin_class)]375 if matches:376 return matches377 else:378 raise KeyError("Requested plugin type not found: %s" % plugin_class)379 def get_jobno(self, plugin_name='plugin_lunapark'):380 uploader_plugin = self.plugins[plugin_name]381 return uploader_plugin.lp_job.number382 def __collect_file(self, filename, keep_original=False):383 """384 Move or copy single file to artifacts dir385 """386 dest = self.artifacts_dir + '/' + os.path.basename(filename)387 logger.debug("Collecting file: %s to %s", filename, dest)388 if not filename or not os.path.exists(filename):389 logger.warning("File not found to collect: %s", filename)390 return391 if os.path.exists(dest):392 # FIXME: 3 find a way to store artifacts anyway393 logger.warning("File already exists: %s", dest)394 return395 if keep_original:396 shutil.copy(filename, self.artifacts_dir)397 else:398 shutil.move(filename, self.artifacts_dir)399 os.chmod(dest, 0o644)400 def add_artifact_file(self, filename, keep_original=False):401 """402 Add file to be stored as result artifact on post-process phase403 """404 if filename:405 logger.debug(406 "Adding artifact file to collect (keep=%s): %s", keep_original,407 filename)408 self.artifact_files[filename] = keep_original409 def add_artifact_to_send(self, lp_requisites, content):410 self.artifacts_to_send.append((lp_requisites, content))411 def apply_shorthand_options(self, options, default_section='DEFAULT'):412 for 
option_str in options:413 key, value = option_str.split('=')414 try:415 section, option = key.split('.')416 except ValueError:417 section = default_section418 option = key419 logger.debug(420 "Override option: %s => [%s] %s=%s", option_str, section,421 option, value)422 self.set_option(section, option, value)423 def mkstemp(self, suffix, prefix, directory=None):...
plugin.py
Source:plugin.py
...131 self.monitoring = None132 self.die_on_fail = False133 return134 with open(self.config) as f:135 self.core.add_artifact_to_send(LPRequisites.MONITORING, str(f.read()))136 # FIXME [legacy] backward compatibility with Monitoring module137 # configuration below.138 self.monitoring.ssh_timeout = expand_to_seconds(139 self.get_option("ssh_timeout", "5s"))140 try:141 autostop = self.core.get_plugin_of_type(AutostopPlugin)142 autostop.add_criterion_class(MetricHigherCriterion)143 autostop.add_criterion_class(MetricLowerCriterion)144 except KeyError:145 logger.debug(146 "No autostop plugin found, not adding instances criterion")147 def prepare_test(self):148 if not self.config or self.config.lower() == 'none':149 return...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!