Best Python code snippet using localstack_python
main.py
Source:main.py
...102 except Exception as err:103 # Publish error to IPC and MQTT on default error topic. 104 protocol = 'ipc_mqtt'105 err_msg = 'Exception in main process loop: {}'.format(err)106 self.publish_error(protocol, err_msg)107 108 finally:109 time.sleep(10)110 def default_message_handler(self, protocol, topic, message_id, status, route, message):111 '''112 This default message handler function is a route of last resort to handle 113 PubSub messages received by the SDK with a route value that does not 114 match any functions in the registered message_handler classes.115 In this example, we generate and publish an error message to both IPC and MQTT.116 You could instead handle this locally or just sliently discard messages here depending 117 on the application.118 '''119 # Publish error to IPC and MQTT on default error topic, will log locally as an error as well. 120 err_msg = 'Received message to unknown route / message handler: {} - message: {}'.format(route, message)121 self.publish_error(protocol, err_msg)122if __name__ == "__main__":123 try:124 # Parse config from component recipe into sys.argv[1] 125 ggv2_component_config = sys.argv[1]126 ggv2_component_config = json.loads(ggv2_component_config)127 128 log.info('GG PubSub SDK Config: {}'.format(ggv2_component_config))129 # Create the component class with the parsed Greengrass recipe config.130 ggv2_component = MyAwsGreengrassV2Component(ggv2_component_config)131 132 # Start the main process loop to hold up the process.133 ggv2_component.service_loop()134 except Exception as err:135 log.error('Exception occurred initialising component. ERROR MESSAGE: {}'.format(err))
test_gcs_pubsub.py
Source:test_gcs_pubsub.py
...19 subscriber.subscribe()20 publisher = GcsPublisher(address=gcs_server_addr)21 err1 = ErrorTableData(error_message="test error message 1")22 err2 = ErrorTableData(error_message="test error message 2")23 publisher.publish_error(b"aaa_id", err1)24 publisher.publish_error(b"bbb_id", err2)25 assert subscriber.poll() == (b"aaa_id", err1)26 assert subscriber.poll() == (b"bbb_id", err2)27 subscriber.close()28@pytest.mark.asyncio29@pytest.mark.parametrize(30 "ray_start_regular", [{31 "_system_config": {32 "gcs_grpc_based_pubsub": True33 }34 }],35 indirect=True)36async def test_aio_publish_and_subscribe_error_info(ray_start_regular):37 address_info = ray_start_regular38 gcs_server_addr = address_info["gcs_address"]39 subscriber = GcsAioErrorSubscriber(address=gcs_server_addr)40 await subscriber.subscribe()41 publisher = GcsAioPublisher(address=gcs_server_addr)42 err1 = ErrorTableData(error_message="test error message 1")43 err2 = ErrorTableData(error_message="test error message 2")44 await publisher.publish_error(b"aaa_id", err1)45 await publisher.publish_error(b"bbb_id", err2)46 assert await subscriber.poll() == (b"aaa_id", err1)47 assert await subscriber.poll() == (b"bbb_id", err2)48 await subscriber.close()49@pytest.mark.parametrize(50 "ray_start_regular", [{51 "_system_config": {52 "gcs_grpc_based_pubsub": True53 }54 }],55 indirect=True)56def test_publish_and_subscribe_logs(ray_start_regular):57 address_info = ray_start_regular58 gcs_server_addr = address_info["gcs_address"]59 subscriber = GcsLogSubscriber(address=gcs_server_addr)60 subscriber.subscribe()61 publisher = GcsPublisher(address=gcs_server_addr)62 log_batch = {63 "ip": "127.0.0.1",64 "pid": 1234,65 "job": "0001",66 "is_err": False,67 "lines": ["line 1", "line 2"],68 "actor_name": "test actor",69 "task_name": "test task",70 }71 publisher.publish_logs(log_batch)72 # PID is treated as string.73 log_batch["pid"] = "1234"74 assert subscriber.poll() == log_batch75 
subscriber.close()76@pytest.mark.asyncio77@pytest.mark.parametrize(78 "ray_start_regular", [{79 "_system_config": {80 "gcs_grpc_based_pubsub": True81 }82 }],83 indirect=True)84async def test_aio_publish_and_subscribe_logs(ray_start_regular):85 address_info = ray_start_regular86 gcs_server_addr = address_info["gcs_address"]87 subscriber = GcsAioLogSubscriber(address=gcs_server_addr)88 await subscriber.subscribe()89 publisher = GcsAioPublisher(address=gcs_server_addr)90 log_batch = {91 "ip": "127.0.0.1",92 "pid": "gcs",93 "job": "0001",94 "is_err": False,95 "lines": ["line 1", "line 2"],96 "actor_name": "test actor",97 "task_name": "test task",98 }99 await publisher.publish_logs(log_batch)100 assert await subscriber.poll() == log_batch101 await subscriber.close()102@pytest.mark.parametrize(103 "ray_start_regular", [{104 "_system_config": {105 "gcs_grpc_based_pubsub": True106 }107 }],108 indirect=True)109def test_publish_and_subscribe_function_keys(ray_start_regular):110 address_info = ray_start_regular111 gcs_server_addr = address_info["gcs_address"]112 subscriber = GcsFunctionKeySubscriber(address=gcs_server_addr)113 subscriber.subscribe()114 publisher = GcsPublisher(address=gcs_server_addr)115 publisher.publish_function_key(b"111")116 publisher.publish_function_key(b"222")117 assert subscriber.poll() == b"111"118 assert subscriber.poll() == b"222"119 subscriber.close()120@pytest.mark.asyncio121@pytest.mark.parametrize(122 "ray_start_regular", [{123 "_system_config": {124 "gcs_grpc_based_pubsub": True125 }126 }],127 indirect=True)128async def test_aio_publish_and_subscribe_resource_usage(ray_start_regular):129 address_info = ray_start_regular130 gcs_server_addr = address_info["gcs_address"]131 subscriber = GcsAioResourceUsageSubscriber(address=gcs_server_addr)132 await subscriber.subscribe()133 publisher = GcsAioPublisher(address=gcs_server_addr)134 await publisher.publish_resource_usage("aaa_id", "{\"cpu\": 1}")135 await publisher.publish_resource_usage("bbb_id", 
"{\"cpu\": 2}")136 assert await subscriber.poll() == ("aaa_id", "{\"cpu\": 1}")137 assert await subscriber.poll() == ("bbb_id", "{\"cpu\": 2}")138 await subscriber.close()139@pytest.mark.parametrize(140 "ray_start_regular", [{141 "_system_config": {142 "gcs_grpc_based_pubsub": True143 }144 }],145 indirect=True)146def test_two_subscribers(ray_start_regular):147 """Tests concurrently subscribing to two channels work."""148 address_info = ray_start_regular149 gcs_server_addr = address_info["gcs_address"]150 num_messages = 100151 errors = []152 error_subscriber = GcsErrorSubscriber(address=gcs_server_addr)153 # Make sure subscription is registered before publishing starts.154 error_subscriber.subscribe()155 def receive_errors():156 while len(errors) < num_messages:157 _, msg = error_subscriber.poll()158 errors.append(msg)159 t1 = threading.Thread(target=receive_errors)160 t1.start()161 logs = []162 log_subscriber = GcsLogSubscriber(address=gcs_server_addr)163 # Make sure subscription is registered before publishing starts.164 log_subscriber.subscribe()165 def receive_logs():166 while len(logs) < num_messages:167 log_batch = log_subscriber.poll()168 logs.append(log_batch)169 t2 = threading.Thread(target=receive_logs)170 t2.start()171 publisher = GcsPublisher(address=gcs_server_addr)172 for i in range(0, num_messages):173 publisher.publish_error(174 b"msg_id", ErrorTableData(error_message=f"error {i}"))175 publisher.publish_logs({176 "ip": "127.0.0.1",177 "pid": "gcs",178 "job": "0001",179 "is_err": False,180 "lines": [f"log {i}"],181 "actor_name": "test actor",182 "task_name": "test task",183 })184 t1.join(timeout=10)185 assert len(errors) == num_messages, str(errors)186 assert not t1.is_alive(), str(errors)187 t2.join(timeout=10)...
CtrlNode.py
Source:CtrlNode.py
...32##33def start_scan():34 """ Starts the search for the cup """35 if current_state not in ("IDLE", "NO_CUP"):36 publish_error("Cannot initiate room scan from state '%s'" % current_state)37 else:38 change_state("ROOM_SCAN")39 room_scanner_publisher.publish(RoomScannerCommand(None, "start", True))40def start_approach(room_scanner_result):41 """42 Start approaching the cup. Above a certain distance, we do this by going half the distance and re-adjusting43 the direction. Below that distance, we go straight.44 :return: None45 """46 global walk_publisher47 angle = room_scanner_result.angle48 distance = room_scanner_result.distance49 if distance > STRAIGHT_WALK_MIN_DIST:50 distance /= 251 change_state("APPROACHING_CUP")52 else:53 change_state("ARRIVING_AT_CUP")54 walk_publisher.publish(WalkCommand(None, angle, distance))55def start_bearing_readjustment():56 """57 Finished a leg on our way to the cup. Now re-adjust the robot's bearing to still have the cup58 straight ahead.59 """60 change_state("READJUSTING_BEARING")61 room_scanner_publisher.publish(RoomScannerCommand(None, "start", False))62def stop():63 """64 Stop the robot from doing the task, in an orderly manner.65 """66 global current_state67 if (current_state == "ROOM_SCAN"):68 room_scanner_publisher.publish(RoomScannerCommand(None, "stop", False))69 # TODO send stop message to the proper node, for other states.70 change_state("IDLE")71def abort():72 """73 Admit that while we did recognize a cup, we couldn't get to it.74 """75 stop()76 change_state("MISSION_FAIL")77##78# Callbacks79##80def scan_callback(room_scanner_result):81 if current_state == "ROOM_SCAN":82 if room_scanner_result.cost > COST_THRESHOLD:83 change_state("NO_CUP")84 else:85 start_approach(room_scanner_result)86 elif current_state == "READJUSTING_BEARING":87 if room_scanner_result.cost > COST_THRESHOLD:88 publish_error("Lost track of cup")89 abort()90 else:91 start_approach(room_scanner_result)92def walk_callback(walk_result):93 if current_state 
== "APPROACHING_CUP":94 if walk_result.result:95 start_bearing_readjustment()96 else:97 publish_error("Walk module failed to advance.")98 abort()99 elif current_state == "ARRIVING_AT_CUP":100 change_state("AT_CUP") # Success!101 else:102 publish_error("Got a walk result while not walking. Weird. Ignoring.")103def command_callback(command_msg):104 """105 Got a command from the Cli106 """107 global room_scanner_publisher108 command = command_msg.command109 rospy.loginfo("command %s", command)110 if command == "scan":111 start_scan()112 elif command == "stop":113 stop()114 else:115 publish_error("bad command: '%s'" % command)116##117# Internal methods118##119def change_state(new_state):120 """121 Changes the state of the node, and publishes it over the topic.122 :param new_state: new stat the robot is in123 :return: None124 """125 global current_state126 current_state = new_state127 rospy.loginfo("current state: %s" % current_state)128 status_publisher.publish(CtrlStatus(None, "state_change", current_state))129def publish_error(message):130 """131 publishes an error message to the status topic132 """133 status_publisher.publish(CtrlStatus(None, "error", message))134if __name__ == '__main__':135 try:136 rospy.loginfo("Starting the controller node")137 rospy.init_node("CupinatorCtrl", anonymous=False)138 status_publisher = rospy.Publisher("/cupinator/ctrl/status", CtrlStatus, queue_size=10)139 rospy.Subscriber("/cupinator/ctrl/command", CtrlCommand, command_callback)140 walk_publisher = rospy.Publisher("/cupinator/walk/command", WalkCommand, queue_size=5)141 rospy.Subscriber("/cupinator/room_scanner/result", RoomScannerResult, scan_callback)142 room_scanner_publisher = rospy.Publisher("/cupinator/room_scanner/command", RoomScannerCommand, queue_size=5)143 rospy.Subscriber("/cupinator/walk/result", WalkResult, walk_callback)...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!