Best Python code snippet using localstack_python
rabbit_test.py
Source: rabbit_test.py
...179 monkeypatch.setattr("beer_garden.queue.rabbit.SchemaParser", parser_mock)180 client.clear_queue("queue")181 assert pyrabbit_client.get_messages.called is True182 assert parser_mock.parse_request.called is False183 def test_delete_queue(self, client, pyrabbit_client):184 client.delete_queue("queue")185 assert pyrabbit_client.delete_queue.called is True186 def test_destroy_queue_all_exceptions(self, client):187 disconnect_consumers_mock = Mock(side_effect=ValueError)188 clear_queue_mock = Mock(side_effect=ValueError)189 delete_queue = Mock(side_effect=ValueError)190 client.disconnect_consumers = disconnect_consumers_mock191 client.clear_queue = clear_queue_mock192 client.delete_queue = delete_queue193 client.destroy_queue("queue_name", True)194 assert disconnect_consumers_mock.called is True195 assert clear_queue_mock.called is True196 assert delete_queue.called is True197 def test_destroy_queue_with_http_errors(self, client):198 disconnect_consumers_mock = Mock(side_effect=HTTPError({}, status=500))...
origin_server_0.py
Source: origin_server_0.py
from _thread import *
import socket
import sys
sys.path.insert(0, "../../")
import os
import time
import sched
from termcolor import colored
from threading import Timer, Thread
import selectors
import constants
import copy
from Messages.Messages import *

# Names queued for deletion, and a counter that is bumped whenever this server
# edits that queue. Both are exchanged with the sync peer in synchronise().
DELETE_QUEUE = []
DELETE_QUEUE_CLOCK = 0
SERVER_INDEX = 0
IP, STORE_PORT = constants.ORIGIN_SERVERS_STORE_CREDENTIALS[SERVER_INDEX]
IP, REQUEST_PORT = constants.ORIGIN_SERVERS_REQUEST_CREDENTIALS[SERVER_INDEX]
IP, DELETE_PORT = constants.ORIGIN_SERVERS_DELETE_CREDENTIALS[SERVER_INDEX]


def _open_listener(port):
    """Return a TCP socket bound to *port* on all interfaces and listening.

    The socket is closed again before the error propagates if any set-up
    step fails, so a failed attempt never leaves a half-configured socket open.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind(('', port))
        listener.listen(0)
    except OSError:
        listener.close()
        raise
    return listener


# Open the three listening sockets, retrying until all of them are available.
while True:
    opened = []
    try:
        for port in (STORE_PORT, REQUEST_PORT, DELETE_PORT):
            opened.append(_open_listener(port))
        store_s, request_s, delete_s = opened
        print(colored(f"ORIGIN SERVER {SERVER_INDEX} INITIALISED SUCCESSFULLY", constants.SUCCESS))
        break
    except OSError:
        # Release whatever was opened in this attempt before trying again.
        for listener in opened:
            listener.close()
        print(colored(f"COULD NOT INITIALISE SERVER {SERVER_INDEX}. RETRYING..", constants.FAILURE))
        # Short pause so a persistent failure does not become a busy loop.
        time.sleep(1)


def synchronise():
    """Keep the delete queue and the stored files in step with the sync peer.

    Connects to ``localhost`` on ``constants.SYNC_PORT_2`` and then repeats a
    sync round forever: exchange delete queues, send the peer every local
    file it reports missing, and receive every peer file missing locally.
    Any error drops the connection; a new one is attempted after
    ``constants.SYNCING_PERIOD / 8`` seconds.
    """
    global DELETE_QUEUE_CLOCK
    global DELETE_QUEUE
    while True:
        IP, PORT = "localhost", constants.SYNC_PORT_2
        try:
            # The context manager closes the socket on every exit path.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sync_s:
                sync_s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                sync_s.settimeout(constants.TIMEOUT_PERIOD)
                sync_s.connect((IP, PORT))
                print(colored(f"SYNCING CAPABILITIES WITH {IP} AT {PORT} INITIALISED", constants.SUCCESS))
                while True:
                    # Phase 1: exchange delete queues and keep the newer one.
                    dqm_self = DeleteQueueMessage(DELETE_QUEUE, DELETE_QUEUE_CLOCK)
                    dqm_self.send_q(sync_s)
                    dqm_other = DeleteQueueMessage()
                    dqm_other.receive_q(sync_s)
                    if dqm_other.DQ_CLOCK > dqm_self.DQ_CLOCK:
                        DELETE_QUEUE = copy.deepcopy(dqm_other.DQ)
                        # Adopt the peer's clock together with its queue;
                        # otherwise a later local edit could still compare as
                        # older than the peer's state and be overwritten.
                        DELETE_QUEUE_CLOCK = dqm_other.DQ_CLOCK

                    # NOTE(review): indentation was lost in the source listing;
                    # the one-second pauses are placed between phases here,
                    # not per file -- confirm against the original.
                    time.sleep(1)

                    # Phase 2: offer every local non-.py file that is not
                    # queued for deletion.
                    my_files = [
                        name for name in os.listdir()
                        if not name.endswith(".py") and name not in DELETE_QUEUE
                    ]
                    print(my_files)
                    n_1 = len(my_files)
                    sync_s.send(str(n_1).encode())
                    n_2 = int(sync_s.recv(1024).decode())
                    for filename in my_files:
                        fcm = FileContentMessage(filename)
                        fcm.send_name(sync_s)
                        status = fcm.receive_status(sync_s)
                        # A truthy status means nothing needs to be sent.
                        if not status:
                            fcm.send_file(sync_s)
                    time.sleep(1)

                    # Phase 3: accept the n_2 files announced by the peer.
                    for _ in range(n_2):
                        fcm = FileContentMessage()
                        fcm.receive_name(sync_s)
                        exists = fcm.checkExists()
                        fcm.send_status(sync_s)
                        if not exists:
                            fcm.receive_file(sync_s)
                    time.sleep(1)

                    print(colored(f"SYNCED SUCCESSFULLY!", constants.SUCCESS))
                    time.sleep(constants.SYNCING_PERIOD)
        except Exception as e:
            print(e)
            print(colored(f"UNABLE TO SYNC WITH {IP} AT {PORT}. RETRYING..", constants.FAILURE))
            time.sleep(constants.SYNCING_PERIOD/8)


def store_content():
    """Accept uploads on the store socket, one connection at a time.

    For each connection the file name is read, the name is taken out of
    DELETE_QUEUE if present, and the file body is received. Errors are
    printed and the loop carries on with the next connection.
    """
    global DELETE_QUEUE_CLOCK
    fcm = FileContentMessage()
    while True:
        try:
            store_c, store_addr = store_s.accept()
            # Close the connection even when receiving fails part-way.
            with store_c:
                fcm.receive_name(store_c)
                # NOTE(review): indentation was lost in the source listing;
                # the clock is bumped once per removed entry here, i.e. only
                # when the queue actually changes -- confirm.
                while fcm.filename in DELETE_QUEUE:
                    DELETE_QUEUE.remove(fcm.filename)
                    DELETE_QUEUE_CLOCK = DELETE_QUEUE_CLOCK + 1
                fcm.receive_file(store_c)
            print(f"DATA STORED SUCCESSFULLY!")
        except Exception as e:
            print(e)
            print(colored(f"DATA COULD NOT BE STORED", constants.FAILURE))


def content_requests_handler():
    """Serve file requests arriving on the request socket.

    Reads the requested name, sends the status message, and sends the file
    when ``checkExists(DELETE_QUEUE)`` reports it as present.
    """
    fcm = FileContentMessage()
    while True:
        try:
            request_c, request_addr = request_s.accept()
            # Close the connection on every path, including errors.
            with request_c:
                fcm.receive_name(request_c)
                print(colored(f"NAME OF FILE REQUESTED {fcm.filename}", constants.DEBUG))
                found = fcm.checkExists(DELETE_QUEUE)
                fcm.send_status(request_c)
                if found:
                    fcm.send_file(request_c)
                    print(colored(f"REQUESTED FILE DELIVERED SUCCESSFULLY", constants.SUCCESS))
                else:
                    print(colored(f"REQUESTED FILE NOT FOUND", constants.FAILURE))
        except Exception as e:
            # Report the cause, as the other handlers in this file do.
            print(e)
            print(colored(f"CONNECTION ERROR. PLEASE TRY AGAIN..", constants.FAILURE))


def delete():
    """Accept delete requests and queue the named file for deletion.

    Each request appends the received name to DELETE_QUEUE and bumps
    DELETE_QUEUE_CLOCK.
    """
    global DELETE_QUEUE_CLOCK
    fcm = FileContentMessage()
    while True:
        try:
            delete_c, delete_addr = delete_s.accept()
            # The original never closed this connection, leaking one
            # descriptor per request.
            with delete_c:
                fcm.receive_name(delete_c)
            print(colored(f"NAME OF FILE DELETED: {fcm.filename}", constants.SUCCESS))
            DELETE_QUEUE.append(fcm.filename)
            DELETE_QUEUE_CLOCK = DELETE_QUEUE_CLOCK + 1
        except Exception as e:
            print(e)
            print(colored(f"FILE COULD NOT BE DELETED", constants.FAILURE))


def cleaner():
    """Periodically remove files named in DELETE_QUEUE from the working directory.

    The original made a single pass at start-up and then returned, so names
    queued later were never removed; this version repeats the pass every
    ``constants.TIMEOUT_PERIOD`` seconds.
    """
    while True:
        present = set(os.listdir())
        # Iterate over a snapshot: other threads edit the queue concurrently.
        for name in list(DELETE_QUEUE):
            if name in present:
                try:
                    os.remove(name)
                except OSError as e:
                    # Keep cleaning the remaining entries if one removal fails.
                    print(e)
        time.sleep(constants.TIMEOUT_PERIOD)


if __name__ == '__main__':
    # One worker thread per service, started in the original order.
    threads = []
    for target in (store_content, content_requests_handler, synchronise, delete, cleaner):
        worker = Thread(target=target)
        threads.append(worker)
        worker.start()
    for t in threads:
        ...  # the source listing is truncated here; presumably t.join() -- TODO confirm
lunchtime_2.py
Source: lunchtime_2.py
# lunchtime_2.py
# Samsung SW Expert Academy
# https://swexpertacademy.com/main/code/problem/problemDetail.do?contestProbId=AV5-BEE6AK0DFAVl


def calc(stair_list, stair):
    """Return the number of ticks until everyone in *stair_list* is through *stair*.

    stair_list holds, per person, the number of ticks before that person
    reaches the stair. stair is a sequence whose third element (``stair[2]``)
    is the number of ticks one person occupies the stair. At most three
    people occupy the stair at once; the others wait.

    The input list is not modified (the original emptied it in place).
    """
    count, d_count = 0, 0  # elapsed ticks, people waiting at the entrance
    walking = list(stair_list)  # work on a copy so the caller's list survives
    delete_queue = []  # remaining ticks for each person currently on the stair
    while walking or delete_queue or d_count:
        # Admit waiting people while fewer than three are on the stair.
        while d_count and len(delete_queue) < 3:
            delete_queue.append(stair[2])
            d_count -= 1

        # Advance everyone on the stair by one tick and drop those who finished.
        delete_queue = [left - 1 for left in delete_queue if left - 1 > 0]

        # Advance everyone still walking; those who arrive start waiting and
        # can be admitted from the next tick on.
        d_count += sum(1 for left in walking if left - 1 <= 0)
        walking = [left - 1 for left in walking if left - 1 > 0]

        count += 1
    return count


def dfs(idx):
    """Try both stairs for person *idx* onwards and record the best result.

    Reads the module-level ``Num``, ``check``, ``Peoples`` and ``Stairs`` and
    lowers the module-level ``min_count`` whenever a complete assignment
    yields a smaller value. ``check[i]`` True sends person i to ``Stairs[0]``
    (using ``Peoples[i][0]``); False sends them to ``Stairs[1]`` (using
    ``Peoples[i][1]``).
    """
    global min_count
    if idx == Num:
        first = sorted(Peoples[i][0] for i in range(Num) if check[i])
        second = sorted(Peoples[i][1] for i in range(Num) if not check[i])
        count = max(calc(first, Stairs[0]), calc(second, Stairs[1]))
        min_count = min(count, min_count)
        return
    for choice in (False, True):
        check[idx] = choice
        dfs(idx + 1)


# The driver reads from stdin, so it only runs when the file is executed as a
# script. Its variables stay at module level because dfs() reads them as globals.
if __name__ == "__main__":
    T = int(input())
    for t in range(1, T + 1):
        N = int(input())
        map_list = [list(map(int, input().split())) for _ in range(N)]
        Peoples, Stairs = [], []
        Num, min_count = 0, 987654321
        for i in range(N):
            for j in range(N):
                temp_num = map_list[i][j]
                if temp_num == 1:
                    # A cell holding 1 is a person.
                    Num += 1
                    Peoples.append([i, j])
                elif temp_num:
                    # Any other non-zero cell is a stair; keep its value as
                    # the third element.
                    Stairs.append([i, j, temp_num])
        # Replace each person's coordinates with the Manhattan distances to
        # the first and second stair.
        for person in Peoples:
            row, col = person
            person[0] = abs(row - Stairs[0][0]) + abs(col - Stairs[0][1])
            person[1] = abs(row - Stairs[1][0]) + abs(col - Stairs[1][1])
        check = [False] * Num
        dfs(0)
        # ... (the rest of the snippet is truncated in the source listing)
Check out the latest blogs from LambdaTest on this topic:
It is no longer news that cross-browser testing is imperative for a good application user experience. A solid knowledge of popular, highly acclaimed testing frameworks goes a long way when developing a new app, and it matters even more if you are a full-stack developer or an expert programmer.
QA testers have a unique role and responsibility to serve the customer. Serving the customer in software testing means protecting customers from application defects, failures, and perceived failures from missing or misunderstood requirements. Testing for known requirements based on documentation or discussion is the core of the testing profession. One unique way QA testers can both differentiate themselves and be innovative occurs when senseshaping is used to improve the application user experience.
Having a good web design can empower business and make your brand stand out. According to a survey by Top Design Firms, 50% of users believe that website design is crucial to an organization’s overall brand. Therefore, businesses should prioritize website design to meet customer expectations and build their brand identity. Your website is the face of your business, so it’s important that it’s updated regularly as per the current web design trends.
Enterprise resource planning (ERP) is a form of business process management software—typically a suite of integrated applications—that assists a company in managing its operations, interpreting data, and automating various back-office processes. The introduction of a new ERP system is analogous to the introduction of a new product into the market. If the product is not handled appropriately, it will fail, resulting in significant losses for the business. Most significantly, the employees’ time, effort, and morale would suffer as a result of the procedure.
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!