Best Python code snippet using slash
views.py
Source:views.py
...57 # cost = cost + int(distance - 3) * .558 # error_string = ""59 # for key,val in params.items():60 # error_string = error_string +""+key + ":" + val + " | "61 # write_to_error_file(time + " | " + date + " | " +"addresss pass ("+error_string+"): ")62 # except Exception as e:63 # error_string = ""64 # for key,val in params.items():65 # error_string = error_string +""+key + ":" + val + " | "66 # write_to_error_file(time + " | " + date + " | " +"addresss error ("+error_string+"): " + str(e))67 # cost = 068 context ={69 "cost": ("%.2f"%(cost)),70 }71 return JsonResponse(json.loads(json.dumps(context)))72def process(request):73 params = request.GET74 # Time75 mountain = timezone('US/Mountain')76 mountain_time = datetime.now(mountain)77 date_time = mountain_time.strftime('%Y-%m-%d_%H:%M:%S').split("_")78 date = date_time[0]79 time = date_time[1]80 # Customer Information81 try:82 if params["fname"] == "":83 fname = "none"84 else:85 fname = params["fname"]86 except Exception as e:87 write_to_error_file(time + " | " + date + " | " +"fname error: " + str(e))88 fname = "fail"89 try:90 if params["lname"] == "":91 lname = "none"92 else:93 lname = params["lname"]94 except Exception as e:95 write_to_error_file(time + " | " + date + " | " +"lname error: " + str(e))96 lname = "fail"97 try:98 if params["email"] == "":99 email = "none"100 else:101 email = params["email"]102 except Exception as e:103 write_to_error_file(time + " | " + date + " | " +"email error: " + str(e))104 email = "fail"105 try:106 if params["myphone"] == "":107 customer_phone = "none"108 else:109 customer_phone = params["myphone"]110 except Exception as e:111 write_to_error_file(time + " | " + date + " | " +"customer_phone error: " + str(e))112 customer_phone = "fail"113 try:114 if params["instagram"] == "":115 instagram = "none"116 else:117 instagram = params["instagram"]118 except Exception as e:119 write_to_error_file(time + " | " + date + " | " +"instagram error: " + str(e))120 instagram = "fail"121 # 
Recipient Information122 try:123 if params["recipient"] == "":124 recipient = "none"125 else:126 recipient = params["recipient"]127 except Exception as e:128 write_to_error_file(time + " | " + date + " | " +"recipient error: " + str(e))129 recipient = "fail"130 try:131 if params["custphone"] == "":132 rec_phone_number = "none"133 else:134 rec_phone_number = params["custphone"]135 except Exception as e:136 write_to_error_file(time + " | " + date + " | " +"rec_phone_number error: " + str(e))137 rec_phone_number = "fail"138 try:139 if params["address"] == "":140 address = "none"141 else:142 address = params["address"]143 except Exception as e:144 write_to_error_file(time + " | " + date + " | " +"address error: " + str(e))145 address = "fail"146 try:147 if params["date"] == "":148 delivery_date = "none"149 else:150 delivery_date = params["date"]151 except Exception as e:152 write_to_error_file(time + " | " + date + " | " +"delivery_date error: " + str(e))153 delivery_date = "0000-00-00"154 try:155 if params["message"] == "":156 message = "none"157 else:158 message = params["message"]159 except Exception as e:160 write_to_error_file(time + " | " + date + " | " +"message error: " + str(e))161 message = "fail"162 try:163 if params["delivery_type"] == "":164 delivery_type = "none"165 else:166 delivery_type = params["delivery_type"]167 except Exception as e:168 write_to_error_file(time + " | " + date + " | " +"delivery_type error: " + str(e))169 delivery_type = "fail"170 try:171 if params["del_name"] == "":172 del_name = "none"173 else:174 del_name = params["del_name"]175 except Exception as e:176 write_to_error_file(time + " | " + date + " | " +"del_name error: " + str(e))177 del_name = "fail"178 # Design179 try:180 if params["keywords"] == "":181 keywords = "none"182 else:183 keywords = params["keywords"]184 except Exception as e:185 write_to_error_file(time + " | " + date + " | " +"keywords error: " + str(e))186 keywords = "fail"187 # Cost188 try:189 if params["total"] 
== "" or params["total"] == "NaN":190 total = 0191 else:192 total = params["total"]193 except Exception as e:194 write_to_error_file(time + " | " + date + " | " +"total error: " + str(e))195 total = 0196 try:197 if params["delivery_fee"] == "":198 delivery_fee = 0199 else:200 delivery_fee = params["delivery_fee"]201 except Exception as e:202 write_to_error_file(time + " | " + date + " | " + "delivery_fee error: " + str(e))203 delivery_fee = 0204 try:205 if params["user_input"] == "":206 user_input = 0207 else:208 user_input = params["user_input"].replace("$","").replace("_","")209 except Exception as e:210 write_to_error_file(time + " | " + date + " | " + "user_input error: " + str(e))211 user_input = 0212 try:213 order_number = int(max(list(Floral.objects.values_list("order_number", flat=True)))+1)214 except:215 order_number = 0216 try:217 my_error_string = "#"+str(order_number) + " | " + time + " | " + date + " | " + fname + " | " + lname + " | " + email + " | " + customer_phone + " | " \218 "" + recipient + " | " + rec_phone_number + " | " + address + " | " + delivery_date +" | " + del_name + " | " + message + " | " \219 "" + keywords + " | " + str(total) + " | " + str(delivery_fee)220 write_to_error_file(my_error_string)221 except Exception as e:222 write_to_error_file(time + " | " + date + " | " + "error_string_error: " + str(e))223 # BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))224 # my_path = os.path.join(BASE_DIR, 'error_file.txt')225 # from django.conf import settings226 # os.path.join(settings.MEDIA_ROOT, 'error_file.txt')227 if delivery_type == 'Residential':228 del_name = "none"229 try:230 s = Floral(231 order_number = order_number,232 delivered = False,233 date_processed = date,234 time_processed = time,235 customer_first_name = fname,236 customer_last_name = lname,237 customer_email = email,238 customer_phone = customer_phone,239 customer_instagram = instagram,240 recipient_name = recipient,241 recipient_phone = 
rec_phone_number,242 delivery_address = address,243 delivery_date = delivery_date,244 delivery_message = message,245 design_keywords = keywords,246 floral_cost = int(user_input),247 delivery_fee = float(delivery_fee),248 delivery_type=delivery_type,249 del_name=del_name,250 total_cost = float(total),251 )252 s.save()253 except Exception as e:254 write_to_error_file(time + " | " + date + " | " + "database error: " + str(e))255 cfg.CONFIG.ORDERNUMBER = order_number256 context ={257 "order_number": order_number,258 }259 return JsonResponse(json.loads(json.dumps(context)))260 # return render(request, 'country_floral_app/submit.html', context)261def confirm(request):262 params = request.GET263 # order_number = int(max(list(Floral.objects.values_list("order_number", flat=True))))264 order_number = params["order_number"]265 if str(cfg.CONFIG.ORDERNUMBER) == str(order_number):266 try:267 context = Floral.objects.all().filter(order_number=order_number).values()[0]268 except Exception as e:269 write_to_error_file("#"+str(order_number) + " failed:" + str(e))270 return render(request, 'country_floral_app/submit.html', context)271 else: return render(request, 'country_floral_app/error.html', {})272def about(request):273 context = {274 }275 return render(request, 'country_floral_app/about.html', context)276def contact(request):277 context = {278 }279 return render(request, 'country_floral_app/contact.html', context)280def gallery(request):281 context = {282 }283 return render(request, 'country_floral_app/gallery.html', context)284def events(request):285 context = {286 }287 return render(request, 'country_floral_app/events.html', context)288def write_to_error_file(error_string):289 try:290 with open("/root/database/error_file.txt", "a") as myfile:291 myfile.write(error_string + "\n")292 except:...
worker.py
Source:worker.py
...37 warning.message, warning.filename, warning.lineno, extra={'capture': False})38 def error_added(self, error, result):39 if result.is_global_result():40 self.client.report_session_error("Session error in client id {}: {}".format(self.client_id, error.message))41 def write_to_error_file(self, msg):42 try:43 file_name = "{}-{}.log".format(config.root.parallel.worker_error_file, self.client_id)44 worker_error_file = os.path.join(config.root.parallel.workers_error_dir, file_name)45 with open(worker_error_file, 'a') as error_file:46 error_file.write(msg)47 except OSError as err:48 _logger.error("Failed to write to worker error file, error: {}", err, extra={'capture': False})49 def connect_to_server(self):50 try:51 self.client = xmlrpc_client.ServerProxy(self.server_addr, allow_none=True)52 self.client.connect(self.client_id, os.getpid())53 self._stop_event = threading.Event()54 self._watchdog_thread = threading.Thread(target=self.keep_alive, args=(self._stop_event,))55 self._watchdog_thread.setDaemon(True)56 self._watchdog_thread.start()57 except OSError as err:58 self.write_to_error_file("Failed to connect to server, error: {}".format(str(err)))59 def _stop_keepalive_thread(self):60 if self._stop_event is not None and not self._stop_event.is_set():61 self._stop_event.set()62 self._watchdog_thread.join()63 def start_execution(self, app, collected_tests):64 if _HAS_PID_PGID:65 if os.getpid() != os.getpgid(0):66 os.setsid()67 else:68 _logger.debug(69 "Skipping setsid for process since it is not available"70 )71 register(self.warning_added)72 register(self.error_added)73 collection = [(test.__slash__.file_path,74 test.__slash__.function_name,75 test.__slash__.variation.dump_variation_dict()) for test in collected_tests]76 if not self.client.validate_collection(self.client_id, sorted(collection)):77 _logger.error("Collections of worker id {} and master don't match, worker terminates", self.client_id,78 extra={'capture': False})79 self._stop_keepalive_thread()80 
self.client.disconnect(self.client_id, has_failure=True)81 return82 should_stop = False83 signatures_dict = collections.defaultdict(set)84 for index, test in enumerate(collection):85 signatures_dict[test].add(index)86 with app.session.get_started_context():87 try:88 while not should_stop:89 test_entry = self.client.get_test(self.client_id)90 if test_entry == WAITING_FOR_CLIENTS:91 _logger.debug("Worker_id {} recieved waiting_for_clients, sleeping", self.client_id)92 time.sleep(0.05)93 elif test_entry == PROTOCOL_ERROR:94 _logger.error("Worker_id {} recieved protocol error message, terminating", self.client_id,95 extra={'capture': False})96 break97 elif test_entry == NO_MORE_TESTS:98 _logger.debug("Got NO_MORE_TESTS, Client {} disconnecting", self.client_id)99 break100 else:101 index = signatures_dict[tuple(test_entry[0])].pop()102 test = collected_tests[index]103 context.session.current_parallel_test_index = test_entry[1]104 run_tests([test])105 result = context.session.results[test]106 _logger.debug("Client {} finished test, sending results", self.client_id)107 ret = self.client.finished_test(self.client_id, result.serialize())108 if ret == PROTOCOL_ERROR:109 _logger.error("Worker_id {} recieved protocol error message, terminating", self.client_id,110 extra={'capture': False})111 should_stop = True112 except INTERRUPTION_EXCEPTIONS:113 self.write_to_error_file("Worker interrupted while executing test")114 _logger.error("Worker interrupted while executing test", extra={'capture': False})115 raise116 except Exception as err:117 self.write_to_error_file(str(err))118 raise119 else:120 context.session.mark_complete()121 context.session.initiate_cleanup()122 self._stop_keepalive_thread()123 self.client.disconnect(self.client_id)124 finally:125 context.session.initiate_cleanup()...
usb_prepare_util.py
Source:usb_prepare_util.py
...35 except subprocess.CalledProcessError as e:36 output = e.output37 print(output)38 if (time.time() - time_now) > 60:39 write_to_error_file(4)40 done = 041 while done == 0:42 try:43 output = subprocess.check_output("sudo umount /dev/dm-0", shell=True)44 print(output)45 done = 146 except subprocess.CalledProcessError as e:47 output = e.output48 print(output)49 if (time.time() - time_now) > 60:50 write_to_error_file(4)51 done = 052 while done == 0:53 try:54 output = subprocess.check_output("sudo mount /dev/dm-0 /mnt/usb", shell=True)55 print(output)56 done = 157 except subprocess.CalledProcessError as e:58 output = e.output59 print(output)60 if (time.time() - time_now) > 60:61 write_to_error_file(4)62def delete_previous_day_folders(today):63 list_days = os.listdir("/mnt/usb/")64 for days in list_days:65 path_to_day = "/mnt/usb/" + days66 try:67 length = len([i for i in os.listdir(path_to_day + "/all_frames/all_frames/")])68 #If there's an exception, it's because there are no folders 69 except:70 length = 071 if length == 0 and str(days) != (today):72 os.system("sudo rm -r " + path_to_day)73 print("sudo rm -r " + path_to_day)74 else:75 if str(days) != str(today):76 write_to_error_file(3)77def find_usb_percent_full():78 output = subprocess.check_output("df /dev/dm-0", shell=True)79 percent = str(output).split()[-2][:-1]80 return percent81def write_to_error_file(error): 82 #ERROR 1: The USB is filling up (it is at least 75% full).83 #ERROR 2: The USB is almost completely full (95%) and recording frames has stopped.84 #ERROR 3: There are folders from at least one previous day that haven't been pushed for some reason.85 #ERROR 4: nousb86 with open("/home/nvidia/ping_errors/errors.txt", 'w') as the_file:87 the_file.write("!error" + str(error) + "!")88def find_its_max_frame(path_today):89 folder_list = os.listdir(path_today + '/all_frames/all_frames/')90 length = len(folder_list)91 if length == 0:92 return -193 num_list = [int(i[6:]) for i in folder_list]94 
num_list.sort()95 folder = num_list[-1]96 num_frame = len(os.listdir(path_today + '/all_frames/all_frames/folder' + str(folder) + '/'))97 framenum = folder * 1000 + num_frame - 198 return framenum99def usb_storage_check():100 try:101 percent = find_usb_percent_full()102 print(str(percent) + "% full")103 except:104 percent = 0105 print("Cannot allocate memory")106 if 90 > int(percent) >= 75:107 write_to_error_file(1)108 if int(percent) >= 90:109 write_to_error_file(2)110 remove_redundant_data("/mnt/usb/", 30)111def remove_redundant_data(path_init, num):112 path = get_oldest_folder(path_init)113 folder_list = os.listdir(path)114 while len(folder_list) == 0:115 os.system('sudo rm -rf {}'.format(path[:-22]))116 path = get_oldest_folder(path_init)117 folder_list = os.listdir(path)118 if len(folder_list) <= num:119 for folder in folder_list:120 os.system('sudo rm -rf {}{}'.format(path, folder))121 else:122 folder_list = [int(i[6:]) for i in folder_list]123 folder_list.sort()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!