Best Python code snippet using yandex-tank
plugin.py
Source:plugin.py
...101 def _cleanup(self):102 self.upload_actual_rps(data=pandas.DataFrame([]), last_piece=True)103 uploader_metainfo = self.get_lp_meta()104 autostop_info = self.get_autostop_info()105 regressions = self.get_regressions_names(uploader_metainfo)106 lp_link = self.core.info.get_value(['uploader', 'web_link'])107 meta = self.meta108 meta.update(autostop_info)109 meta['regression'] = regressions110 meta['lunapark_link'] = lp_link111 self.data_session.update_job(meta)112 self.data_session.close(test_end=self.core.info.get_value(['generator', 'test_end'], 0) * 10**6)113 def is_test_finished(self):114 df = next(self.reader)115 if df is not None:116 self.upload(df)117 return -1118 def monitoring_data(self, data_list):119 self.upload_monitoring(data_list)120 def post_process(self, retcode):121 try:122 self.rps_uploader.start()123 for chunk in self.reader:124 if chunk is not None:125 self.upload(chunk)126 self.upload_actual_rps(data=pandas.DataFrame([]), last_piece=True)127 if self.rps_uploader.is_alive():128 self.rps_uploader.join()129 except KeyboardInterrupt:130 logger.warning('Caught KeyboardInterrupt on Neuploader')131 self._cleanup()132 return retcode133 @property134 def is_telegraf(self):135 return True136 def get_metric_obj(self, col, case):137 """138 Generator of metric objects:139 Checks existent metrics and creates new metric if it does not exist.140 :param col: str with column name141 :param case: str with case name142 :return: metric object143 """144 case_metrics = self.metrics_objs.get(case)145 if case_metrics is None:146 for col, constructor in self.col_map.items():147 self.metrics_objs.setdefault(case, {})[col] = constructor(148 dict(self.meta,149 name=col,150 source='tank',151 importance='high' if col in self.importance_high else ''),152 raw=False, aggregate=True,153 parent=self.get_metric_obj(col, self.OVERALL) if case != self.OVERALL else None,154 case=case if case != self.OVERALL else None155 )156 return self.metrics_objs[case][col]157 def upload(self, df):158 self.upload_actual_rps(df)159 df_cases_set = set()160 for row in df.itertuples():161 if row.tag and isinstance(row.tag, str):162 df_cases_set.add(row.tag)163 if '|' in row.tag:164 for tag in row.tag.split('|'):165 df_cases_set.add(tag)166 for column in self.col_map:167 overall_metric_obj = self.get_metric_obj(column, self.OVERALL)168 df['value'] = df[column]169 result_df = self.filter_df_by_case(df, self.OVERALL)170 overall_metric_obj.put(result_df)171 for case_name in df_cases_set:172 case_metric_obj = self.get_metric_obj(column, case_name)173 df['value'] = df[column]174 result_df = self.filter_df_by_case(df, case_name)175 case_metric_obj.put(result_df)176 def upload_monitoring(self, data):177 for metric_name, df in self.monitoring_data_to_dfs(data).items():178 if metric_name not in self.monitoring_metrics:179 panel, metric = metric_name.split(':', 1)180 try:181 group, name = metric.split('_', 1)182 except ValueError:183 name = metric184 group = '_OTHER_'185 self.monitoring_metrics[metric_name] =\186 self.data_session.new_true_metric(187 meta=dict(self.meta,188 name=name,189 group=group,190 host=panel,191 type='monitoring'))192 self.monitoring_metrics[metric_name].put(df)193 def upload_planned_rps(self):194 """ Uploads planned rps as a raw metric """195 df = self.parse_stpd()196 if not df.empty:197 self.rps_metrics['planned_rps_metrics_obj'] = self.data_session.new_true_metric(198 meta=dict(self.meta, name=self.PLANNED_RPS_METRICS_NAME, source='tank'),199 raw=True, aggregate=False, parent=None, case=None)200 self.rps_metrics['planned_rps_metrics_obj'].put(df)201 def upload_actual_rps(self, data, last_piece=False):202 """ Upload actual rps metric """203 if self.rps_metrics['actual_rps_metrics_obj'] is None:204 self.rps_metrics['actual_rps_metrics_obj'] = self.data_session.new_true_metric(205 meta=dict(self.meta, name=self.ACTUAL_RPS_METRICS_NAME),206 raw=True, aggregate=False, parent=None, case=None207 )208 df = self.count_actual_rps(data, last_piece)209 if not df.empty:210 self.rps_metrics['actual_rps_metrics_obj'].put(df)211 def parse_stpd(self):212 """ Reads rps plan from stpd file """213 stpd_file = self.core.info.get_value(['stepper', 'stpd_file'])214 if not stpd_file:215 logger.info('No stpd found, no planned_rps metrics')216 return pandas.DataFrame()217 rows_list = []218 test_start = int(self.core.info.get_value(['generator', 'test_start'], 0) * 10 ** 3)219 pattern = r'^\d+ (\d+)\s*.*$'220 regex = re.compile(pattern)221 try:222 with open(stpd_file) as stpd:223 for line in stpd:224 if regex.match(line):225 timestamp = int((int(line.split(' ')[1]) + test_start) / 1e3) # seconds226 rows_list.append(timestamp)227 except Exception:228 logger.warning('Failed to parse stpd file')229 logger.debug('', exc_info=True)230 return pandas.DataFrame()231 return self.rps_series_to_df(pandas.Series(rows_list))232 def count_actual_rps(self, data, last_piece):233 """ Counts actual rps on base of input chunk. Uses buffer for latest timestamp in df. """234 if not last_piece and not data.empty:235 concat_ts = pandas.concat([(data.ts / 1e6).astype(int), self.rps_metrics['actual_rps_latest']])236 self.rps_metrics['actual_rps_latest'] = concat_ts.loc[lambda s: s == concat_ts.max()]237 series_to_send = concat_ts.loc[lambda s: s < concat_ts.max()]238 df = self.rps_series_to_df(series_to_send) if series_to_send.any else pandas.DataFrame([])239 else:240 df = self.rps_series_to_df(self.rps_metrics['actual_rps_latest'])241 self.rps_metrics['actual_rps_latest'] = pandas.Series()242 return df243 @staticmethod244 def monitoring_data_to_dfs(data):245 panels = {}246 for chunk in data:247 for panel_name, content in chunk['data'].items():248 if panel_name in panels:249 for metric_name, value in content['metrics'].items():250 if metric_name in panels[panel_name]:251 panels[panel_name][metric_name]['value'].append(value)252 panels[panel_name][metric_name]['ts'].append(chunk['timestamp'])253 else:254 panels[panel_name][metric_name] = {'value': [value], 'ts': [chunk['timestamp']]}255 else:256 panels[panel_name] = {name: {'value': [value], 'ts': [chunk['timestamp']]} for name, value in content['metrics'].items()}257 return {'{}:{}'.format(panelk, name): pandas.DataFrame({'ts': [ts * 1000000 for ts in values['ts']], 'value': values['value']})258 for panelk, panelv in panels.items() for name, values in panelv.items()}259 @staticmethod260 def rps_series_to_df(series):261 df = series.value_counts().to_frame(name='value')262 df_to_send = df.rename_axis('ts')263 df_to_send.reset_index(inplace=True)264 df_to_send.loc[:, 'ts'] = (df_to_send['ts'] * 1e6).astype(int)265 return df_to_send266 @staticmethod267 def filter_df_by_case(df, case):268 """269 Filter dataframe by case name. If case is '__overall__', return all rows.270 :param df: DataFrame271 :param case: str with case name272 :return: DataFrame with columns 'ts' and 'value'273 """274 case = case.strip()275 return df[['ts', 'value']] if case == Plugin.OVERALL else df[df.tag.str.strip() == case][['ts', 'value']]276 def get_lp_meta(self):277 uploader_meta = self.core.info.get_value(['uploader'])278 if not uploader_meta:279 logger.info('No uploader metainfo found')280 return {}281 else:282 meta_tags_names = ['component', 'description', 'name', 'person', 'task', 'version', 'lunapark_jobno']283 meta_tags = {key: uploader_meta.get(key) for key in meta_tags_names if key in uploader_meta}284 meta_tags.update({k: v if v is not None else '' for k, v in uploader_meta.get('meta', {}).items()})285 return meta_tags286 @staticmethod287 def get_regressions_names(uploader_metainfo):288 task, component_name = uploader_metainfo.get('task'), uploader_metainfo.get('component')289 if not task or not component_name:290 return []291 project_name = task.split('-')[0]292 lp_api_url = 'https://lunapark.yandex-team.ru/api/regress/{}/componentlist.json'.format(project_name)293 try:294 componentlist =\295 requests.get(lp_api_url).json()296 except (ValueError, ConnectionError):297 logger.info("Failed to fetch data from {}".format(lp_api_url), exc_info=True)298 return []299 for component in componentlist:300 try:301 if component['name'] == component_name:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!