Best Python code snippet using lemoncheesecake
biobank_data_comparison.py
Source:biobank_data_comparison.py
from dataclasses import dataclass
from datetime import datetime
from protorpc import messages
from typing import List, Optional
import argparse

from dateutil.parser import parse
from sqlalchemy.orm import Session

from rdr_service.app_util import is_datetime_equal
from rdr_service.config import BIOBANK_DATA_COMPARISON_DOCUMENT_ID
from rdr_service.model.biobank_order import BiobankSpecimen
from rdr_service.model.biobank_stored_sample import BiobankStoredSample, SampleStatus
from rdr_service.model.config_utils import to_client_biobank_id
from rdr_service.services.google_sheets_client import GoogleSheetsClient
from rdr_service.tools.tool_libs.tool_base import cli_run, ToolBase

tool_cmd = 'biobank-api-check'
tool_desc = 'Run a comparison of the API data to the Sample Inventory Report data.'


class DifferenceType(messages.Enum):
    """Kinds of discrepancy that can be reported for a sample."""
    MISSING_FROM_SIR = 1
    MISSING_FROM_API_DATA = 2
    DISPOSAL_DATE = 3
    CONFIRMED_DATE = 4
    BIOBANK_ID = 5
    TEST_CODE = 6
    ORDER_ID = 7
    STATUS = 8


@dataclass
class SamplePair:
    """The two views of one sample; either side is None when no counterpart was found."""
    report_data: Optional[BiobankStoredSample]  # record from the Sample Inventory Report (SIR)
    api_data: Optional[BiobankSpecimen]  # record from the Specimen API


@dataclass
class DifferenceFound:
    """A single discrepancy between the report data and the API data of a sample."""
    sample_pair: SamplePair
    type: DifferenceType

    def get_report_and_api_values(self):
        """
        Return a ``(report_value, api_value)`` tuple holding the two values that disagree.

        Both values are empty strings when one side of the pair is missing entirely.

        :raises ValueError: if ``self.type`` is not a known ``DifferenceType``.
        """
        report_data = self.sample_pair.report_data
        api_data = self.sample_pair.api_data

        if self.type == DifferenceType.MISSING_FROM_SIR or self.type == DifferenceType.MISSING_FROM_API_DATA:
            return '', ''
        elif self.type == DifferenceType.DISPOSAL_DATE:
            return report_data.disposed, api_data.disposalDate
        elif self.type == DifferenceType.CONFIRMED_DATE:
            return report_data.confirmed, api_data.confirmedDate
        elif self.type == DifferenceType.BIOBANK_ID:
            return (
                to_client_biobank_id(report_data.biobankId),
                to_client_biobank_id(api_data.biobankId)
            )
        elif self.type == DifferenceType.TEST_CODE:
            return report_data.test, api_data.testCode
        elif self.type == DifferenceType.ORDER_ID:
            return report_data.biobankOrderIdentifier, api_data.orderId
        elif self.type == DifferenceType.STATUS:
            api_status = api_data.status
            if api_data.disposalReason:
                # Built with an f-string (rather than +=) so a missing status cannot raise here
                api_status = f'{api_status}, {api_data.disposalReason}'
            return report_data.status, api_status

        # Fail loudly instead of returning None, which callers would try to unpack
        raise ValueError(f'Unrecognized difference type: {self.type}')


class BiobankSampleComparator:
    """Compares the report record and the API record of one sample."""

    # Dates on the two records are treated as equal when they are at most this far apart
    DATE_TOLERANCE_SECONDS = 3600

    def __init__(self, sample_pair: SamplePair):
        self.api_data = sample_pair.api_data
        self.report_data = sample_pair.report_data

    def _difference(self, difference_type) -> DifferenceFound:
        """Build a DifferenceFound of the given type for the pair being compared."""
        return DifferenceFound(
            type=difference_type,
            sample_pair=SamplePair(report_data=self.report_data, api_data=self.api_data)
        )

    def get_differences(self) -> List[DifferenceFound]:
        """
        Return every discrepancy found between the report data and the API data.

        When either side is missing, only the corresponding MISSING_* difference is reported
        since there is nothing to compare field by field.
        """
        if self.report_data is None:
            return [self._difference(DifferenceType.MISSING_FROM_SIR)]
        if self.api_data is None:
            return [self._difference(DifferenceType.MISSING_FROM_API_DATA)]

        discrepancies_found: List[DifferenceFound] = []
        if self.report_data.biobankId != self.api_data.biobankId:
            discrepancies_found.append(self._difference(DifferenceType.BIOBANK_ID))
        if self.report_data.test != self.api_data.testCode:
            discrepancies_found.append(self._difference(DifferenceType.TEST_CODE))
        if self.report_data.biobankOrderIdentifier != self.api_data.orderId:
            discrepancies_found.append(self._difference(DifferenceType.ORDER_ID))
        if not is_datetime_equal(
            self.report_data.confirmed, self.api_data.confirmedDate,
            difference_allowed_seconds=self.DATE_TOLERANCE_SECONDS
        ):
            discrepancies_found.append(self._difference(DifferenceType.CONFIRMED_DATE))
        if not is_datetime_equal(
            self.report_data.disposed, self.api_data.disposalDate,
            difference_allowed_seconds=self.DATE_TOLERANCE_SECONDS
        ):
            discrepancies_found.append(self._difference(DifferenceType.DISPOSAL_DATE))
        if not self.does_status_field_match():
            discrepancies_found.append(self._difference(DifferenceType.STATUS))

        return discrepancies_found

    def does_status_field_match(self):
        """
        Return True when the report status is consistent with the API status/disposal reason.

        The report uses a single SampleStatus value while the API uses a status string plus an
        optional disposal reason, so the two are matched through the explicit pairs below.
        """
        report_status = self.report_data.status
        api_status = self.api_data.status
        api_disposal_reason = self.api_data.disposalReason
        is_disposed = api_status == 'Disposed'

        if report_status == SampleStatus.CONSUMED and is_disposed and api_disposal_reason == 'Consumed':
            return True
        elif report_status == SampleStatus.QNS_FOR_PROCESSING and \
                is_disposed and api_disposal_reason == 'QNS for Processing':
            return True
        elif report_status == SampleStatus.UNKNOWN and (
            (is_disposed and api_disposal_reason in (
                'Could Not Process', 'Consumed', 'Damaged', 'No Consent', 'Missing'
            ))
            or (api_status == 'In Circulation' and self.report_data.test == '1PXR2')
        ):
            return True
        elif report_status == SampleStatus.RECEIVED and api_status == 'In Circulation':
            return True
        # elif report_status == SampleStatus.RECEIVED and api_status == 'Disposed':
        #     # TODO: make sure this is actually something we want to do
        #     #  (ignore samples that haven't been updated in a SIR)
        #     return True
        elif report_status == SampleStatus.QUALITY_ISSUE and (
            is_disposed and api_disposal_reason == 'Quality Issue'
        ):
            return True
        elif report_status == SampleStatus.ACCESSINGING_ERROR and (
            is_disposed and api_disposal_reason == 'Accessioning Error'
        ):
            return True
        elif report_status == SampleStatus.LAB_ACCIDENT and (
            is_disposed and api_disposal_reason == 'Lab Accident'
        ):
            return True
        elif report_status == SampleStatus.DISPOSED and (
            is_disposed and api_disposal_reason == 'Disposed'
        ):
            return True
        elif report_status == SampleStatus.SAMPLE_NOT_PROCESSED and (
            is_disposed and api_disposal_reason == 'Sample Not Processed'
        ):
            return True
        else:
            return False
        # TODO: samples that are 1SAL2 and disposed on the API but still with a status of 1 on the SIR...
        #  these wouldn't get updated on the SIR when the biobank gets it 6weeks later


class BiobankDataCheckTool(ToolBase):
    """Tool that compares Specimen API data with SIR data and uploads the differences to a spreadsheet."""

    def __init__(self, *args, **kwargs):
        super(BiobankDataCheckTool, self).__init__(*args, **kwargs)
        self.differences_found: List[DifferenceFound] = []
        self.start_date: Optional[datetime] = None
        self.end_date: Optional[datetime] = None
        self.spreadsheet_id: Optional[str] = None

    def run_process(self):
        """Find the differences, then upload them using a separate service-account context."""
        # Use default gcp context to interact with the database and find differences
        super(BiobankDataCheckTool, self).run_process()

        # Switch over to a new gcp context using a SA to upload the comparison results to Drive
        with self.initialize_process_context(
            service_account='configurator@all-of-us-rdr-prod.iam.gserviceaccount.com'
        ) as upload_env:
            self.gcp_env = upload_env
            self._upload_differences_found()

    def run(self):
        """Load the samples for the supplied date range, compare them and print what differs."""
        super(BiobankDataCheckTool, self).run()

        # Parse supplied date range
        self.start_date = parse(self.args.start)
        self.end_date = parse(self.args.end)

        # NOTE(review): the original indentation is not visible in the source; the comparison and
        # printing are kept inside the session block so the loaded records stay attached - confirm.
        with self.get_session() as session:
            stored_samples = self._get_report_samples(session=session)
            api_samples = self._get_api_samples(session=session)
            sample_pairs = self._get_sample_pairs(
                # Match up the samples by their id, loading any from the database that might
                # have been missed in the date range
                session=session,
                report_samples=stored_samples,
                api_samples=api_samples
            )

            for pair in sample_pairs:
                comparator = BiobankSampleComparator(pair)
                self.differences_found.extend(comparator.get_differences())

            self._print_differences_found()

        # Need to use the current google context to get the spreadsheet id from the server config
        # (the service account used for interacting with the spreadsheet doesn't have access to the config)
        server_config = self.get_server_config()
        self.spreadsheet_id = server_config[BIOBANK_DATA_COMPARISON_DOCUMENT_ID]

    def _get_report_samples(self, session: Session) -> List[BiobankStoredSample]:
        """Get the sample data received from the Sample Inventory Reports"""
        return list(
            session.query(BiobankStoredSample).filter(
                BiobankStoredSample.rdrCreated.between(self.start_date, self.end_date)
            )
        )

    def _get_api_samples(self, session: Session) -> List[BiobankSpecimen]:
        """Get the sample data received from the Specimen API"""
        return list(
            session.query(BiobankSpecimen).filter(
                BiobankSpecimen.created.between(self.start_date, self.end_date)
            )
        )

    @classmethod
    def _get_sample_pairs(cls, report_samples: List[BiobankStoredSample], api_samples: List[BiobankSpecimen],
                          session: Session) -> List[SamplePair]:
        """
        Creates SamplePairs for all the provided samples.
        If a sample is found in one but not the other, then the database will be checked to see if the counterpart
        might have just been outside the date range.
        """
        sample_pairs = []

        # Create SamplePairs for all the api samples found in the report data
        report_samples_id_map = {sample.biobankStoredSampleId: sample for sample in report_samples}
        for api_sample in api_samples:
            # pop() both fetches the counterpart and marks it as paired
            report_sample = report_samples_id_map.pop(api_sample.rlimsId, None)
            sample_pairs.append(
                SamplePair(api_data=api_sample, report_data=report_sample)
            )

        # Any samples still left in the report map didn't have counter parts in the api data
        for report_sample in report_samples_id_map.values():
            sample_pairs.append(
                SamplePair(api_data=None, report_data=report_sample)
            )

        # Check the database to see if any samples from the report match something from the api
        pairs_missing_api_data_map = {
            pair.report_data.biobankStoredSampleId: pair
            for pair in sample_pairs if pair.api_data is None
        }
        if pairs_missing_api_data_map:  # avoid issuing an "IN ()" query when nothing is missing
            db_api_data: List[BiobankSpecimen] = session.query(BiobankSpecimen).filter(
                BiobankSpecimen.rlimsId.in_(list(pairs_missing_api_data_map))
            ).all()
            for api_sample in db_api_data:
                pair = pairs_missing_api_data_map[api_sample.rlimsId]
                pair.api_data = api_sample

        # ... and check the inverse, making sure any api samples that fell within the time range didn't get missed
        # because of a date difference
        pairs_missing_report_data_map = {
            pair.api_data.rlimsId: pair
            for pair in sample_pairs if pair.report_data is None
        }
        if pairs_missing_report_data_map:
            db_report_data: List[BiobankStoredSample] = session.query(BiobankStoredSample).filter(
                BiobankStoredSample.biobankStoredSampleId.in_(list(pairs_missing_report_data_map))
            ).all()
            for report_sample in db_report_data:
                pair = pairs_missing_report_data_map[report_sample.biobankStoredSampleId]
                pair.report_data = report_sample

        return sample_pairs

    def _print_differences_found(self):
        """Print one line per difference found to stdout."""
        for diff in self.differences_found:
            specimen = diff.sample_pair.api_data
            sample = diff.sample_pair.report_data

            if diff.type == DifferenceType.STATUS:
                print(f'{specimen.rlimsId} STATUS -- '
                      f'API: "{specimen.status}, {specimen.disposalReason}" '
                      f'SIR: "{str(sample.status)}"')
            elif diff.type == DifferenceType.DISPOSAL_DATE:
                # Deliberately silent when the SIR still says RECEIVED but the API says Disposed
                # (presumably the SIR has not caught up with the disposal yet - TODO confirm)
                if not (sample.status == SampleStatus.RECEIVED and specimen.status == 'Disposed'):
                    print(f'{specimen.rlimsId} DISPOSAL DATE -- API: {specimen.disposalDate} SIR: {sample.disposed}')
            elif diff.type == DifferenceType.CONFIRMED_DATE:
                print(f'{specimen.rlimsId} CONFIRMED DATE -- API: {specimen.confirmedDate} SIR: {sample.confirmed}')
            else:
                # One side of the pair may be missing here, so take the id from whichever exists
                sample_id = sample.biobankStoredSampleId if sample else specimen.rlimsId
                print(f'{sample_id} -- {str(diff.type)}')

    def _upload_differences_found(self):
        """Write the differences to a new tab (named after the date range) of the comparison spreadsheet."""
        with GoogleSheetsClient(
            spreadsheet_id=self.spreadsheet_id,
            service_key_id=self.gcp_env.service_key_id
        ) as client:
            new_tab_name = f'{self.start_date.date()} to {self.end_date.date()}'
            client.add_new_tab(new_tab_name)
            client.set_current_tab(new_tab_name)

            # write headers
            client.update_cell(0, 0, 'sample id')
            client.update_cell(0, 1, 'difference found')
            client.update_cell(0, 2, 'data found from SIR')
            client.update_cell(0, 3, 'data found from API')

            for index, diff in enumerate(self.differences_found):
                report_sample = diff.sample_pair.report_data
                api_sample = diff.sample_pair.api_data
                sample_id = report_sample.biobankStoredSampleId if report_sample else api_sample.rlimsId
                report_value, api_value = diff.get_report_and_api_values()

                row_to_update = index + 1  # row 0 holds the headers
                client.update_cell(row=row_to_update, col=0, value=sample_id)
                client.update_cell(row=row_to_update, col=1, value=str(diff.type))
                client.update_cell(row=row_to_update, col=2, value=str(report_value))
                client.update_cell(row=row_to_update, col=3, value=str(api_value))


def add_additional_arguments(parser: argparse.ArgumentParser):
    """Register the required --start and --end arguments that bound the comparison date range."""
    parser.add_argument('--start', required=True)
    parser.add_argument('--end', required=True)


def run():
    # NOTE(review): the body of this function is truncated in the visible source.
    ...
test_report.py
Source:test_report.py
...130def test_check_report_message_template_ko():131 with pytest.raises(ValueError):132 assert check_report_message_template("{invalid_var}'")133@pytest.fixture()134def report_sample():135 @lcc.suite()136 class suite:137 @lcc.test()138 def test_1(self):139 pass140 @lcc.test()141 @lcc.disabled()142 def test_2(self):143 pass144 @lcc.test()145 def test_3(self):146 lcc.log_error("some issue")147 @lcc.test()148 def test_4(self):...
test_health_check_api.py
Source:test_health_check_api.py
#!/usr/bin/python
#
# Copyright 2014 Huawei Technologies Co. Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Test health check api."""
import os
import simplejson as json
import unittest2

# Must be set before the settings module is (re)loaded below
os.environ['COMPASS_IGNORE_SETTING'] = 'true'

from compass.utils import setting_wrapper as setting
reload(setting)

from test_api import ApiTestCase

from compass.db.api import cluster as cluster_db
from compass.db.api import health_check_report as health_check_db
from compass.utils import flags
from compass.utils import logsetting


# Report payload sent in the update request and expected back unchanged
report_sample = {
    "report": {
        "actions": {
            "neutron.create_network": {
                "duration": {
                    "data": [1.105, 0.973],
                    "summary": {
                        "errors": 0,
                        "success": "100.0%",
                        "min (sec)": 0.973,
                        "avg (sec)": 1.04,
                        "max (sec)": 1.105,
                        "total": 2
                    }
                }
            },
            "neutron.delete_network": {
                "duration": {
                    "data": [1.038, 0.842],
                    "summary": {
                        "errors": 0,
                        "success": "100.0%",
                        "min (sec)": 0.842,
                        "avg (sec)": 0.940,
                        "max (sec)": 1.038,
                        "total": 2
                    }
                }
            }
        },
        "errors_info": []
    },
    "raw_output": {}
}


api_resp_tpml = {
    "cluster_id": 1,
    "name": "sample_name",
    "report": {},
    "state": "verifying",
    "errors_message": ""
}


class TestHealthCheckAPI(ApiTestCase):
    """Test health check api."""

    def setUp(self):
        super(TestHealthCheckAPI, self).setUp()
        self.cluster_id = 1
        self.url = '/clusters/%s/healthreports' % self.cluster_id

    def tearDown(self):
        super(TestHealthCheckAPI, self).tearDown()

    def test_add_and_list_reports(self):
        """Reports can be created in bulk and singly, duplicates are rejected, and all are listed."""
        # Create multiple reports
        reports_list = [
            {'name': 'rp1', 'category': 'c1'},
            {'name': 'rp2', 'category': 'c2'},
            {'name': 'rp3', 'category': 'c3'}
        ]
        request_data = json.dumps({"report_list": reports_list})
        return_value = self.test_client.post(self.url, data=request_data)
        resp = json.loads(return_value.get_data())

        self.assertEqual(200, return_value.status_code)
        self.assertEqual(3, len(resp))

        # Create one report
        request_data = json.dumps({'name': 'rp4 test'})
        return_value = self.test_client.post(self.url, data=request_data)
        resp = json.loads(return_value.get_data())

        self.assertEqual(200, return_value.status_code)
        self.assertEqual('rp4-test', resp['name'])

        # Create duplicate report
        return_value = self.test_client.post(self.url, data=request_data)
        self.assertEqual(409, return_value.status_code)

        # List all reports
        return_value = self.test_client.get(self.url)
        resp = json.loads(return_value.get_data())

        self.assertEqual(200, return_value.status_code)
        self.assertEqual(4, len(resp))

    def test_update_and_get_health_report(self):
        """A report can be updated once, a second update is refused, and a GET returns the stored report."""
        report_name = 'test-report'
        health_check_db.add_report_record(self.cluster_id, name=report_name)

        url = '/'.join((self.url, report_name))
        request_data = json.dumps(
            {"report": report_sample, "state": "finished"}
        )
        return_value = self.test_client.put(url, data=request_data)
        resp = json.loads(return_value.get_data())
        self.maxDiff = None

        self.assertEqual(200, return_value.status_code)
        self.assertDictEqual(report_sample, resp['report'])

        # The same update sent again is expected to be refused
        return_value = self.test_client.put(url, data=request_data)
        self.assertEqual(403, return_value.status_code)

        # Get report
        return_value = self.test_client.get(url)
        # Re-parse the body so the assertion checks the GET response, not the earlier PUT response
        resp = json.loads(return_value.get_data())

        self.assertEqual(200, return_value.status_code)
        self.assertDictEqual(report_sample, resp['report'])

    def test_action_start_check_health(self):
        """The check_health action is refused until the cluster state is 'SUCCESSFUL'."""
        url = '/clusters/%s/action' % self.cluster_id
        request_data = json.dumps({'check_health': None})

        # Cluster's state is not 'SUCCESSFUL' yet.
        return_value = self.test_client.post(url, data=request_data)
        self.assertEqual(403, return_value.status_code)

        # Cluster has been deployed successfully.
        cluster_db.update_cluster_state(
            self.cluster_id, state='SUCCESSFUL'
        )
        return_value = self.test_client.post(url, data=request_data)
        self.assertEqual(202, return_value.status_code)


if __name__ == '__main__':
    flags.init()
    logsetting.init()
    # ... (the remainder of this snippet is truncated in the source)
report_sample_test.py
Source:report_sample_test.py
#!/usr/bin/env python3
#
# Copyright (C) 2021 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import re
import tempfile
from typing import Set

from .test_utils import TestBase, TestHelper


class TestReportSample(TestBase):
    """Tests for report_sample.py: output is compared with golden files and filter flags are checked."""

    def test_no_flags(self):
        got = self.run_cmd(
            ['report_sample.py',
             '-i',
             TestHelper.testdata_path('perf_display_bitmaps.data')],
            return_output=True)
        # Drop carriage returns so the comparison does not depend on the line-ending style
        got = got.replace('\r', '')
        with open(TestHelper.testdata_path('perf_display_bitmaps.perf-script')) as f:
            want = f.read()
        self.assertEqual(got, want)

    def test_comm_filter_to_renderthread(self):
        got = self.run_cmd(
            ['report_sample.py',
             '-i',
             TestHelper.testdata_path('perf_display_bitmaps.data'),
             '--comm', 'RenderThread'],
            return_output=True)
        got = got.replace('\r', '')
        self.assertIn('RenderThread', got)
        self.assertNotIn('com.example.android.displayingbitmaps', got)

        with open(TestHelper.testdata_path('perf_display_bitmaps.RenderThread.perf-script')) as f:
            want = f.read()
        self.assertEqual(got, want)

    def test_comm_filter_to_ui_thread(self):
        got = self.run_cmd(
            ['report_sample.py',
             '-i',
             TestHelper.testdata_path('perf_display_bitmaps.data'),
             '--comm', 'com.example.android.displayingbitmaps'],
            return_output=True)
        got = got.replace('\r', '')
        self.assertIn('com.example.android.displayingbitmaps', got)
        self.assertNotIn('RenderThread', got)
        with open(TestHelper.testdata_path('perf_display_bitmaps.UiThread.perf-script')) as f:
            want = f.read()
        self.assertEqual(got, want)

    def test_header(self):
        got = self.run_cmd(
            ['report_sample.py',
             '-i',
             TestHelper.testdata_path('perf_display_bitmaps.data'),
             '--header'],
            return_output=True)
        got = got.replace('\r', '')
        with open(TestHelper.testdata_path('perf_display_bitmaps.header.perf-script')) as f:
            want = f.read()
        self.assertEqual(got, want)

    def test_trace_offcpu(self):
        got = self.run_cmd(
            ['report_sample.py', '-i', TestHelper.testdata_path('perf_with_trace_offcpu_v2.data'),
             '--trace-offcpu', 'on-cpu'], return_output=True)
        self.assertIn('cpu-clock:u', got)
        self.assertNotIn('sched:sched_switch', got)

    def test_sample_filters(self):
        # Captures the number following "31850/" in each matching span of the report output
        thread_pattern = re.compile(r'\s+31850/(\d+)\s+')

        def get_threads_for_filter(filter_args: str) -> Set[int]:
            """Run report_sample.py with the given extra flags and return the ids captured from its output."""
            report = self.run_cmd(
                ['report_sample.py', '-i', TestHelper.testdata_path('perf_display_bitmaps.data')] +
                filter_args.split(), return_output=True)
            return {int(m.group(1)) for m in thread_pattern.finditer(report)}

        self.assertNotIn(31850, get_threads_for_filter('--exclude-pid 31850'))
        self.assertIn(31850, get_threads_for_filter('--include-pid 31850'))
        self.assertIn(31850, get_threads_for_filter('--pid 31850'))
        self.assertNotIn(31881, get_threads_for_filter('--exclude-tid 31881'))
        self.assertIn(31881, get_threads_for_filter('--include-tid 31881'))
        self.assertIn(31881, get_threads_for_filter('--tid 31881'))
        self.assertNotIn(31881, get_threads_for_filter(
            '--exclude-process-name com.example.android.displayingbitmaps'))
        self.assertIn(31881, get_threads_for_filter(
            '--include-process-name com.example.android.displayingbitmaps'))
        self.assertNotIn(31850, get_threads_for_filter(
            '--exclude-thread-name com.example.android.displayingbitmaps'))
        self.assertIn(31850, get_threads_for_filter(
            '--include-thread-name com.example.android.displayingbitmaps'))

        with tempfile.NamedTemporaryFile('w', delete=False) as filter_file:
            filter_file.write('GLOBAL_BEGIN 684943449406175\nGLOBAL_END 684943449406176')
            # Flush so the child process sees the content while the file is still open
            filter_file.flush()
            threads = get_threads_for_filter('--filter-file ' + filter_file.name)
            self.assertIn(31881, threads)
            self.assertNotIn(31850, threads)
        # ... (the remainder of this snippet is truncated in the source)
        # NOTE(review): the file is created with delete=False; confirm the truncated part removes it.
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!