Best Python code snippet using green
evaluator.py
Source:evaluator.py
...25 try:26 dictionary_ids = dictionaries if dictionaries else self.lexonomy_client.dictionaries()["dictionaries"]27 except HTTPError as error:28 report["available"] = False29 self._add_errors(report, [str(error)])30 #sys.stderr.write(f'Evaluating {len(dictionary_ids):d} dictionaries\n')31 return self.loop_dictionary_retrieval(dictionary_ids, limit, report)32 def loop_dictionary_retrieval(self, dictionary_ids, limit, report):33 if limit == -1:34 limit = len(dictionary_ids)35 count = 036 dicts = []37 for dictionary_id in dictionary_ids:38 try:39 if count < limit:40 #sys.stderr.write(f"Loading Metadata of {dictionary_id} \n")41 metadata = Metadata(self.lexonomy_client.about(dictionary_id))42 dictionary = Dictionary(dictionary_id, metadata)43 dicts.append(dictionary)44 count += 145 except HTTPError as error:46 self._add_errors(report, [str(error)])47 report["available"] = False48 #sys.stderr.write(f'Failed loading {dictionary_id} dictionary \n')49 return dicts, report50 def evaluate_metadata(self, dictionaries: [Dictionary]) -> dict:51 report = {}52 for dictionary in dictionaries:53 #sys.stderr.write(f'Evaluating {dictionary}')54 #sys.stderr.flush()55 metadata_report = {}56 metadata = dictionary.metadata57 if dictionary.metadata.errors:58 metadata_report['errors'] = dictionary.metadata.errors59 for metadata_evaluator in self.metadata_metrics_evaluators:60 #sys.stderr.write(str(metadata_evaluator))61 #sys.stderr.flush()62 metadata_evaluator.analyze(metadata)63 metadata_report.update(metadata_evaluator.result())64 report[dictionary.id] = {'metadata_report': metadata_report}65 #sys.stderr.write(str(metadata_report))66 #sys.stderr.flush()67 return report68 def evaluate_entries(self, dictionaries: [Dictionary], max_entries=None) -> dict:69 if max_entries is None:70 max_entries = 10071 report = {}72 for dictionary in dictionaries:73 entry_report = {}74 entry_counter = max_entries if max_entries is not None else dictionary.metadata.entry_count75 
self._loop_entries_endpoint(dictionary, entry_report, entry_counter)76 #sys.stderr.write("\n")77 self._collect_entry_metrics(entry_report, self.entry_metrics_evaluators)78 report[dictionary.id] = {'entry_report': entry_report}79 return report80 @staticmethod81 def entry_evaluation_report_as_dataframe(report: dict):82 return pd.DataFrame.from_dict({i: report['dictionaries'][i]['entry_report']83 for i in report['dictionaries'].keys()},84 orient='index')85 @staticmethod86 def metadata_evaluation_report_as_dataframe(report: dict):87 return pd.DataFrame.from_dict({i: report['dictionaries'][i]['metadata_report']88 for i in report['dictionaries'].keys()},89 orient='index', dtype=object)90 @staticmethod91 def entry_report(dictionary_id, report: dict):92 return report[dictionary_id]['entry_report']93 def visualize(self, final_report):94 dataframe = self.entry_evaluation_report_as_dataframe(final_report).drop('errors', axis=1)95 dataframe = dataframe.apply(lambda x: x / x.max(), axis=0)96 dataframe['dict_type'] = 097 parallel_coordinates(dataframe, "dict_type", axvlines=True)98 plt.show()99 def aggregated_evaluation(self, report: dict):100 df = self.metadata_evaluation_report_as_dataframe(report)101 if Vocabulary.SIZE_OF_DICTIONARY in df:102 report[Vocabulary.AGGREGATION_METRICS] = {103 Vocabulary.DICTIONARY_SIZE: {104 'min': float(df[Vocabulary.SIZE_OF_DICTIONARY].min()),105 'max': float(df[Vocabulary.SIZE_OF_DICTIONARY].max()),106 'mean': float(df[Vocabulary.SIZE_OF_DICTIONARY].mean()),107 'median': float(df[Vocabulary.SIZE_OF_DICTIONARY].median())108 }109 }110 return report111 def _loop_entries_endpoint(self, dictionary, entry_report, max_entries, entries_limit=100):112 entries_offset = 0113 while entries_offset <= max_entries:114 try:115 entries = self.lexonomy_client.list(dictionary.id, limit=entries_limit, offset=entries_offset)116 except HTTPError as error:117 self._add_errors(entry_report, [str(error)])118 except JSONDecodeError as error:119 
self._add_errors(entry_report, [str(error)])120 except RequestException as error:121 self._add_errors(entry_report, [str(error)])122 if not entries:123 break124 entries_offset = self._handle_entries(dictionary, entries, entry_report, max_entries, entries_offset)125 #sys.stderr.write(str(entries_offset) + '...')126 #sys.stderr.flush()127 if len(entries) < entries_limit:128 break129 @staticmethod130 def _collect_entry_metrics(entry_report, entry_metrics_evaluators: [EntryMetric]):131 for entry_metric in entry_metrics_evaluators:132 if entry_metric.result():133 #sys.stderr.write(str(entry_metric))134 #sys.stderr.write(str(entry_metric.result()))135 #sys.stderr.write("\n")136 #sys.stderr.flush()137 entry_report.update(entry_metric.result())138 entry_metric.reset()139 def _handle_entries(self, dictionary, entries, entry_report, max_entries, entries_offset):140 for entry in entries:141 entries_offset += 1142 if entries_offset > max_entries:143 break144 try:145 entry = Entry(entry)146 if entry.errors:147 self._add_errors(entry_report, entry.errors)148 else:149 self._entry_report(dictionary.id, entry_report, entry)150 except HTTPError:151 self._add_errors(entry_report, f'Failed to retrieve lemmas for dictionary {dictionary.id}')152 except ParseError as parse_error:153 self._add_errors(entry_report, [str(parse_error)])154 except JSONDecodeError as json_decode_error:155 self._add_errors(entry_report, [str(json_decode_error)])156 except RequestException as json_decode_error:157 self._add_errors(entry_report, [str(json_decode_error)])158 return entries_offset159 def evaluation_report(self, dictionary_report:dict, entry_report: dict, metadata_report: dict):160 for key in entry_report.keys():161 if key not in dictionary_report['dictionaries']:162 dictionary_report['dictionaries'][key] = {'entry_report': {}, 'metadata_report': {}}163 dictionary_report['dictionaries'][key]['entry_report'] = entry_report[key]['entry_report']164 for key in metadata_report.keys():165 if key not in 
dictionary_report['dictionaries']:166 dictionary_report['dictionaries'][key] = {'entry_report': {}, 'metadata_report': {}}167 dictionary_report['dictionaries'][key]['metadata_report'] = metadata_report[key]['metadata_report']168 return dictionary_report169 def _entry_report(self, dictionary_id: str, entry_report: dict, entry: Entry):170 retrieved_entry: JsonEntry = self._retrieve_entry(dictionary_id, entry, entry_report)171 if retrieved_entry is not None:172 if retrieved_entry.errors:173 self._add_errors(entry_report, retrieved_entry.errors)174 self._run_entry_metrics_evaluators(retrieved_entry, entry)175 def _retrieve_entry(self, dictionary_id, entry: Entry, entry_report: dict) -> JsonEntry:176 if "json" in entry.formats:177 try:178 return JsonEntry(self.lexonomy_client.json(dictionary_id, entry.id))179 except JSONDecodeError as jde:180 raise JSONDecodeError(f"Error parsing json response {entry.id}: {str(jde)}")181 elif "tei" in entry.formats:182 tei_entry = self.lexonomy_client.tei(dictionary_id, entry.id)183 try:184 tei_entry_element = validate_tei(tei_entry)185 return JsonEntry.from_tei_entry(tei_entry_element, entry.id)186 except ParseError as pe:187 raise ParseError(f"Error with entry {entry.id}: {str(pe)}")188 elif "ontolex" in entry.formats:189 ontolex_entry = self.lexonomy_client.ontolex(dictionary_id, entry.id)190 #try:191 ontolex_entry_element = validate_ontolex(ontolex_entry)192 return JsonEntry.from_ontolex_entry(ontolex_entry_element, entry.id)193 #except Exception as error:194 # raise error195 else:196 self._add_errors(entry_report, ["Entry has no supported formats"])197 return None198 def _run_entry_metrics_evaluators(self, entry_details, entry_metadata):199 for entry_metric in self.entry_metrics_evaluators:200 entry_metric.accumulate(entry_details, entry_metadata)201 def _prepare_report(self, dictionary):202 if dictionary.id not in self.report['dictionaries']:203 self.report['dictionaries'][dictionary.id] = {'entry_report': {}, 'metadata_report': 
{}}204 def _add_entry_report(self, dictionary, entry_report):205 self.report['dictionaries'][dictionary.id]['entry_report'] = entry_report206 def _add_errors(self, entry_report, errors):207 if "errors" not in entry_report:208 entry_report["errors"] = []...
test_evaluate.py
Source:test_evaluate.py
# nuScenes dev-kit.
# Code written by Holger Caesar, 2019.

import json
import os
import random
import shutil
import sys
import unittest
from typing import Dict, Optional, Any

import numpy as np
from tqdm import tqdm

from nuscenes import NuScenes
from nuscenes.eval.common.config import config_factory
from nuscenes.eval.tracking.constants import TRACKING_NAMES
from nuscenes.eval.tracking.evaluate import TrackingEval
from nuscenes.eval.tracking.utils import category_to_tracking_name
from nuscenes.utils.splits import create_splits_scenes


class TestMain(unittest.TestCase):
    # Path of the mock submission JSON written by basic_test.
    res_mockup = 'nusc_eval.json'
    # Output directory handed to TrackingEval.
    res_eval_folder = 'tmp'

    def tearDown(self):
        # Remove the mock submission file and the evaluation output folder, if they were created.
        if os.path.exists(self.res_mockup):
            os.remove(self.res_mockup)
        if os.path.exists(self.res_eval_folder):
            shutil.rmtree(self.res_eval_folder)

    @staticmethod
    def _mock_submission(nusc: NuScenes,
                         split: str,
                         add_errors: bool = False) -> Dict[str, dict]:
        """
        Creates "reasonable" submission (results and metadata) by looping through the mini-val set, adding 1 GT
        prediction per sample. Predictions will be permuted randomly along all axes.
        :param nusc: NuScenes instance.
        :param split: Dataset split to use.
        :param add_errors: Whether to use GT or add errors to it.
        """

        def random_class(category_name: str, _add_errors: bool = False) -> Optional[str]:
            # Alter 10% of the valid labels.
            class_names = sorted(TRACKING_NAMES)
            tmp = category_to_tracking_name(category_name)
            if tmp is None:
                return None
            else:
                if not _add_errors or np.random.rand() < .9:
                    return tmp
                else:
                    # NOTE(review): np.random.randint excludes its upper bound, so the last entry of
                    # class_names can never be drawn here. Changing this would alter the seeded
                    # draws and hence the metrics pinned in test_delta_mock - confirm before touching.
                    return class_names[np.random.randint(0, len(class_names) - 1)]

        def random_id(instance_token: str, _add_errors: bool = False) -> str:
            # Alter 10% of the valid ids to be a random string, which hopefully corresponds to a new track.
            if not _add_errors or np.random.rand() < .9:
                _tracking_id = instance_token + '_pred'
            else:
                _tracking_id = str(np.random.randint(0, sys.maxsize))

            return _tracking_id

        mock_meta = {
            'use_camera': False,
            'use_lidar': True,
            'use_radar': False,
            'use_map': False,
            'use_external': False,
        }
        mock_results = {}

        # Get all samples in the current evaluation split.
        splits = create_splits_scenes()
        val_samples = []
        for sample in nusc.sample:
            if nusc.get('scene', sample['scene_token'])['name'] in splits[split]:
                val_samples.append(sample)

        # Prepare results.
        # The order of the random draws below determines the generated submission under a fixed
        # seed, so statements in this loop must not be reordered.
        instance_to_score = dict()
        for sample in tqdm(val_samples, leave=False):
            sample_res = []
            for ann_token in sample['anns']:
                ann = nusc.get('sample_annotation', ann_token)
                translation = np.array(ann['translation'])
                size = np.array(ann['size'])
                rotation = np.array(ann['rotation'])
                velocity = nusc.box_velocity(ann_token)[:2]
                tracking_id = random_id(ann['instance_token'], _add_errors=add_errors)
                tracking_name = random_class(ann['category_name'], _add_errors=add_errors)

                # Skip annotations for classes not part of the detection challenge.
                if tracking_name is None:
                    continue

                # Skip annotations with 0 lidar/radar points.
                num_pts = ann['num_lidar_pts'] + ann['num_radar_pts']
                if num_pts == 0:
                    continue

                # If we randomly assign a score in [0, 1] to each box and later average over the boxes in the track,
                # the average score will be around 0.5 and we will have 0 predictions above that.
                # Therefore we assign the same scores to each box in a track.
                if ann['instance_token'] not in instance_to_score:
                    instance_to_score[ann['instance_token']] = random.random()
                tracking_score = instance_to_score[ann['instance_token']]
                # Add per-box jitter on top of the per-track score and keep the result within [0, 1].
                tracking_score = np.clip(tracking_score + random.random() * 0.3, 0, 1)

                if add_errors:
                    # Perturb the ground-truth box in place.
                    translation += 4 * (np.random.rand(3) - 0.5)
                    size *= (np.random.rand(3) + 0.5)
                    rotation += (np.random.rand(4) - 0.5) * .1
                    velocity *= np.random.rand(3)[:2] + 0.5

                sample_res.append({
                    'sample_token': sample['token'],
                    'translation': list(translation),
                    'size': list(size),
                    'rotation': list(rotation),
                    'velocity': list(velocity),
                    'tracking_id': tracking_id,
                    'tracking_name': tracking_name,
                    'tracking_score': tracking_score
                })
            mock_results[sample['token']] = sample_res
        mock_submission = {
            'meta': mock_meta,
            'results': mock_results
        }
        return mock_submission

    # NOTE(review): a function decorated with unittest.skip raises unittest.SkipTest when called,
    # so the two tests below that call self.basic_test(...) would be skipped at that call even if
    # their own skip decorators were removed - confirm this is intended.
    @unittest.skip
    def basic_test(self,
                   eval_set: str = 'mini_val',
                   add_errors: bool = False,
                   render_curves: bool = False) -> Dict[str, Any]:
        """
        Run the evaluation with fixed randomness on the specified subset, with or without introducing errors in the
        submission.
        :param eval_set: Which split to evaluate on.
        :param add_errors: Whether to use GT as submission or introduce additional errors.
        :param render_curves: Whether to render stats curves to disk.
        :return: The metrics returned by the evaluation.
        """
        # Seed both RNGs used by _mock_submission so the generated submission is reproducible.
        random.seed(42)
        np.random.seed(42)
        assert 'NUSCENES' in os.environ, 'Set NUSCENES env. variable to enable tests.'

        # Pick the dataset version that matches the requested split.
        if eval_set.startswith('mini'):
            version = 'v1.0-mini'
        elif eval_set == 'test':
            version = 'v1.0-test'
        else:
            version = 'v1.0-trainval'
        nusc = NuScenes(version=version, dataroot=os.environ['NUSCENES'], verbose=False)

        # Write the mock submission to disk; TrackingEval is given the file path below.
        with open(self.res_mockup, 'w') as f:
            mock = self._mock_submission(nusc, eval_set, add_errors=add_errors)
            json.dump(mock, f, indent=2)

        cfg = config_factory('tracking_nips_2019')
        nusc_eval = TrackingEval(cfg, self.res_mockup, eval_set=eval_set, output_dir=self.res_eval_folder,
                                 nusc_version=version, nusc_dataroot=os.environ['NUSCENES'], verbose=False)
        metrics = nusc_eval.main(render_curves=render_curves)

        return metrics

    @unittest.skip
    def test_delta_mock(self,
                        eval_set: str = 'mini_val',
                        render_curves: bool = False):
        """
        This test runs the evaluation for an arbitrary random set of predictions.
        This score is then captured in this very test such that if we change the eval code,
        this test will trigger if the results changed.
        :param eval_set: Which set to evaluate on.
        :param render_curves: Whether to render stats curves to disk.
        """
        # Run the evaluation with errors.
        metrics = self.basic_test(eval_set, add_errors=True, render_curves=render_curves)

        # Compare metrics to known solution.
        if eval_set == 'mini_val':
            self.assertAlmostEqual(metrics['amota'], 0.23766771095785147)
            self.assertAlmostEqual(metrics['amotp'], 1.5275400961369252)
            self.assertAlmostEqual(metrics['motar'], 0.3726570200013319)
            self.assertAlmostEqual(metrics['mota'], 0.25003943918566174)
            self.assertAlmostEqual(metrics['motp'], 1.2976508610883917)
        else:
            print('Skipping checks due to choice of custom eval_set: %s' % eval_set)

    @unittest.skip
    def test_delta_gt(self,
                      eval_set: str = 'mini_val',
                      render_curves: bool = False):
        """
        This test runs the evaluation with the ground truth used as predictions.
        This should result in a perfect score for every metric.
        This score is then captured in this very test such that if we change the eval code,
        this test will trigger if the results changed.
        :param eval_set: Which set to evaluate on.
        :param render_curves: Whether to render stats curves to disk.
        """
        # Run the evaluation without errors.
        metrics = self.basic_test(eval_set, add_errors=False, render_curves=render_curves)

        # Compare metrics to known solution. Do not check:
        # - MT/TP (hard to figure out here).
        # - AMOTA/AMOTP (unachieved recall values lead to hard unintuitive results).
        if eval_set == 'mini_val':
            self.assertAlmostEqual(metrics['amota'], 1.0)
            self.assertAlmostEqual(metrics['amotp'], 0.0, delta=1e-5)
            self.assertAlmostEqual(metrics['motar'], 1.0)
            self.assertAlmostEqual(metrics['recall'], 1.0)
            self.assertAlmostEqual(metrics['mota'], 1.0)
            self.assertAlmostEqual(metrics['motp'], 0.0, delta=1e-5)
            self.assertAlmostEqual(metrics['faf'], 0.0)
            self.assertAlmostEqual(metrics['ml'], 0.0)
            self.assertAlmostEqual(metrics['fp'], 0.0)
            self.assertAlmostEqual(metrics['fn'], 0.0)
            self.assertAlmostEqual(metrics['ids'], 0.0)
            self.assertAlmostEqual(metrics['frag'], 0.0)
            self.assertAlmostEqual(metrics['tid'], 0.0)
            self.assertAlmostEqual(metrics['lgd'], 0.0)
        else:
            print('Skipping checks due to choice of custom eval_set: %s' % eval_set)


if __name__ == '__main__':...
interface.py
Source:interface.py
...73 vars.update({'errors': self._errors, 'data': self._data, 'messages': self._messages,74 'wallabag_host': self._cfg.wallabag_host,75 'tags': [t.tag for t in wallabag.make_tags(self._cfg.tag)]})76 return vars77 def _add_errors(self, errors):78 self._errors.update(errors)79 def _set_data(self, data):80 self._data = data81 def _add_message(self, msg):82 self._messages.append(msg)83 @property84 def _session(self):85 return self.request.app['session_maker']86class IndexView(ViewBase):87 @aiohttp_jinja2.template("index.html")88 async def get(self):89 return self._template({})90 @aiohttp_jinja2.template("index.html")91 async def post(self):92 data = await self.request.post()93 self._set_data(data)94 validator = Validator(self.request.app.loop, data)95 await asyncio.gather(validator.validate_emails(),96 validator.validate_credentials())97 self._add_errors(validator.errors)98 if validator.success:99 user = models.User(name=validator.username, kindle_mail=validator.kindle_email,100 email=validator.notify_email)101 with self._session as session:102 if session.query(models.User.name).filter(models.User.name == validator.username).count() != 0:103 self._add_errors({'user': "User is already registered"})104 elif not await self._wallabag.get_token(user, validator.password):105 self._add_errors({'auth': 'Cannot authenticate at wallabag server to get a token'})106 else:107 session.add(user)108 session.commit()109 self._add_message(f'User {validator.username} successfully registered')110 self._set_data({})111 logger.info("User {user} registered", user=validator.username)112 return self._template({})113class ReLoginView(ViewBase):114 @aiohttp_jinja2.template("relogin.html")115 async def get(self):116 return self._template({'action': 'update', 'description': 'Refresh'})117 @aiohttp_jinja2.template("relogin.html")118 async def post(self):119 data = await self.request.post()120 self._set_data(data)121 validator = Validator(self.request.app.loop, data)122 await 
validator.validate_credentials()123 self._add_errors(validator.errors)124 if validator.success:125 with self._session as session:126 user = session.query(models.User).filter(models.User.name == validator.username).first()127 if user is None:128 self._add_errors({'user': 'User not registered'})129 else:130 if await self._wallabag.get_token(user, validator.password):131 user.active = True132 session.commit()133 self._add_message(f"User {validator.username} successfully updated.")134 logger.info("User {user} successfully updated.", user=user)135 else:136 self._add_errors({'auth': "Authentication against wallabag server failed"})137 return self._template({'action': 'update', 'description': 'Refresh'})138class DeleteView(ViewBase):139 @aiohttp_jinja2.template("relogin.html")140 async def get(self):141 return self._template({'action': 'delete', 'description': 'Delete'})142 @aiohttp_jinja2.template("relogin.html")143 async def post(self):144 data = await self.request.post()145 self._set_data(data)146 validator = Validator(self.request.app.loop, data)147 await validator.validate_credentials()148 self._add_errors(validator.errors)149 if validator.success:150 with self._session as session:151 user = session.query(models.User).filter(models.User.name == validator.username).first()152 if user is None:153 self._add_errors({'user': 'User not registered'})154 else:155 if await self._wallabag.get_token(user, validator.password):156 session.delete(user)157 session.commit()158 self._add_message(f"User {validator.username} successfully deleted.")159 logger.info("User {user} successfully deleted.", user=user)160 else:161 self._add_errors({'auth': "Authentication against wallabag server failed"})162 return self._template({'action': 'delete', 'description': 'Delete'})163class App:164 def __init__(self, config, wallabag):165 self.config = config166 self.wallabag = wallabag167 self.app = web.Application()168 self.site = None # type: web.TCPSite169 self.setup_app()170 
self.setup_routes()171 def setup_app(self):172 self.app['config'] = self.config173 self.app['wallabag'] = self.wallabag174 self.app['session_maker'] = models.context_session(self.config)175 aiohttp_jinja2.setup(...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!