Best Python code snippet using tempest_python
main_window.py
Source:main_window.py
...181 self.file.change_directory(self.directory)182 if self.validate.requirement_check() != True:183 self.button_state(False)184 self.action_state(False)185 self.create_log(self.validate.requirement_check())186 else:187 self.button_state(True)188 self.action_state(True)189 self.create_log(190 "<span style='color:green;'>All the requirements are met.</span>"191 )192 self.port = self.file.find_ports(self.env_file_name)193 def create_log(self, message: str) -> None:194 """195 create_log creates the log.196 :param message: The message to be displayed.197 :type message: str198 """199 self.current_time = time.localtime()200 self.current_time = time.strftime("%H:%M:%S", self.current_time)201 self.op_log.append(202 "<html><body>"203 + "<b style='color:blue;'>"204 + f"[{self.current_time}]"205 + "</b>"206 + " >>> "207 + message208 + "<br/></body></html>"209 )210 def button_state(self, state: bool) -> None:211 """212 button_state changes the state of the buttons.213 :param state: The state of the buttons.214 :type state: bool215 """216 self.start_stop_btn.setEnabled(state)217 self.lhost_btn.setEnabled(state)218 self.pma_btn.setEnabled(state)219 self.flocation_btn.setEnabled(state)220 def action_state(self, state: bool) -> None:221 """222 action_state changes the state of the actions.223 :param state: The state of the actions.224 :type state: bool225 """226 self.actionEdit_Ports.setEnabled(state)227 self.actionRemove_Services.setEnabled(state)228 def service_state(self) -> None:229 """230 service_state changes the state of the services.231 """232 ready_msg_1 = "Starting..."233 ready_msg_2 = "Stopping..."234 success_msg_1 = "Service started."235 success_msg_2 = "Service stopped."236 error_msg_1 = "Service failed to start."237 error_msg_2 = "Service failed to stop."238 btn_state = self.start_stop_btn.isChecked()239 if btn_state:240 self.create_log(ready_msg_1)241 self.start_stop_btn.setText("Stop")242 if self.docker.start():243 self.create_log(success_msg_1)244 else:245 
self.create_log(error_msg_1)246 self.error.show(error_msg_1)247 else:248 self.create_log(ready_msg_2)249 self.start_stop_btn.setText("Start")250 if self.docker.stop():251 self.create_log(success_msg_2)252 else:253 self.create_log(error_msg_2)254 self.error.show(error_msg_2)255 def open_localhost(self) -> None:256 """257 open_localhost opens the localhost.258 """259 ready_msg = "Opening localhost..."260 success_msg = "Opened localhost."261 warning_msg = "Please start the service first."262 error_msg = "Failed to open localhost."263 url = f"http://localhost:{self.port['WEB_PORT']}"264 if self.start_stop_btn.isChecked():265 self.create_log(ready_msg)266 try:267 self.file.open_this(url)268 self.create_log(success_msg)269 except:270 self.create_log(error_msg)271 self.error.show(error_msg)272 else:273 self.create_log(warning_msg)274 self.error.show(warning_msg)275 def open_pma(self) -> None:276 """277 open_pma opens the phpmyadmin.278 """279 ready_msg = "Opening phpmyadmin..."280 success_msg = "Opened phpmyadmin."281 warning_msg = "Please start the service first."282 error_msg = "Failed to open phpmyadmin."283 url = f"http://localhost:{self.port['PMA_PORT']}"284 if self.start_stop_btn.isChecked():285 self.create_log(ready_msg)286 try:287 self.file.open_this(url)288 self.create_log(success_msg)289 except:290 self.create_log(error_msg)291 self.error.show(error_msg)292 else:293 self.create_log(warning_msg)294 self.error.show(warning_msg)295 def open_project(self) -> None:296 """297 open_project opens the project.298 """299 success_msg = "Opened project."300 error_msg = "Failed to open project folder."301 url = f"{self.directory}/{self.public_directory}"302 self.create_log("Opening project...")303 try:304 self.file.open_this(url)305 self.create_log(success_msg)306 except:307 self.create_log(error_msg)308 self.error.show(error_msg)309 def create_project(self) -> None:310 """311 create_project creates the project.312 """313 ready_msg = "Adding new project..."314 success_msg = 
"Project created."315 error_msg = "Failed to create project."316 self.create_log(ready_msg)317 if self.new_project.show():318 self.create_log(success_msg)319 else:320 self.create_log(error_msg)321 self.error.show(error_msg)322 self.load_projects()323 def exit_app(self) -> None:324 """325 exit_app will exit the application.326 """327 ready_msg = "Stopping services..."328 confirm_msg = "Are you sure you want to quit?"329 success_msg = "Exited."330 cancel_msg = "Exiting canceled."331 if self.confirm.show(confirm_msg):332 self.create_log(ready_msg)333 self.docker.stop()334 self.create_log(success_msg)335 exit()336 else:337 self.create_log(cancel_msg)338 def edit_ports(self) -> None:339 """340 edit_ports will edit the ports.341 """342 ready_msg = "Editing ports..."343 success_msg = "Ports edited."344 warning_msg = "Please start the service first."345 error_msg = "Failed to edit ports."346 self.create_log(ready_msg)347 if not self.start_stop_btn.isChecked():348 result = self.edit_port_dialog.show()349 if result != False:350 self.create_log(success_msg)351 else:352 self.create_log(error_msg)353 self.error.show(error_msg)354 else:355 self.create_log(warning_msg)356 self.error.show(warning_msg)357 self.port = self.file.find_ports(self.env_file_name)358 def remove_services(self) -> None:359 """360 remove_services will remove the services.361 """362 ready_msg = "Removing services..."363 confirm_msg = "Are you sure you want to remove the services?"364 success_msg = "Services removed."365 warning_msg = "Please start the service first."366 error_msg = "Failed to remove services."367 cancel_msg = "Services removal canceled."368 if not self.start_stop_btn.isChecked():369 self.create_log(ready_msg)370 if self.confirm.show(confirm_msg):371 if self.docker.remove():372 self.create_log(success_msg)373 else:374 self.create_log(error_msg)375 self.error.show(error_msg)376 else:377 self.create_log(cancel_msg)378 else:379 self.create_log(warning_msg)...
tests.py
Source:tests.py
...173 # scopes174 def test_get_unique_scopes(self):175 scope1 = 'test1'176 scope2 = 'test2'177 log1 = self.access_log.create_log(scope=scope1)178 log2 = self.access_log.create_log(scope=scope1)179 log3 = self.access_log.create_log(scope=scope2)180 unique_scopes = self.access_log.get_unique_scopes()181 self.assertEqual(2, len(unique_scopes))182 self.assertTrue(scope1 in unique_scopes)183 self.assertTrue(scope2 in unique_scopes)184 # logs185 def test_create_log_with_preset_remote_origin(self):186 preset_remote_origin = '1.1.1.1'187 self.access_log.remote_origin = '1.1.1.1'188 log = self.access_log.create_log()189 self.assertEqual(preset_remote_origin, log.remote_origin.exploded)190 def test_prune_logs_all(self):191 log1 = self.access_log.create_log(creation_time=1)192 log2 = self.access_log.create_log(creation_time=2)193 log3 = self.access_log.create_log(creation_time=3)194 self.assertIsNotNone(self.access_log.get_log(log1.id))195 self.assertIsNotNone(self.access_log.get_log(log2.id))196 self.assertIsNotNone(self.access_log.get_log(log3.id))197 self.access_log.prune_logs()198 self.assertIsNone(self.access_log.get_log(log1.id))199 self.assertIsNone(self.access_log.get_log(log2.id))200 self.assertIsNone(self.access_log.get_log(log3.id))201 def test_prune_logs_created_before(self):202 log1 = self.access_log.create_log(creation_time=1)203 log2 = self.access_log.create_log(creation_time=2)204 log3 = self.access_log.create_log(creation_time=3)205 self.assertIsNotNone(self.access_log.get_log(log1.id))206 self.assertIsNotNone(self.access_log.get_log(log2.id))207 self.assertIsNotNone(self.access_log.get_log(log3.id))208 self.access_log.prune_logs(created_before=3)209 self.assertIsNone(self.access_log.get_log(log1.id))210 self.assertIsNone(self.access_log.get_log(log2.id))211 self.assertIsNotNone(self.access_log.get_log(log3.id))212 def test_cooldown_remote_origin(self):213 remote_origin = '1.1.1.1'214 amount = 1215 period = 60216 scope = 'test'217 self.assertFalse(218 
self.access_log.cooldown(219 scope,220 amount,221 period,222 remote_origin=remote_origin,223 )224 )225 self.access_log.create_log(scope=scope, remote_origin=remote_origin)226 # explicit remote origin227 self.assertTrue(228 self.access_log.cooldown(229 scope,230 amount,231 period,232 remote_origin=remote_origin,233 )234 )235 # preset remote origin is used if not specified236 self.access_log.remote_origin = remote_origin237 self.assertTrue(self.access_log.cooldown(scope, amount, period))238 # remote origin matches even logs with non-matching subject id239 self.assertTrue(240 self.access_log.cooldown(241 scope,242 amount,243 period,244 remote_origin=remote_origin,245 subject_id=uuid.uuid4().bytes,246 )247 )248 def test_cooldown_subject_id(self):249 subject_id = uuid.uuid4().bytes250 amount = 1251 period = 60252 scope = 'test'253 self.assertFalse(254 self.access_log.cooldown(255 scope,256 amount,257 period,258 subject_id=subject_id,259 )260 )261 self.access_log.create_log(scope=scope, subject_id=subject_id)262 self.assertTrue(263 self.access_log.cooldown(264 scope,265 amount,266 period,267 subject_id=subject_id,268 )269 )270 # subject id matches even logs with non-matching remote_origin271 self.assertTrue(272 self.access_log.cooldown(273 scope,274 amount,275 period,276 remote_origin='1.1.1.1',277 subject_id=subject_id,278 )279 )280 def test_cooldown_range_and_amount_per_period(self):281 remote_origin = '1.1.1.1'282 self.access_log.remote_origin = remote_origin283 amount = 1284 period = 60285 scope = 'test'286 the_past = time.time() - period - 1287 # logs outside of the cooldown period aren't counted towards the amount288 self.access_log.create_log(scope=scope, creation_time=the_past)289 # multiple logs up to a cap can be set for a given period290 self.access_log.create_log(scope=scope)291 amount = 2292 self.assertFalse(self.access_log.cooldown(scope, amount, period))293 self.access_log.create_log(scope=scope)294 self.assertTrue(self.access_log.cooldown(scope, amount, 
period))295 # anonymization296 def test_anonymize_id(self):297 id = uuid.uuid4().bytes298 log_subject = self.access_log.create_log(subject_id=id)299 log_object = self.access_log.create_log(object_id=id)300 count_methods_filter_fields = [301 (self.access_log.count_logs, 'subject_ids'),302 (self.access_log.count_logs, 'object_ids'),303 ]304 for count, filter_field in count_methods_filter_fields:305 self.assertEqual(1, count(filter={filter_field: id}))306 new_id_bytes = self.access_log.anonymize_id(id)307 for count, filter_field in count_methods_filter_fields:308 self.assertEqual(0, count(filter={filter_field: id}))309 # assert logs still exist, but with the new id as subject/object310 for count, filter_field in count_methods_filter_fields:311 self.assertEqual(1, count(filter={filter_field: new_id_bytes}))312 #TODO passing in an id to use for anonymization is allowed, so test it313 def test_anonymize_id_with_new_id(self):314 pass315 def test_anonymize_log_origins(self):316 origin1 = '1.2.3.4'317 expected_anonymized_origin1 = '1.2.0.0'318 log1 = self.access_log.create_log(remote_origin=origin1)319 origin2 = '2001:0db8:85a3:0000:0000:8a2e:0370:7334'320 expected_anonymized_origin2 = '2001:0db8:85a3:0000:0000:0000:0000:0000'321 log2 = self.access_log.create_log(remote_origin=origin2)322 logs = self.access_log.search_logs()323 self.access_log.anonymize_log_origins(logs)324 anonymized_log1 = self.access_log.get_log(log1.id)325 anonymized_log2 = self.access_log.get_log(log2.id)326 self.assertEqual(327 expected_anonymized_origin1,328 anonymized_log1.remote_origin.exploded,329 )330 self.assertEqual(331 expected_anonymized_origin2,332 anonymized_log2.remote_origin.exploded,333 )334if __name__ == '__main__':335 if '--db' in sys.argv:...
integrityChecker.py
Source:integrityChecker.py
# ... (earlier lines of the module are outside this excerpt)
# NOTE(review): indentation was reconstructed from a whitespace-collapsed
# excerpt; nesting of the statements marked below should be confirmed
# against the real file.
from checkLogPath import checkLogPath
from writeIntegrityPanic import writeIntegrityPanic
from createProject import createProject


class integrityChecker(opdb_setup,getElementType,getProjects,createElement,getElementList,getVersionList,checkLogPath,writeIntegrityPanic,createProject):
    """Walks the project tree and repairs mismatches between the ``.atr``
    description files and the directories actually present on disk.

    Every repair is recorded through ``create_log``. Helper methods such as
    ``getElementType_main``, ``getProjects_main`` and ``getp`` are not defined
    in this excerpt -- presumably they come from the base classes.
    """

    def create_log(self,Path,logdescription):
        """Append the line ``"<Path> <logdescription>"`` to this run's integrity log.

        The log file is ``<AdminROOT>/common/integrity_logs/<timerec>.log`` for
        the project root and ``<AdminROOT>/<project>/integrity_logs/<timerec>.log``
        for any other known element, where ``<project>`` is the second
        ``:``-separated field of ``Path``. Nothing is written when the element
        type of ``Path`` is the empty string.

        :param Path: ``:``-separated element path being reported on.
        :param logdescription: Text describing what was found or repaired.
        """
        element_type=self.getElementType_main(Path=Path)
        if element_type=="projectRoot":
            logfile=self.AdminROOT+"/common/integrity_logs/"+str(self.timerec)+".log"
        elif element_type!="":
            pr=Path.split(":")[1]
            logfile=self.AdminROOT+"/"+pr+"/integrity_logs/"+str(self.timerec)+".log"
        else:
            # Unknown element type: there is no log file to write to.
            return

        tolog=str(Path)+" "+logdescription+"\n"
        if os.path.isfile(logfile):
            # Existing content is stripped and rewritten, not blindly appended.
            with open(logfile,"r") as fl:
                rea=fl.read()
            tolog=rea.strip()+"\n"+tolog

        with open(logfile,"w") as fl:
            fl.write(tolog)

    def checker_loop(self,Path):
        """Check and repair the element at ``Path``, then recurse into its children.

        For the project root and for containers/items the registered entries
        are compared with the directories on disk: duplicated entries are
        collapsed, registered folders without a type file become containers,
        empty unregistered folders are removed, unregistered typed folders are
        queued in the delete list, unclassifiable folders raise an integrity
        panic, and registered entries without a folder get one created. For
        items, missing vault folders are created as well.

        :param Path: ``:``-separated element path to check.
        """
        # Single-argument call form prints identically on Python 2 and 3.
        print(Path)
        path_type=self.getElementType_main(Path=Path)
        if path_type=="projectRoot":
            pre=self.getProjects_main()
            p_c=os.listdir(self.ProjectROOT+Path.replace(":","/"))
            p_d=[]
            for p_ in p_c:
                if os.path.isdir(self.ProjectROOT+Path.replace(":","/")+str(p_)):
                    p_d.append(p_)

            # .atr files that must not exist directly in the project root
            filestodelete=[]
            for pp in p_c:
                if os.path.isfile(self.ProjectROOT+"/"+str(pp)):
                    if pp=="elements.atr" or pp=="versions.atr":
                        filestodelete.append(pp)

            if len(filestodelete) > 0:
                for flss in filestodelete:
                    os.remove(self.ProjectROOT+"/"+flss)
                # NOTE(review): assumed to run once after the loop -- confirm.
                self.create_log(Path,"Project root was cleaned from failing .atr files.")

            duplchk={}
            for i in pre:
                duplchk[i]=pre.count(i)
            ccc=0
            for i in duplchk.keys():
                if int(duplchk[i]) > 1:
                    ccc=1
            if ccc==1:
                ret_solve=""
                for elem in duplchk.keys():
                    ret_solve+=elem+"\n"
                #-------------------------here we solve the duplicated entries problem
                with open(self.ProjectROOT+"/projects.atr","w") as fi:
                    fi.write(ret_solve)
                pre=self.getProjects_main()
                self.create_log(Path,"There was duplicated entries in the elementdescription file, so I recreated them.")

            for prr in pre:
                #------------- solving the case of the registered folder without a type definition file
                if os.path.isdir(self.ProjectROOT+"/"+prr):
                    if self.getElementType_main(Path=Path+prr)=="":
                        with open(self.ProjectROOT+"/"+prr+"/elements.atr","w") as ff:
                            ff.write("")
                        self.create_log(Path,"This was a registred empty folder: "+str(":"+prr)+" now it is a container.")

            if set(pre) != set(p_d):
                p_mr=list(set(pre).difference(set(p_d))) # more entries than there should be
                p_md=list(set(p_d).difference(set(pre))) # more directories than there should be
                if len(p_md) > 0:
                    for di in p_md:
                        if self.getElementType_main(Path=Path+di)=="":

                            if len(os.listdir(self.ProjectROOT+"/"+di)) < 1:
                                #-----------------------we solved the empty folder case..........
                                os.rmdir(self.ProjectROOT+"/"+di)
                                self.create_log(Path,"Empty folder was deleted: "+str(":"+di))
                            else:
                                #-------------------------------------------here we solve the problem when we found a directory without any element entry and we do not even know its type
                                self.writeIntegrityPanic_main(Path=Path,Problem=di,Description="I can not categorize this folder: "+di+" at the end of the path, please take care about it.")
                                self.create_log(Path,"User action is needed for: "+str(":"+di))

                        elif self.getElementType_main(Path=Path+di)=="item" or self.getElementType_main(Path=Path+di)=="container":

                            #---------------------------------here we solve the case with the folders which are for delete
                            if int(self.checkLogPath_main(logSystemPath=self.AdminROOT+"/common/project_delete_list.atr",Path=Path+str(di)))!=int(1):
                                with open(self.AdminROOT+"/common/project_delete_list.atr","r") as fl:
                                    rea=fl.read()

                                tolog=rea.strip().lstrip()+"\n"+str(Path+di)+" integrityCheck "+str(self.timerec)+" integrityCheck automatically placed this item to the delelte list.\n"

                                with open(self.AdminROOT+"/common/project_delete_list.atr","w") as fl:
                                    fl.write(tolog)
                                self.create_log(Path,"Delete request was placed for: "+str(":"+di))

                if len(p_mr) > 0:
                    for el in p_mr:
                        #--------------------here we solve the problem when there is more items in a project.art file, and we create containers for it.
                        #os.makedirs(self.ProjectROOT+Path.replace(":","/")+str(el))
                        #pf=open(self.ProjectROOT+str(Path.replace(":","/"))+"/"+str(el)+"/elements.atr","w")
                        #pf.write("")
                        #pf.close()
                        self.createProject_main(str(el))
                        self.create_log(Path,"Project: "+str(el)+" created.")

            # NOTE(review): assumed to run for every project regardless of the
            # comparison above -- confirm the nesting.
            for em in pre:
                self.checker_loop(":"+em)

            #ve=self.getVersionList_main(Path)
            #if len(ve)>0:
            #    for v in self.getVersionList_main(Path):
            #        self.checker_loop(Path+"@"+v[0])

        elif path_type=="container" or path_type=="item":
            pre=self.getElementList_main(Path=Path)
            p_c=os.listdir(self.ProjectROOT+Path.replace(":","/"))
            p_d=[]
            for p_ in p_c:
                if os.path.isdir(self.ProjectROOT+Path.replace(":","/")+"/"+str(p_)):
                    p_d.append(p_)
            duplchk={}
            for i in pre:
                duplchk[i]=pre.count(i)
            ccc=0

            for i in duplchk.keys():
                if int(duplchk[i]) > 1:
                    ccc=1
            if ccc==1:
                ret_solve=""
                for elem in duplchk.keys():
                    ret_solve+=elem+"\n"
                #-------------------------here we solve the duplicated entries problem
                with open(self.ProjectROOT+Path.replace(":","/")+"/elements.atr","w") as fi:
                    fi.write(ret_solve)
                # Reload this element's own list. The original reloaded the
                # project list (getProjects_main) here, a copy-paste slip
                # from the projectRoot branch.
                pre=self.getElementList_main(Path=Path)
                self.create_log(Path,"There was duplicated entries in the elementdescription file, so I recreated them.")

            for prr in pre:
                #------------- solving the case of the registered folder without a type definition file
                if os.path.isdir(self.ProjectROOT+Path.replace(":","/")+"/"+prr):
                    if self.getElementType_main(Path=Path+":"+prr)=="":
                        with open(self.ProjectROOT+Path.replace(":","/")+"/"+prr+"/elements.atr","w") as ff:
                            ff.write("")
                        self.create_log(Path,"This was a registred empty folder: "+str(Path+":"+prr)+" now it is a container.")

            if set(pre) != set(p_d):
                p_mr=list(set(pre).difference(set(p_d))) # more entries than there should be
                p_md=list(set(p_d).difference(set(pre))) # more directories than there should be
                if len(p_md) > 0:
                    for di in p_md:

                        if self.getElementType_main(Path=Path+":"+di)=="":
                            if len(os.listdir(self.ProjectROOT+Path.replace(":","/")+"/"+di)) < 1:
                                #-----------------------we solved the empty folder case..........
                                os.rmdir(self.ProjectROOT+Path.replace(":","/")+"/"+di)
                                self.create_log(Path,"Empty folder was deleted: "+str(Path+":"+di))
                            else:
                                #-------------------------------------------here we solve the problem when we found a directory without any element entry and we do not even know its type
                                self.writeIntegrityPanic_main(Path=Path,Problem=di,Description=("I can not categorize this folder: "+di+" at the end of the path, please take care about it."))
                                self.create_log(Path,"User action is needed for: "+str(Path+":"+di))

                        elif self.getElementType_main(Path=Path+":"+di)=="item" or self.getElementType_main(Path=Path+":"+di)=="container":

                            #---------------------------------here we solve the case with the folders which are for delete
                            if int(self.checkLogPath_main(logSystemPath=self.AdminROOT+"/"+self.getp(Path)+"/delete_list.atr",Path=Path+":"+str(di)))!=int(1):
                                with open(self.AdminROOT+"/"+self.getp(Path)+"/delete_list.atr","r") as fl:
                                    rea=fl.read()

                                tolog=rea.strip().lstrip()+"\n"+str(Path+":"+di)+" integrityCheck "+str(self.timerec)+" integrityCheck automatically placed this item to the delelte list.\n"

                                with open(self.AdminROOT+"/"+self.getp(Path)+"/delete_list.atr","w") as fl:
                                    fl.write(tolog)
                                self.create_log(Path,"Delete request was placed for: "+str(Path+":"+di))

                if len(p_mr) > 0:
                    for el in p_mr:
                        #--------------------here we solve the problem when there is more items in a element.art file, and we create containers for it.
                        os.makedirs(self.ProjectROOT+Path.replace(":","/")+"/"+str(el))
                        with open(self.ProjectROOT+str(Path.replace(":","/"))+"/"+str(el)+"/elements.atr","w") as pf:
                            pf.write("")
                        self.create_log(Path,"Container: "+str(el)+" created.")

            if path_type=="item":
                vers=self.getVersionList_main(Path=Path)
                if len(vers) > 0:
                    for ww in vers:
                        vw=ww[0]
                        if not os.path.isdir(self.VaultROOT+"/"+Path.replace(":","/")+"/"+vw):
                            os.makedirs(self.VaultROOT+"/"+Path.replace(":","/")+"/"+vw)
                            self.create_log(Path,"Version folder was created in the vaults: @"+str(vw))
                else:
                    if not os.path.isdir(self.VaultROOT+"/"+Path.replace(":","/")):
                        os.makedirs(self.VaultROOT+"/"+Path.replace(":","/"))
                        self.create_log(Path,"Item folder was created in the Vault.")

            # NOTE(review): assumed to run for containers and items alike --
            # confirm the nesting.
            for em in pre:
                self.checker_loop(Path+":"+em)

            #ve=self.getVersionList_main(Path)
            #if len(ve)>0:
            #    for v in self.getVersionList_main(Path):
            #        self.checker_loop(Path+"@"+v[0])

        elif path_type=="version":
            pass

        elif path_type=="attribute":
            pass
        # ... (excerpt truncated here)
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!