Best Python code snippet using localstack_python
main.py
Source: main.py
...
            except Exception as err:
                # Publish error to IPC and MQTT on default error topic.
                protocol = 'ipc_mqtt'
                err_msg = 'Exception in main process loop: {}'.format(err)
                self.publish_error(protocol, err_msg)

            finally:
                time.sleep(10)

    def default_message_handler(self, protocol, topic, message_id, status, route, message):
        '''
        This default message handler function is a route of last resort to handle
        PubSub messages received by the SDK with a route value that does not
        match any functions in the registered message_handler classes.

        In this example, we generate and publish an error message to both IPC and MQTT.
        You could instead handle this locally or just silently discard messages here depending
        on the application.
        '''
        # Publish error to IPC and MQTT on default error topic, will log locally as an error as well.
        err_msg = 'Received message to unknown route / message handler: {} - message: {}'.format(route, message)
        self.publish_error(protocol, err_msg)


if __name__ == "__main__":
    try:
        # Parse config from component recipe into sys.argv[1]
        ggv2_component_config = sys.argv[1]
        ggv2_component_config = json.loads(ggv2_component_config)

        log.info('GG PubSub SDK Config: {}'.format(ggv2_component_config))
        # Create the component class with the parsed Greengrass recipe config.
        ggv2_component = MyAwsGreengrassV2Component(ggv2_component_config)

        # Start the main process loop to hold up the process.
        ggv2_component.service_loop()
    except Exception as err:
        log.error('Exception occurred initialising component. ERROR MESSAGE: {}'.format(err))
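The "route of last resort" idea in the docstring above can be shown in isolation. The following is a minimal sketch, not the Greengrass PubSub SDK's actual dispatch code: the `Router` class, its `register` and `dispatch` methods, and the `errors` list are hypothetical names introduced only for illustration. Only the fallback behaviour (an unknown route ends up in `default_message_handler`) mirrors the snippet.

```python
from typing import Callable, Dict, List


class Router:
    """Hypothetical dispatcher; NOT the SDK's real API."""

    def __init__(self) -> None:
        # Maps a route name to the function that handles it.
        self.handlers: Dict[str, Callable[[str], None]] = {}
        # Stands in for publish_error(); the real component publishes to IPC/MQTT.
        self.errors: List[str] = []

    def register(self, route: str, handler: Callable[[str], None]) -> None:
        self.handlers[route] = handler

    def default_message_handler(self, route: str, message: str) -> None:
        # Route of last resort: record an error instead of dropping the message.
        self.errors.append(
            'Received message to unknown route / message handler: {} - message: {}'.format(route, message)
        )

    def dispatch(self, route: str, message: str) -> None:
        handler = self.handlers.get(route)
        if handler is None:
            self.default_message_handler(route, message)
        else:
            handler(message)


router = Router()
seen: List[str] = []
router.register('health', seen.append)
router.dispatch('health', 'ok')       # handled by the registered function
router.dispatch('unknown', 'hello')   # falls through to the default handler
```

Depending on the application, the fallback could log locally or discard the message instead of recording an error, as the docstring notes.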
test_gcs_pubsub.py
Source: test_gcs_pubsub.py
...
    subscriber.subscribe()
    publisher = GcsPublisher(address=gcs_server_addr)
    err1 = ErrorTableData(error_message="test error message 1")
    err2 = ErrorTableData(error_message="test error message 2")
    publisher.publish_error(b"aaa_id", err1)
    publisher.publish_error(b"bbb_id", err2)
    assert subscriber.poll() == (b"aaa_id", err1)
    assert subscriber.poll() == (b"bbb_id", err2)
    subscriber.close()


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "ray_start_regular", [{
        "_system_config": {
            "gcs_grpc_based_pubsub": True
        }
    }],
    indirect=True)
async def test_aio_publish_and_subscribe_error_info(ray_start_regular):
    address_info = ray_start_regular
    gcs_server_addr = address_info["gcs_address"]
    subscriber = GcsAioErrorSubscriber(address=gcs_server_addr)
    await subscriber.subscribe()
    publisher = GcsAioPublisher(address=gcs_server_addr)
    err1 = ErrorTableData(error_message="test error message 1")
    err2 = ErrorTableData(error_message="test error message 2")
    await publisher.publish_error(b"aaa_id", err1)
    await publisher.publish_error(b"bbb_id", err2)
    assert await subscriber.poll() == (b"aaa_id", err1)
    assert await subscriber.poll() == (b"bbb_id", err2)
    await subscriber.close()


@pytest.mark.parametrize(
    "ray_start_regular", [{
        "_system_config": {
            "gcs_grpc_based_pubsub": True
        }
    }],
    indirect=True)
def test_publish_and_subscribe_logs(ray_start_regular):
    address_info = ray_start_regular
    gcs_server_addr = address_info["gcs_address"]
    subscriber = GcsLogSubscriber(address=gcs_server_addr)
    subscriber.subscribe()
    publisher = GcsPublisher(address=gcs_server_addr)
    log_batch = {
        "ip": "127.0.0.1",
        "pid": 1234,
        "job": "0001",
        "is_err": False,
        "lines": ["line 1", "line 2"],
        "actor_name": "test actor",
        "task_name": "test task",
    }
    publisher.publish_logs(log_batch)
    # PID is treated as string.
    log_batch["pid"] = "1234"
    assert subscriber.poll() == log_batch
    subscriber.close()


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "ray_start_regular", [{
        "_system_config": {
            "gcs_grpc_based_pubsub": True
        }
    }],
    indirect=True)
async def test_aio_publish_and_subscribe_logs(ray_start_regular):
    address_info = ray_start_regular
    gcs_server_addr = address_info["gcs_address"]
    subscriber = GcsAioLogSubscriber(address=gcs_server_addr)
    await subscriber.subscribe()
    publisher = GcsAioPublisher(address=gcs_server_addr)
    log_batch = {
        "ip": "127.0.0.1",
        "pid": "gcs",
        "job": "0001",
        "is_err": False,
        "lines": ["line 1", "line 2"],
        "actor_name": "test actor",
        "task_name": "test task",
    }
    await publisher.publish_logs(log_batch)
    assert await subscriber.poll() == log_batch
    await subscriber.close()


@pytest.mark.parametrize(
    "ray_start_regular", [{
        "_system_config": {
            "gcs_grpc_based_pubsub": True
        }
    }],
    indirect=True)
def test_publish_and_subscribe_function_keys(ray_start_regular):
    address_info = ray_start_regular
    gcs_server_addr = address_info["gcs_address"]
    subscriber = GcsFunctionKeySubscriber(address=gcs_server_addr)
    subscriber.subscribe()
    publisher = GcsPublisher(address=gcs_server_addr)
    publisher.publish_function_key(b"111")
    publisher.publish_function_key(b"222")
    assert subscriber.poll() == b"111"
    assert subscriber.poll() == b"222"
    subscriber.close()


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "ray_start_regular", [{
        "_system_config": {
            "gcs_grpc_based_pubsub": True
        }
    }],
    indirect=True)
async def test_aio_publish_and_subscribe_resource_usage(ray_start_regular):
    address_info = ray_start_regular
    gcs_server_addr = address_info["gcs_address"]
    subscriber = GcsAioResourceUsageSubscriber(address=gcs_server_addr)
    await subscriber.subscribe()
    publisher = GcsAioPublisher(address=gcs_server_addr)
    await publisher.publish_resource_usage("aaa_id", "{\"cpu\": 1}")
    await publisher.publish_resource_usage("bbb_id", "{\"cpu\": 2}")
    assert await subscriber.poll() == ("aaa_id", "{\"cpu\": 1}")
    assert await subscriber.poll() == ("bbb_id", "{\"cpu\": 2}")
    await subscriber.close()


@pytest.mark.parametrize(
    "ray_start_regular", [{
        "_system_config": {
            "gcs_grpc_based_pubsub": True
        }
    }],
    indirect=True)
def test_two_subscribers(ray_start_regular):
    """Tests concurrently subscribing to two channels work."""
    address_info = ray_start_regular
    gcs_server_addr = address_info["gcs_address"]
    num_messages = 100

    errors = []
    error_subscriber = GcsErrorSubscriber(address=gcs_server_addr)
    # Make sure subscription is registered before publishing starts.
    error_subscriber.subscribe()

    def receive_errors():
        while len(errors) < num_messages:
            _, msg = error_subscriber.poll()
            errors.append(msg)

    t1 = threading.Thread(target=receive_errors)
    t1.start()

    logs = []
    log_subscriber = GcsLogSubscriber(address=gcs_server_addr)
    # Make sure subscription is registered before publishing starts.
    log_subscriber.subscribe()

    def receive_logs():
        while len(logs) < num_messages:
            log_batch = log_subscriber.poll()
            logs.append(log_batch)

    t2 = threading.Thread(target=receive_logs)
    t2.start()

    publisher = GcsPublisher(address=gcs_server_addr)
    for i in range(0, num_messages):
        publisher.publish_error(
            b"msg_id", ErrorTableData(error_message=f"error {i}"))
        publisher.publish_logs({
            "ip": "127.0.0.1",
            "pid": "gcs",
            "job": "0001",
            "is_err": False,
            "lines": [f"log {i}"],
            "actor_name": "test actor",
            "task_name": "test task",
        })

    t1.join(timeout=10)
    assert len(errors) == num_messages, str(errors)
    assert not t1.is_alive(), str(errors)

    t2.join(timeout=10)
...
CtrlNode.py
Source: CtrlNode.py
...
##


def start_scan():
    """ Starts the search for the cup """
    if current_state not in ("IDLE", "NO_CUP"):
        publish_error("Cannot initiate room scan from state '%s'" % current_state)
    else:
        change_state("ROOM_SCAN")
        room_scanner_publisher.publish(RoomScannerCommand(None, "start", True))


def start_approach(room_scanner_result):
    """
    Start approaching the cup. Above a certain distance, we do this by going half the distance and re-adjusting
    the direction. Below that distance, we go straight.
    :return: None
    """
    global walk_publisher
    angle = room_scanner_result.angle
    distance = room_scanner_result.distance
    if distance > STRAIGHT_WALK_MIN_DIST:
        distance /= 2
        change_state("APPROACHING_CUP")
    else:
        change_state("ARRIVING_AT_CUP")
    walk_publisher.publish(WalkCommand(None, angle, distance))


def start_bearing_readjustment():
    """
    Finished a leg on our way to the cup. Now re-adjust the robot's bearing to still have the cup
    straight ahead.
    """
    change_state("READJUSTING_BEARING")
    room_scanner_publisher.publish(RoomScannerCommand(None, "start", False))


def stop():
    """
    Stop the robot from doing the task, in an orderly manner.
    """
    global current_state
    if (current_state == "ROOM_SCAN"):
        room_scanner_publisher.publish(RoomScannerCommand(None, "stop", False))
    # TODO send stop message to the proper node, for other states.
    change_state("IDLE")


def abort():
    """
    Admit that while we did recognize a cup, we couldn't get to it.
    """
    stop()
    change_state("MISSION_FAIL")


##
# Callbacks
##


def scan_callback(room_scanner_result):
    if current_state == "ROOM_SCAN":
        if room_scanner_result.cost > COST_THRESHOLD:
            change_state("NO_CUP")
        else:
            start_approach(room_scanner_result)
    elif current_state == "READJUSTING_BEARING":
        if room_scanner_result.cost > COST_THRESHOLD:
            publish_error("Lost track of cup")
            abort()
        else:
            start_approach(room_scanner_result)


def walk_callback(walk_result):
    if current_state == "APPROACHING_CUP":
        if walk_result.result:
            start_bearing_readjustment()
        else:
            publish_error("Walk module failed to advance.")
            abort()
    elif current_state == "ARRIVING_AT_CUP":
        change_state("AT_CUP")  # Success!
    else:
        publish_error("Got a walk result while not walking. Weird. Ignoring.")


def command_callback(command_msg):
    """
    Got a command from the Cli
    """
    global room_scanner_publisher
    command = command_msg.command
    rospy.loginfo("command %s", command)
    if command == "scan":
        start_scan()
    elif command == "stop":
        stop()
    else:
        publish_error("bad command: '%s'" % command)


##
# Internal methods
##


def change_state(new_state):
    """
    Changes the state of the node, and publishes it over the topic.
    :param new_state: new state the robot is in
    :return: None
    """
    global current_state
    current_state = new_state
    rospy.loginfo("current state: %s" % current_state)
    status_publisher.publish(CtrlStatus(None, "state_change", current_state))


def publish_error(message):
    """
    publishes an error message to the status topic
    """
    status_publisher.publish(CtrlStatus(None, "error", message))


if __name__ == '__main__':
    try:
        rospy.loginfo("Starting the controller node")
        rospy.init_node("CupinatorCtrl", anonymous=False)
        status_publisher = rospy.Publisher("/cupinator/ctrl/status", CtrlStatus, queue_size=10)
        rospy.Subscriber("/cupinator/ctrl/command", CtrlCommand, command_callback)
        walk_publisher = rospy.Publisher("/cupinator/walk/command", WalkCommand, queue_size=5)
        rospy.Subscriber("/cupinator/room_scanner/result", RoomScannerResult, scan_callback)
        room_scanner_publisher = rospy.Publisher("/cupinator/room_scanner/command", RoomScannerCommand, queue_size=5)
        rospy.Subscriber("/cupinator/walk/result", WalkResult, walk_callback)
...
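The decision made in `scan_callback` and `start_approach` can be read as a pure function of the current state and the scan result, which makes it testable without ROS. The sketch below is an illustration only: `plan_after_scan` is a hypothetical helper that does not appear in CtrlNode.py, and because the values of `COST_THRESHOLD` and `STRAIGHT_WALK_MIN_DIST` are not shown in the excerpt, they are passed in as parameters rather than guessed.

```python
from typing import Optional, Tuple


def plan_after_scan(current_state: str, cost: float, distance: float,
                    cost_threshold: float,
                    straight_walk_min_dist: float) -> Tuple[str, Optional[float]]:
    """Return (next_state, walk_distance) for a scan result.

    walk_distance is None when no walk command would be sent.
    Hypothetical helper; thresholds are parameters because the
    original constants are not visible in the excerpt.
    """
    if current_state not in ("ROOM_SCAN", "READJUSTING_BEARING"):
        # scan_callback ignores scan results in every other state.
        return current_state, None
    if cost > cost_threshold:
        # A failed first scan means no cup; a failed re-scan aborts the mission.
        if current_state == "ROOM_SCAN":
            return "NO_CUP", None
        return "MISSION_FAIL", None
    if distance > straight_walk_min_dist:
        # Far away: walk half the distance, then re-adjust the bearing.
        return "APPROACHING_CUP", distance / 2
    # Close enough: walk the full remaining distance.
    return "ARRIVING_AT_CUP", distance


# Example thresholds chosen only for this demonstration.
far = plan_after_scan("ROOM_SCAN", 0.1, 4.0, 0.5, 1.0)
near = plan_after_scan("READJUSTING_BEARING", 0.1, 0.8, 0.5, 1.0)
lost = plan_after_scan("READJUSTING_BEARING", 0.9, 0.8, 0.5, 1.0)
```

Keeping the transition logic separate from the publishers is one possible design; the original node instead mutates a global `current_state` and publishes from inside each function.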