Best Python code snippet using localstack_python
metric_aggregator.py
Source:metric_aggregator.py
...25 else:26 output += f" {template_not_implemented_html}{e}</input><br/>\n"27 output += " </details></li>\n"28 return output29def create_simple_html_report(file_name, metrics):30 output = "<html><h1> Metric Collection Report of Integration Tests </h1>\n\n"31 output += "<div><b>Disclaimer</b>: naive calculation of test coverage - if operation is called at least once, it is considered as 'covered'.<br/>\n"32 output += "✨: aws_validated or using the snapshot fixture<br/>\n"33 output += "📸: using the snapshot fixture without any skip_snapshot_verify<br/>\n"34 output += "🚫: using the snapshot fixture but uses skip_snapshot_verify<br/></div>\n"35 for service in sorted(metrics.keys()):36 output += f"<h1> {service} </h1>\n<div>\n"37 details = metrics[service]38 if not details["service_attributes"]["pro"]:39 output += "community<br/>\n"40 elif not details["service_attributes"]["community"]:41 output += "pro only<br/>\n"42 else:43 output += "community, and pro features<br/>\n"44 del metrics[service]["service_attributes"]45 output += "</div>\n"46 operation_counter = len(details)47 operation_tested = 048 tmp = ""49 template_aws_validated = '<span title="AWS validated">✨</span>'50 template_snapshot_verified = '<span title="Snapshot verified">📸</span>'51 template_snapshot_skipped = '<span title="Snapshot skipped">🚫</span>'52 for operation in sorted(details.keys()):53 op_details = details[operation]54 if op_details.get("invoked", 0) > 0:55 operation_tested += 156 aws_validated = f"{template_aws_validated if op_details.get('aws_validated') or op_details.get('snapshot') else ''}"57 snapshot = f"{template_snapshot_verified if aws_validated and not op_details.get('snapshot_skipped_paths') else template_snapshot_skipped if aws_validated else ''}"58 tmp += f"<p>{template_implemented_html}{operation} {aws_validated} {snapshot}</input>\n"59 else:60 tmp += f"<p>{template_not_implemented_html}{operation}</input>\n"61 tmp += "<ul>"62 if op_details.get("parameters"):63 parameters = op_details.get("parameters")64 if parameters:65 tmp += _generate_details_block_html("parameters hit", parameters)66 if op_details.get("errors"):67 tmp += _generate_details_block_html("errors hit", op_details["errors"])68 tmp += "</ul>"69 tmp += "</p>"70 output += f"<p><details><summary>{operation_tested/operation_counter*100:.2f}% test coverage</summary>\n\n{tmp}\n</details></p>\n"71 with open(file_name, "a") as fd:72 fd.write(f"{output}\n")73 output = ""74def _generate_details_block(details_title: str, details: dict) -> str:75 output = f" <details><summary>{details_title}</summary>\n\n"76 for e, count in details.items():77 if count > 0:78 output += f" {template_implemented_item}{e}\n"79 else:80 output += f" {template_not_implemented_item}{e}\n"81 output += " </details>\n"82 return output83def create_readable_report(file_name: str, metrics: dict):84 output = "# Metric Collection Report of Integration Tests #\n\n"85 output += "**__Disclaimer__**: naive calculation of test coverage - if operation is called at least once, it is considered as 'covered'.\n"86 output += f"{AWS_VALIDATED}: aws_validated or using the snapshot fixture\n"87 output += f"{SNAPSHOT}: using the snapshot fixture without any skip_snapshot_verify\n"88 output += f"{AWS_VALIDATED}: using the snapshot fixture but uses skip_snapshot_verify\n"89 for service in sorted(metrics.keys()):90 output += f"## {service} ##\n"91 details = metrics[service]92 if not details["service_attributes"]["pro"]:93 output += "community\n"94 elif not details["service_attributes"]["community"]:95 output += "pro only\n"96 else:97 output += "community, and pro features\n"98 del metrics[service]["service_attributes"]99 operation_counter = len(details)100 operation_tested = 0101 tmp = ""102 for operation in sorted(details.keys()):103 op_details = details[operation]104 if op_details.get("invoked", 0) > 0:105 operation_tested += 1106 aws_validated = f"{AWS_VALIDATED if op_details.get('aws_validated') or op_details.get('snapshot') else ''}"107 snapshot = f"{SNAPSHOT if aws_validated and not op_details.get('snapshot_skipped_paths') else SNAPSHOT_SKIP_VERIFY if aws_validated else ''}"108 tmp += f"{template_implemented_item}{operation} {aws_validated} {snapshot}\n"109 else:110 tmp += f"{template_not_implemented_item}{operation}\n"111 if op_details.get("parameters"):112 parameters = op_details.get("parameters")113 if parameters:114 tmp += _generate_details_block("parameters hit", parameters)115 if op_details.get("errors"):116 tmp += _generate_details_block("errors hit", op_details["errors"])117 output += f"<details><summary>{operation_tested/operation_counter*100:.2f}% test coverage</summary>\n\n{tmp}\n</details>\n"118 with open(file_name, "a") as fd:119 fd.write(f"{output}\n")120 output = ""121def _init_service_metric_counter() -> Dict:122 metric_recorder = {}123 from localstack.aws.spec import load_service124 for s, provider in SERVICE_PLUGINS.api_provider_specs.items():125 try:126 service = load_service(s)127 ops = {}128 service_attributes = {"pro": "pro" in provider, "community": "default" in provider}129 ops["service_attributes"] = service_attributes130 for op in service.operation_names:131 attributes = {}132 attributes["invoked"] = 0133 attributes["aws_validated"] = False134 attributes["snapshot"] = False135 if hasattr(service.operation_model(op).input_shape, "members"):136 params = {}137 for n in service.operation_model(op).input_shape.members:138 params[n] = 0139 attributes["parameters"] = params140 if hasattr(service.operation_model(op), "error_shapes"):141 exceptions = {}142 for e in service.operation_model(op).error_shapes:143 exceptions[e.name] = 0144 attributes["errors"] = exceptions145 ops[op] = attributes146 metric_recorder[s] = ops147 except Exception:148 LOG.debug(f"cannot load service '{s}'")149 return metric_recorder150def print_usage():151 print("missing argument: directory")152 print("usage: python metric_aggregator.py <dir-to-raw-csv-metric> [amd64|arch64]")153def write_json(file_name: str, metric_dict: dict):154 with open(file_name, "w") as fd:155 fd.write(json.dumps(metric_dict, indent=2, sort_keys=True))156def _print_diff(metric_recorder_internal, metric_recorder_external):157 for key, val in metric_recorder_internal.items():158 for subkey, val in val.items():159 if isinstance(val, dict) and val.get("invoked"):160 if val["invoked"] > 0 and not metric_recorder_external[key][subkey]["invoked"]:161 print(f"found invocation mismatch: {key}.{subkey}")162def append_row_to_raw_collection(collection_raw_csv_file_name, row, arch):163 with open(collection_raw_csv_file_name, "a") as fd:164 writer = csv.writer(fd)165 row.append(arch)166 writer.writerow(row)167def aggregate_recorded_raw_data(168 base_dir: str, collection_raw_csv: Optional[str] = None, collect_for_arch: Optional[str] = ""169) -> dict:170 pathlist = Path(base_dir).rglob("metric-report-raw-data-*.csv")171 recorded = _init_service_metric_counter()172 for path in pathlist:173 print(f"checking {str(path)}")174 with open(path, "r") as csv_obj:175 csv_dict_reader = csv.reader(csv_obj)176 # skip the header177 next(csv_dict_reader)178 for row in csv_dict_reader:179 if collection_raw_csv:180 arch = ""181 if "arm64" in str(path):182 arch = "arm64"183 elif "amd64" in str(path):184 arch = "amd64"185 # only aggregate all if we did not set a specific target to collect186 if not collect_for_arch:187 append_row_to_raw_collection(collection_raw_csv, copy.deepcopy(row), arch)188 elif collect_for_arch in str(path):189 append_row_to_raw_collection(collection_raw_csv, copy.deepcopy(row), arch)190 metric: Metric = Metric(*row)191 if metric.xfail == "True":192 print(f"test {metric.node_id} marked as xfail")193 continue194 if collect_for_arch and collect_for_arch not in str(path):195 continue196 service = recorded[metric.service]197 ops = service[metric.operation]198 errors = ops.setdefault("errors", {})199 if metric.exception:200 exception = metric.exception201 errors[exception] = ops.get(exception, 0) + 1202 elif int(metric.response_code) >= 300:203 for expected_error in ops.get("errors", {}).keys():204 if expected_error in metric.response_data:205 # assume we have a match206 errors[expected_error] += 1207 LOG.warning(208 f"Exception assumed for {metric.service}.{metric.operation}: code {metric.response_code}"209 )210 break211 ops["invoked"] += 1212 if metric.snapshot == "True":213 ops["snapshot"] = True # TODO snapshot currently includes also "skip_verify"214 ops["snapshot_skipped_paths"] = metric.snapshot_skipped_paths or ""215 if metric.aws_validated == "True":216 ops["aws_validated"] = True217 if not metric.parameters:218 params = ops.setdefault("parameters", {})219 params["_none_"] = params.get("_none_", 0) + 1220 else:221 for p in metric.parameters.split(","):222 ops["parameters"][p] += 1223 test_list = ops.setdefault("tests", [])224 if metric.node_id not in test_list:225 test_list.append(metric.node_id)226 return recorded227def main():228 if not len(sys.argv) >= 2 or not Path(sys.argv[1]).is_dir():229 print_usage()230 return231 base_dir = sys.argv[1]232 collect_for_arch = ""233 if len(sys.argv) == 3:234 collect_for_arch = sys.argv[2]235 if collect_for_arch not in ("amd64", "arm64"):236 print_usage()237 return238 print(239 f"Set target to '{collect_for_arch}' - will only aggregate for these test results. Raw collection of all files.\n"240 )241 # TODO: removed splitting of internal/external recorded calls, as some pro tests use 'internals' to connect to service242 metrics_path = os.path.join(base_dir, "parity_metrics")243 Path(metrics_path).mkdir(parents=True, exist_ok=True)244 dtime = datetime.datetime.utcnow().strftime("%Y-%m-%d-%H-%M-%s")245 collection_raw_csv = os.path.join(metrics_path, f"raw-collected-data-{dtime}.csv")246 with open(collection_raw_csv, "w") as fd:247 writer = csv.writer(fd)248 header = Metric.RAW_DATA_HEADER.copy()249 header.append("arch")250 writer.writerow(header)251 recorded_metrics = aggregate_recorded_raw_data(base_dir, collection_raw_csv, collect_for_arch)252 write_json(253 os.path.join(254 metrics_path,255 f"metric-report-{dtime}{collect_for_arch}.json",256 ),257 recorded_metrics,258 )259 # filename = os.path.join(metrics_path, f"metric-report-{dtime}{collect_for_arch}.md")260 # create_readable_report(filename, recorded_metrics)261 filename = os.path.join(metrics_path, f"metric-report-{dtime}{collect_for_arch}.html")262 create_simple_html_report(filename, recorded_metrics)263if __name__ == "__main__":...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!