Best Python code snippet using locust
tests.py
Source:tests.py
import os
import application
import unittest
import tempfile
from app.services import *
from db.import_data.import_geonode import *
from pymongo import MongoClient
from mock import MagicMock


class LocatorTestCase(unittest.TestCase):
    """Level-name lookups on Locator for paths of increasing depth."""

    def test_should_find_level_name_of_locator(self):
        expected_levels = [
            ("UGANDA", "national"),
            ("UGANDA, ACHOLI", "region"),
            ("UGANDA, ACHOLI, GULU", "district"),
            ("UGANDA, ACHOLI, GULU, PATIKO", "subcounty"),
            ("UGANDA, ACHOLI, GULU, PATIKO, OTHER", "parish"),
        ]
        for path, level in expected_levels:
            self.assertEqual(Locator(path).level_name(), level)

    def test_should_find_child_level_name_of_locator(self):
        expected_child_levels = [
            ("UGANDA", "region"),
            ("UGANDA, ACHOLI", "district"),
            ("UGANDA, ACHOLI, GULU", "subcounty"),
            ("UGANDA, ACHOLI, GULU, PATIKO", "parish"),
            # The deepest path in this table has no child level.
            ("UGANDA, ACHOLI, GULU, PATIKO, OTHER", None),
        ]
        for path, child_level in expected_child_levels:
            self.assertEqual(Locator(path).child_level_name(), child_level)


class LocationServiceTestCase(unittest.TestCase):
    """LocationService.children against a three-document location tree."""

    def setUp(self):
        self.db = MongoClient().location_tree
        tree = self.db.location_tree
        tree.remove()

        # One region followed by its two districts, inserted in this order.
        seed_documents = [
            {
                "_id": "UGANDA, ACHOLI",
                "type": "region",
                "location": {"national": "UGANDA", "region": "ACHOLI"},
            },
            {
                "_id": "UGANDA, ACHOLI, GULU",
                "type": "district",
                "location": {
                    "national": "UGANDA",
                    "region": "ACHOLI",
                    "district": "GULU",
                },
            },
            {
                "_id": "UGANDA, ACHOLI, AMURU",
                "type": "district",
                "location": {
                    "national": "UGANDA",
                    "region": "ACHOLI",
                    "district": "AMURU",
                },
            },
        ]
        for document in seed_documents:
            tree.insert(document)

    def test_find_children_of_location(self):
        service = LocationService(self.db)
        result = service.children(Locator("UGANDA, ACHOLI"))
        child_ids = [entry['_id'] for entry in result['children']]
        self.assertEqual(
            child_ids, ["UGANDA, ACHOLI, GULU", "UGANDA, ACHOLI, AMURU"]
        )


class SiteVistServiceTestCase(unittest.TestCase):
    """SiteVisitService.all against two seeded site visits."""

    def setUp(self):
        self.db = MongoClient().devtrac2_test
        visits = self.db.site_visits
        visits.remove()
        for visit_id in (1, 2):
            visits.insert({"_id": visit_id})

    def test_should_find_all_questions(self):
        service = SiteVisitService(self.db)
        found_ids = [visit['_id'] for visit in service.all()]
        self.assertEqual(found_ids, [1, 2])


class UReportTestCase(unittest.TestCase):
    """UReportService question listing and top-5 responses."""

    def setUp(self):
        self.db = MongoClient().devtrac2_test
        questions = self.db.ureport_questions
        responses = self.db.ureport_responses
        questions.remove()
        responses.remove()

        questions.insert({"_id": 1, "question": "a_question"})
        questions.insert({"_id": 2, "question": "a_question_2"})

        acholi = {"region": "ACHOLI"}
        gulu = {"region": "ACHOLI", "district": "GULU"}
        # (_id, location, text, poll_id) -- kept in the original insertion order.
        response_rows = [
            (1, dict(acholi), "text1", 1),
            (8, dict(gulu), "text8", 2),
            (2, dict(gulu), "text2", 1),
            (3, dict(gulu, parish="OTHER"), "text3", 1),
            (4, dict(gulu, parish="PATIKO"), "text4", 1),
            (5, {"region": "KAMPALA"}, "text7", 1),
            (6, dict(gulu), "text5", 1),
            (7, dict(gulu), "text6", 1),
        ]
        for response_id, location, text, poll_id in response_rows:
            responses.insert({
                "_id": response_id,
                "location": location,
                "text": text,
                "poll_id": poll_id,
            })

    def test_should_find_all_questions(self):
        service = UReportService(self.db)
        self.assertEqual(len(service.questions()), 2)
        self.assertEqual(service.questions()[0]['_id'], 1)
        self.assertEqual(service.questions()[0]['question'], "a_question")

    def test_should_find_top5_for_district(self):
        service = UReportService(self.db)
        top_reports = service.top5(Locator("UGANDA, ACHOLI"), 1)
        texts = [report['text'] for report in top_reports]
        self.assertEqual(texts, ["text1", "text2", "text3", "text4", "text5"])


def _feature(region, district, subcounty, parish):
    """Build one mock feature carrying the four property keys used below."""
    return {
        'properties': {
            'Reg_2011': region,
            'DNAME_2010': district,
            'SNAME_2010': subcounty,
            'PNAME_2006': parish,
        }
    }


# Four features; the first two carry identical properties.
test_features = [
    _feature('test_region', "test_district", "test_subcounty", "test_parish"),
    _feature('test_region', "test_district", "test_subcounty", "test_parish"),
    _feature('test_region', "test_district", "test_subcounty_2", "test_parish_2"),
    _feature('test_region', "test_district_2", "test_subcounty_2", "test_parish_3"),
]


class ImportDataTestCase(unittest.TestCase):
    """Import helpers driven by a mocked WFS service returning test_features."""

    def setUp(self):
        self.database = MongoClient().devtrac2_test

    def test_should_import_and_aggregate_data(self):
        wfs_service = MagicMock()
        wfs_service.get_features.return_value = test_features

        import_dataset(wfs_service, self.database, "test", "test_feature")

        test_aggregation = self.database.test_aggregation
        self.assertEqual(self.database.test.count(), 4)
        self.assertEqual(test_aggregation.count(), 10)

        # (aggregation _id, expected 'value'), checked in this order.
        expected_values = [
            ('UGANDA, test_region', 4),
            ('UGANDA, test_region, test_district', 3),
            ('UGANDA, test_region, test_district_2', 1),
            ('UGANDA, test_region, test_district, test_subcounty', 2),
            ('UGANDA, test_region, test_district, test_subcounty_2', 1),
            ('UGANDA, test_region, test_district_2, test_subcounty_2', 1),
            ('UGANDA, test_region, test_district, test_subcounty, test_parish', 2),
            ('UGANDA, test_region, test_district, test_subcounty_2, test_parish_2', 1),
            ('UGANDA, test_region, test_district_2, test_subcounty_2, test_parish_3', 1),
            ('UGANDA', 4),
        ]
        for aggregation_id, value in expected_values:
            self.assertEqual(
                test_aggregation.find({'_id': aggregation_id})[0]['value'], value
            )

    def test_should_create_location_tree(self):
        wfs_service = MagicMock()
        wfs_service.get_features.return_value = test_features

        import_locationTree(wfs_service, self.database)

        location_tree = self.database.location_tree
        # (query, expected number of matching documents), checked in this order.
        expected_counts = [
            ({"type": "parish", "location.subcounty": "test_subcounty_2"}, 2),
            ({"type": "subcounty", "location.district": "test_district"}, 2),
            ({"type": "district", "location.region": "test_region"}, 2),
            ({"type": "region"}, 1),
        ]
        for query, count in expected_counts:
            self.assertEqual(location_tree.find(query).count(), count)


if __name__ == '__main__':
    # The body is elided ("...") in this excerpt; kept as-is rather than guessed.
    ...
config.py
Source:config.py
#
# Copyright 2017 Wooga GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
# of the Software, and to permit persons to whom the Software is furnished to do
# so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from datetime import datetime, date


class ConfigHelper(dict):
    """Dict-based builder for a test configuration.

    A new instance is populated with a default configuration containing the
    top-level keys ``context``, ``targets`` and ``aggregations``. Every
    ``parameterize_*`` / ``with_*`` method mutates the instance in place and
    returns it, so calls can be chained.
    """

    @staticmethod
    def __default_config():
        """Return a freshly built default configuration dict."""
        return {
            'context': {
                'defaults': {},
            },
            'targets': {
                'test_table': {
                    'start_date': datetime.now(),
                    'schema': 'test_schema',
                    'key_columns': ['key_column'],
                    'aggregated_columns': {
                        'test_aggregation': {
                            'test_column': {
                                'column_name': 'test_src_column',
                                'update_type': 'REPLACE',
                            },
                            'another_test_column': {
                                'column_name': 'another_test_src_column',
                                'update_type': 'MAX',
                            },
                        },
                        'another_aggregation': {
                            'another_aggregation_test_column': {
                                'column_name': 'another_aggregation_test_src_column',
                                'update_type': 'KEEP',
                            }
                        },
                    },
                },
                'another_table': {
                    'start_date': datetime.now(),
                    'schema': 'another_test_schema',
                    'key_columns': ['key_column'],
                    'aggregated_columns': {
                        'test_aggregation': {
                            'test_column': {
                                'column_name': 'another_table_test_src_column',
                                'update_type': 'MIN',
                            },
                        },
                    },
                },
            },
            'aggregations': {
                'test_aggregation': {
                    'query': "SELECT * FROM DUAL WHERE dt BETWEEN '{{ start_date }}' AND '{{ end_date }}'",
                    'time_key': 'test_time_key',
                },
                'another_aggregation': {
                    'query': 'SELECT everything FROM here',
                    'time_key': 'another_test_time_key',
                },
            },
        }

    def __init__(self, **kwargs):
        """Store ``kwargs``, then overlay the default configuration.

        The defaults are applied last, so a keyword named ``context``,
        ``targets`` or ``aggregations`` is overwritten by the default value.
        """
        super(ConfigHelper, self).__init__(**kwargs)
        self.update(self.__default_config())

    def parameterize_context(self, items=None, with_targets=True):
        """Add items to the context and optionally point every target at them.

        :param items: mapping stored under ``context.items``; when ``None`` a
            mapping with two empty entries (``item``, ``another_item``) is used.
        :param with_targets: when true, set ``items`` to ``'*'`` on every target.
        :return: this instance.
        """
        if items is None:
            items = {'item': {}, 'another_item': {}}
        self['context']['items'] = items
        self['context']['item_column'] = 'item_column'
        if with_targets:
            # .values() instead of the Python-2-only .iteritems(): the key was
            # never used, and this form runs on both Python 2 and 3.
            for target in self['targets'].values():
                target['items'] = '*'
        return self

    def parameterize_aggregation(self, aggregation_id='test_aggregation'):
        """Replace an aggregation's query with one templated on ``{{ item }}``.

        :param aggregation_id: key of the aggregation to modify.
        :return: this instance.
        :raises KeyError: if ``aggregation_id`` is not a configured aggregation.
        """
        self['aggregations'][aggregation_id]['query'] = "SELECT * FROM {{ item }} WHERE dt BETWEEN '{{ start_date }}' AND '{{ end_date }}'"
        return self

    def with_parameter_columns(self):
        """Add sample parameters and matching ``parameter_columns`` to targets.

        The parameters are merged into every context item when
        ``context.items`` is set and non-empty, otherwise into
        ``context.defaults``. Each target receives its own
        ``parameter_columns`` dict mapping ``<name>_col`` to ``<name>``.

        :return: this instance.
        """
        params = {
            'date': date(2017, 1, 1),
            'datetime': datetime(2017, 1, 1, 0, 0, 0),
            'number': 42,
            'bool': True,
        }
        items = self['context'].get('items')
        if items:
            # .values() replaces the Python-2-only .iteritems().
            for item_params in items.values():
                item_params.update(params)
        else:
            self['context']['defaults'].update(params)
        for target in self['targets'].values():
            # Built inside the loop so targets do not share one dict object.
            target['parameter_columns'] = {"%s_col" % k: k for k in params}
        return self

    def with_timeseries(self, target_id='test_table'):
        """Set ``timeseries_key`` to ``'timeseries_column'`` on a target.

        :param target_id: key of the target to modify.
        :return: this instance.
        :raises KeyError: if ``target_id`` is not a configured target.
        """
        self['targets'][target_id]['timeseries_key'] = 'timeseries_column'
        return self

    def with_offset(self, offset=1, aggregation_id='test_aggregation'):
        """Set ``offset`` on an aggregation.

        :param offset: value stored under the aggregation's ``offset`` key.
        :param aggregation_id: key of the aggregation to modify.
        :return: this instance.
        :raises KeyError: if ``aggregation_id`` is not a configured aggregation.
        """
        self['aggregations'][aggregation_id]['offset'] = offset
        return self

    def with_reruns(self, reruns=3, aggregation_id='test_aggregation'):
        """Set ``reruns`` on an aggregation.

        :param reruns: value stored under the aggregation's ``reruns`` key.
        :param aggregation_id: key of the aggregation to modify.
        :return: this instance.
        :raises KeyError: if ``aggregation_id`` is not a configured aggregation.
        """
        self['aggregations'][aggregation_id]['reruns'] = reruns
        # NOTE(review): the excerpt is cut off after the assignment above;
        # returning self matches every sibling builder -- confirm upstream.
        return self
test_PUMS_demographic_count_aggregation.py
Source:test_PUMS_demographic_count_aggregation.py
"""Aggregation is expensive to run, so every test here shares one loader object."""
import pytest
from aggregate.PUMS.count_PUMS_demographics import PUMSCountDemographics
from tests.util import race_counts, age_bucket_counts
from tests.PUMS.local_loader import LocalLoader

local_loader = LocalLoader()


@pytest.mark.test_aggregation
@pytest.mark.test_new_crosstabs
def test_local_loader(all_data):
    """Forward the command-line ``all_data`` argument to the shared loader.

    Because of how pytest works, the argument has to be received inside a
    test. This test exists only to pass it along; it asserts nothing.
    """
    local_loader.load_count_aggregator(all_data)


@pytest.mark.test_aggregation
@pytest.mark.test_new_crosstabs
def test_all_counts_sum_to_total_pop():
    """For each denominator indicator, category counts add up to total_pop-count."""
    aggregator = local_loader.count_aggregator
    for indicator in aggregator.indicators_denom:
        count_columns = [
            f"{category}-count" for category in aggregator.categories[indicator]
        ]
        row_totals = aggregator.aggregated[count_columns].sum(axis=1)
        matches = row_totals == aggregator.aggregated["total_pop-count"]
        assert matches.all()


@pytest.mark.test_aggregation
def test_that_all_races_sum_to_total_within_indicator():
    """Parameterize this to look at nativity, age buckets as well"""
    # Generate this from function
    lep_race_cols = [f"lep-{race}-count" for race in race_counts]
    row_totals = local_loader.aggregated[lep_race_cols].sum(axis=1)
    assert (row_totals == local_loader.aggregated["lep-count"]).all()


@pytest.mark.test_aggregation
def test_that_all_races_sum_to_total():
    """Race count columns add up to total_pop-count on every row."""
    row_totals = local_loader.aggregated[race_counts].sum(axis=1)
    assert (row_totals == local_loader.aggregated["total_pop-count"]).all()


@pytest.mark.test_aggregation
def test_that_all_age_buckets_sum_to_total():
    """Age-bucket count columns add up to total_pop-count on every row."""
    row_totals = local_loader.aggregated[age_bucket_counts].sum(axis=1)
    assert (row_totals == local_loader.aggregated["total_pop-count"]).all()


@pytest.mark.test_aggregation
def test_total_counts_match():
    """Summed PWGTP per age_bucket_by_race value equals the aggregated count sum."""
    grouped = local_loader.by_person.groupby("age_bucket_by_race")
    citywide_gb = grouped.agg({"PWGTP": "sum"})
    for variable in citywide_gb.index:
        weighted_total = citywide_gb.at[variable, "PWGTP"]
        aggregated_total = local_loader.aggregated[f"{variable}-count"].sum()
        assert weighted_total == aggregated_total


@pytest.mark.test_aggregation
def test_age_bucket_assignment_correct():
    """95 is top coded value for age"""
    by_person_data = local_loader.by_person
    # (age_bucket label, expected minimum AGEP, expected maximum AGEP)
    bucket_bounds = [
        ("PopU16", 0, 15),
        ("P16t64", 16, 64),
        ("P65pl", 65, 95),
    ]
    for bucket, lowest, highest in bucket_bounds:
        assert min(by_person_data[by_person_data["age_bucket"] == bucket]["AGEP"]) == lowest
        assert max(by_person_data[by_person_data["age_bucket"] == bucket]["AGEP"]) == highest


@pytest.mark.test_aggregation
def test_that_fb_correctly_assigned():
    """Native rows have a null foreign_born; foreign-born rows hold "fb"."""
    by_person_data = local_loader.by_person

    native_rows = by_person_data[by_person_data["NATIVITY"] == "Native"]
    assert native_rows["foreign_born"].isna().all()

    foreign_rows = by_person_data[by_person_data["NATIVITY"] == "Foreign born"]
    assert (foreign_rows["foreign_born"] == "fb").all()


TEST_CASES = [
    ("20150000144861", "fb_wnh"),
    ("2019HU13853441", "fb_hsp"),
    ("2019HU12654531", "fb_anh"),
    ("20160002802551", "fb_onh"),
    ("2019HU12564633", "fb_bnh"),
]


@pytest.mark.test_aggregation
@pytest.mark.parametrize("person_id, expected_val", TEST_CASES)
def test_that_fb_by_race_correctly_assigned(person_id, expected_val):
    """The listed person ids carry the expected foreign_born_by_race value."""
    person = local_loader.by_person.loc[person_id]
    assert person["foreign_born_by_race"] == expected_val


@pytest.mark.test_aggregation
def test_that_native_born_no_fb_by_race():
    """Native rows have a null foreign_born_by_race."""
    by_person_data = local_loader.by_person
    native_rows = by_person_data[by_person_data["NATIVITY"] == "Native"]
    assert native_rows["foreign_born_by_race"].isna().all()


TEST_RECORDS = [
    ("20150001199221", "lep_hsp"),
    ("20150000267661", "lep_wnh"),
    ("20150002963751", "lep_bnh"),
    ("20150003090011", "lep_anh"),
    ("20160006078411", "lep_onh"),
]


@pytest.mark.test_aggregation
@pytest.mark.parametrize("record_id, expected_val", TEST_RECORDS)
def test_that_LEP_by_race_correctly_assigned(record_id, expected_val):
    # The body is elided ("...") in this excerpt; kept as-is rather than guessed.
    ...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!