Best Python code snippet using tempest_python
AssessorToDb.py
Source:AssessorToDb.py
...12 def __init__(self):13 self.working_dir = './working'14 self.file_dict = {}15 self.schema = ''16 def create_working_dir(self):17 try:18 if not os.path.isdir(self.working_dir):19 os.mkdir(self.working_dir)20 except Exception as e:21 print('error in create_working_dir routine')22 raise23 def cleanup_working_dir(self):24 try: 25 if os.path.isdir(self.working_dir):26 shutil.rmtree(self.working_dir)27 except Exception as e:28 print('error in cleanup_working_dir routine')29 raise30 def download_and_unzip(self, url):31 try:32 self.create_working_dir()33 r = requests.get(url)34 z = zipfile.ZipFile(io.BytesIO(r.content))35 z.extractall(self.working_dir)36 except Exception as e:37 raise38 def get_filename_for_download(self, file_title):39 try: 40 file_name = self.working_dir + '/' + file_title + '.txt'41 return file_name42 except Exception as e:43 raise44 def download_file(self, file_title, file_url):45 try:46 r = requests.get(file_url)47 file_name = self.get_filename_for_download(file_title)48 with open(file_name, "wb") as f:49 f.write(r.content)50 except Exception as e:51 print('url given: {}'.format(file_url))52 raise53 def download_files(self):54 try:55 self.create_working_dir()56 fd = self.file_dict57 for f in fd.keys():58 fname = fd[f][0]59 fname = fname.replace(' ', '%20')60 fname = self.url_base + fname61 self.download_and_unzip(fname)62 except Exception as e:63 raise64 def file_name_to_table_name(self, file_name):65 try:66 s = file_name67 s = s[:-4]68 return s69 except Exception as e:70 raise71 def files_to_df_dict(self):72 try:73 df_dict = {}74 for f in os.scandir(working_dir):75 f = f.name76 if f[-4:] == '.txt':77 key = self.file_name_to_table_name(f)78 df = pd.read_csv(working_dir + '/' + f,79 sep='|',80 encoding = 'ISO-8859-1')81 df_dict[key] = df82 return df_dict83 except Exception as e:84 print('filename: {}'.format(f))85 raise86 def create_df_dict(self):87 try:88 df_dict = {}89 for k in self.file_dict:90 f = self.file_dict[k][0]91 f = f.replace('.zip', 
'.txt')92 if f[-4:] == '.txt':93 df = pd.read_csv(self.working_dir + "/" + f,94 sep='|',95 encoding="ISO-8859-1")96 elif f[-4:] == '.csv':97 df = pd.read_csv(self.working_dir + '/' + f,98 encoding='ISO-8859-1')99 df_dict[k] = df100 return df_dict101 except Exception as e:102 raise103 def process_files(self):104 try:105 df_dict = self.create_df_dict()106 # for each df in df_dict, send df to Sockeye107 self.make_engine()108 today = date.today()109 for df_key in df_dict.keys():110 df = df_dict[df_key]111 str_date = today.strftime("_%Y%m%d")112 table_name = df_key + str_date113 df.to_sql(name=table_name, schema=self.schema, con=self.engine)114 except Exception as e:115 print(df_key)116 raise117 def make_engine(self):118 """119 Make connections to the database120 """121 try:122 conn_string = "DRIVER={{ODBC Driver 17 for SQL Server}}; " \123 "SERVER={}; DATABASE={}; trusted_connection=yes".format(124 'AWS-PROD-SQL\\Sockeye',125 'Assessor')126 #self.sql_conn = pyodbc.connect(conn_string)127 params = urllib.parse.quote_plus(conn_string)128 engine = sqlalchemy.create_engine("mssql+pyodbc:///?odbc_connect=%s" % params)129 self.engine = engine130 except Exception as e:131 print(e.args[0])132 raise133 def import_data(self):134 try:135 print("downloading data for {}".format(self.schema))136 self.download_files()137 print("importing data into db...")138 self.process_files()139 self.cleanup_working_dir()140 print("completed import.")141 except Exception as e:142 print(e.args[0])143 raise144class PierceAssessorETL(AssessorETL):145 import PierceColNames as pcn146 def __init__(self):147 super().__init__()148 self.schema = 'Pierce'149 self.file_dict = {'appraisal_account': ['appraisal_account.zip', pcn.appraisal_account_cols],150 'improvement': ['improvement.zip', pcn.improvement_cols],151 'improvement_builtas': ['improvement_builtas.zip', pcn.improvement_builtas_cols],152 'land_attribute': ['land_attribute.zip', pcn.land_attribute_cols],153 'improvement_detail': 
['improvement_detail.zip', pcn.improvement_detail_cols]}154 self.url_base = 'https://online.co.pierce.wa.us/datamart/'155 def create_df_dict(self):156 try:157 df_dict = {}158 for k in self.file_dict:159 f = self.file_dict[k][0]160 f = f.replace('.zip', '.txt')161 if f[-4:] == '.txt':162 df = pd.read_csv(self.working_dir + "/" + f,163 sep='|',164 encoding="ISO-8859-1",165 names=self.file_dict[k][1])166 df_dict[k] = df167 return df_dict168 except Exception as e:169 raise170class KingAssessorETL(AssessorETL):171 def __init__(self):172 super().__init__()173 self.working_dir = './working_king'174 self.schema = 'King'175 self.file_dict = {'parcel': ['Parcel.zip'],176 'RealPropAcct': ['Real Property Account.zip'],177 'CondoComplexAndUnits': ['Condo Complex and Units.zip'],178 'ResBuilding': ['Residential Building.zip'],179 'ApartmentComplex': ['Apartment Complex.zip'],180 'CommercialBuilding': ['Commercial Building.zip']}181 self.url_base = 'https://aqua.kingcounty.gov/extranet/assessor/'182 def create_df_dict(self):183 try:184 df_dict = {}185 for f in os.scandir(self.working_dir):186 f = f.name187 if f[-4:] == '.csv':188 key = self.file_name_to_table_name(f)189 df = pd.read_csv(self.working_dir + '/' + f,190 encoding = 'ISO-8859-1')191 df_dict[key] = df192 return df_dict193 except Exception as e:194 print('filename: {}'.format(f))195 raise196class KitsapAssessorETL(AssessorETL):197 def __init__(self):198 super().__init__()199 self.working_dir = './working_kitsap'200 self.schema = 'Kitsap'201 self.url_base = 'https://www.kitsapgov.com/assessor/Documents/'202 self.file_dict = {'Parcels': ['Parcels.txt'],203 'Dwellings': ['Dwellings.txt'],204 'MobileHome': ['MH.txt'],205 'Valuations': ['Valuations.txt'],206 'Codes': ['Codes.txt'],207 'Property_addresses': ['Property_addresses.txt'],208 'Comml_Imps': ['Comml_Imps.txt']}209 def download_files(self):210 try:211 fd = self.file_dict212 self.create_working_dir()213 for f in fd.keys():214 fname = fd[f][0]215 fname = 
fname.replace(' ', '%20')216 furl = self.url_base + fname217 self.download_file(fname, furl)218 except Exception as e:219 raise220 def get_filename_for_download(self, file_title):221 try: 222 file_name = self.working_dir + '/' + file_title223 return file_name224 except Exception as e:225 raise226 def create_df_dict(self):227 try:228 df_dict = {}229 for k in self.file_dict:230 f = self.file_dict[k][0]231 if f[-4:] == '.txt':232 if k == 'Codes':233 df = pd.read_csv(self.working_dir + "/" + f,234 sep='\t',235 encoding="utf-16",236 index_col=False)237 else:238 df = pd.read_csv(self.working_dir + "/" + f,239 sep='\t',240 encoding="ISO-8859-1")241 df_dict[k] = df242 return df_dict243 except Exception as e:244 raise245class SnohomishAssessorETL(AssessorETL):246 def __init__(self):247 super().__init__()248 self.working_dir = './working_snohomish'249 self.schema = 'Snohomish'250 self.url_base = 'ftp://ftp.snoco.org/assessor/property_characteristics/'251 self.file_dict = {'Improvement': ['SnohomishCo Improvement Records_2020AV.xlsx'],252 'Land': ['SnohomishCo Land Records_2020AV.xlsx'],253 'Master': ['SnohomishCo Master Records_2020AV.xlsx']}254 def get_filename_for_download(self, file_title):255 try: 256 file_name = self.working_dir + '/' + file_title257 return file_name258 except Exception as e:259 raise260 def download_file(self, file_title, file_url):261 try:262 file_name = self.get_filename_for_download(file_title)263 with closing(urllib.request.urlopen(file_url)) as r:264 with open(file_name, 'wb') as f:265 shutil.copyfileobj(r,f)266 except Exception as e:267 print('url given: {}'.format(file_url))268 print('file_name gotten: {}'.format(file_name))269 raise270 271 def download_files(self):272 try:273 fd = self.file_dict274 self.create_working_dir()275 for f in fd.keys():276 f_name = fd[f][0]277 #fname = fname.replace(' ', '%20')278 f_url = self.url_base + f_name279 f_url = f_url.replace(' ', '%20')280 self.download_file(f_name, f_url)281 except Exception as e:282 
raise283 def create_df_dict(self):284 try:285 df_dict = {}286 for k in self.file_dict:287 f = self.file_dict[k][0]288 if f[-5:] == '.xlsx':...
test_osutils.py
Source:test_osutils.py
...36 monkeypatch.setattr(shutil, "rmtree", self._shutil_rmtree)37 ## No errors38 def test_no_working_dir(self, monkeypatch):39 self._monkeypatch_all(monkeypatch)40 with osutils.create_working_dir():41 pass42 assert self._os_getcwd_calls == [((), {})]43 assert self._os_makedirs_calls == []44 assert self._tempfile_mkdtemp_calls == [((), {})]45 assert self._os_chdir_calls == [46 (('/tempdir/', ), {}),47 (('/original-dir/', ), {})48 ]49 assert self._shutil_rmtree_calls == []50 def test_working_dir_specified(self, monkeypatch):51 self._monkeypatch_all(monkeypatch)52 with osutils.create_working_dir(working_dir='/foo/'):53 pass54 assert self._os_getcwd_calls == [((), {})]55 assert self._os_makedirs_calls == [56 (('/foo/', ), {'exist_ok': True})57 ]58 assert self._tempfile_mkdtemp_calls == []59 assert self._os_chdir_calls == [60 (('/foo/', ), {}),61 (('/original-dir/', ), {})62 ]63 assert self._shutil_rmtree_calls == []64 def test_no_working_dir_delete_if_no_error(self, monkeypatch):65 self._monkeypatch_all(monkeypatch)66 with osutils.create_working_dir(delete_if_no_error=True):67 pass68 assert self._os_getcwd_calls == [((), {})]69 assert self._os_makedirs_calls == []70 assert self._tempfile_mkdtemp_calls == [((), {})]71 assert self._os_chdir_calls == [72 (('/tempdir/', ), {}),73 (('/original-dir/', ), {})74 ]75 assert self._shutil_rmtree_calls == [76 (('/tempdir/', ), {})77 ]78 def test_working_dir_specified_delete_if_no_error(self, monkeypatch):79 self._monkeypatch_all(monkeypatch)80 with osutils.create_working_dir(working_dir='/foo/', delete_if_no_error=True):81 pass82 assert self._os_getcwd_calls == [((), {})]83 assert self._os_makedirs_calls == [84 (('/foo/', ), {'exist_ok': True})85 ]86 assert self._tempfile_mkdtemp_calls == []87 assert self._os_chdir_calls == [88 (('/foo/', ), {}),89 (('/original-dir/', ), {})90 ]91 assert self._shutil_rmtree_calls == [92 (('/foo/', ), {})93 ]94 ## With errors95 def test_error_no_working_dir(self, monkeypatch):96 
self._monkeypatch_all(monkeypatch)97 try:98 with osutils.create_working_dir():99 123 / 0100 except:101 pass102 assert self._os_getcwd_calls == [((), {})]103 assert self._os_makedirs_calls == []104 assert self._tempfile_mkdtemp_calls == [((), {})]105 assert self._os_chdir_calls == [106 (('/tempdir/', ), {}),107 (('/original-dir/', ), {})108 ]109 assert self._shutil_rmtree_calls == []110 def test_error_working_dir_specified(self, monkeypatch):111 self._monkeypatch_all(monkeypatch)112 try:113 with osutils.create_working_dir(working_dir='/foo/'):114 123 / 0115 except:116 pass117 assert self._os_getcwd_calls == [((), {})]118 assert self._os_makedirs_calls == [119 (('/foo/', ), {'exist_ok': True})120 ]121 assert self._tempfile_mkdtemp_calls == []122 assert self._os_chdir_calls == [123 (('/foo/', ), {}),124 (('/original-dir/', ), {})125 ]126 assert self._shutil_rmtree_calls == []127 def test_error_no_working_dir_delete_if_no_error(self, monkeypatch):128 self._monkeypatch_all(monkeypatch)129 try:130 with osutils.create_working_dir(delete_if_no_error=True):131 123 / 0132 except:133 pass134 assert self._os_getcwd_calls == [((), {})]135 assert self._os_makedirs_calls == []136 assert self._tempfile_mkdtemp_calls == [((), {})]137 assert self._os_chdir_calls == [138 (('/tempdir/', ), {}),139 (('/original-dir/', ), {})140 ]141 assert self._shutil_rmtree_calls == [] # not deleted, dur to error142 def test_error_working_dir_specified_delete_if_no_error(self, monkeypatch):143 self._monkeypatch_all(monkeypatch)144 try:145 with osutils.create_working_dir(working_dir='/foo/', delete_if_no_error=True):146 123 / 0147 except:148 pass149 assert self._os_getcwd_calls == [((), {})]150 assert self._os_makedirs_calls == [151 (('/foo/', ), {'exist_ok': True})152 ]153 assert self._tempfile_mkdtemp_calls == []154 assert self._os_chdir_calls == [155 (('/foo/', ), {}),156 (('/original-dir/', ), {})157 ]...
test_osutils_create_working_dir.py
Source:test_osutils_create_working_dir.py
...9 ## With errors10 logging.basicConfig(level=logging.DEBUG)11 logging.info("-"*80)12 logging.info("No working dir, no cleanup")13 with osutils.create_working_dir() as wdir:14 os.makedirs(os.path.join(wdir, 'aaa', 'bbb'))15 assert os.path.exists(wdir)16 shutil.rmtree(wdir)17 logging.info("-"*80)18 logging.info("Working dir, no cleanup")19 with osutils.create_working_dir(working_dir='./foo/') as wdir:20 os.makedirs(os.path.join(wdir, 'aaa', 'bbb'))21 assert os.path.exists(wdir)22 shutil.rmtree(wdir)23 logging.info("-"*80)24 logging.info("No working dir, with cleanup")25 with osutils.create_working_dir(delete_if_no_error=True) as wdir:26 os.makedirs(os.path.join(wdir, 'aaa', 'bbb'))27 assert not os.path.exists(wdir)28 logging.info("-"*80)29 logging.info("Working dir, with cleanup")30 with osutils.create_working_dir(working_dir='./bar/', delete_if_no_error=True) as wdir:31 os.makedirs(os.path.join(wdir, 'aaa', 'bbb'))32 assert not os.path.exists(wdir)33 ## No errors34 logging.basicConfig(level=logging.DEBUG)35 logging.info("-"*80)36 logging.info("No working dir, no cleanup ***(WITH ERROR)***")37 try:38 with osutils.create_working_dir() as wdir:39 os.makedirs(os.path.join(wdir, 'aaa', 'bbb'))40 123 / 041 except:42 pass43 assert os.path.exists(wdir)44 shutil.rmtree(wdir)45 logging.info("-"*80)46 logging.info("Working dir, no cleanup ***(WITH ERROR)***")47 try:48 with osutils.create_working_dir(working_dir='./foo/') as wdir:49 os.makedirs(os.path.join(wdir, 'aaa', 'bbb'))50 123 / 051 except:52 pass53 assert os.path.exists(wdir)54 shutil.rmtree(wdir)55 logging.info("-"*80)56 logging.info("No working dir, with cleanup ***(WITH ERROR)***")57 try:58 with osutils.create_working_dir(delete_if_no_error=True) as wdir:59 os.makedirs(os.path.join(wdir, 'aaa', 'bbb'))60 123 / 061 except:62 pass63 assert os.path.exists(wdir)64 shutil.rmtree(wdir)65 logging.info("-"*80)66 logging.info("Working dir, with cleanup ***(WITH ERROR)***")67 try:68 with 
osutils.create_working_dir(working_dir='./bar/', delete_if_no_error=True) as wdir:69 os.makedirs(os.path.join(wdir, 'aaa', 'bbb'))70 123 / 071 except:72 pass73 assert os.path.exists(wdir)74 shutil.rmtree(wdir)75if __name__ == "__main__":...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!