How to use _create_message_attributes method in localstack

Best Python code snippet using localstack_python

cloud_console_client.py

Source: cloud_console_client.py Github

copy

Full Screen

...111 message['data'] = data112 if msg_attributes:113 message['attributes'] = msg_attributes114 return message115 def _create_message_attributes(self, message_type_enum):116 """Creates a cloud pubsub notification message attribute map.117 Fills in the version, moblab mac address, and moblab id information118 as attributes.119 @param message_type_enum The message type enum.120 @returns: A pubsub messsage attribute map.121 """122 msg_attributes = {}123 msg_attributes[_get_attribute_name(cpcon.ATTR_MESSAGE_TYPE)] = (124 _get_message_type_name(message_type_enum))125 msg_attributes[_get_attribute_name(cpcon.ATTR_MESSAGE_VERSION)] = (126 CURRENT_MESSAGE_VERSION)127 msg_attributes[_get_attribute_name(cpcon.ATTR_MOBLAB_MAC_ADDRESS)] = (128 utils.get_moblab_serial_number())129 msg_attributes[_get_attribute_name(cpcon.ATTR_MOBLAB_ID)] = (130 utils.get_moblab_id())131 return msg_attributes132 def _create_test_job_offloaded_message(self, gcs_uri):133 """Construct a test result notification.134 TODO(ntang): switch LEGACY to new message format.135 @param gcs_uri: The test result Google Cloud Storage URI.136 @returns The notification message.137 """138 data = base64.b64encode(LEGACY_TEST_OFFLOAD_MESSAGE)139 msg_attributes = {}140 msg_attributes[LEGACY_ATTR_VERSION] = CURRENT_MESSAGE_VERSION141 msg_attributes[LEGACY_ATTR_MOBLAB_MAC] = (142 utils.get_moblab_serial_number())143 msg_attributes[LEGACY_ATTR_MOBLAB_ID] = utils.get_moblab_id()144 msg_attributes[LEGACY_ATTR_GCS_URI] = gcs_uri145 return self._create_message(data, msg_attributes)146 def send_test_job_offloaded_message(self, gcs_uri):147 """Notify the cloud console a test job is offloaded.148 @param gcs_uri: The test result Google Cloud Storage URI.149 @returns True if the notification is successfully sent.150 Otherwise, False.151 """152 logging.info('Notification on gcs_uri %s', gcs_uri)153 message = self._create_test_job_offloaded_message(gcs_uri)154 return self._publish_notification(message)155 def _publish_notification(self, message):156 msg_ids = self._pubsub_client.publish_notifications(157 self._pubsub_topic, [message])158 if msg_ids:159 logging.debug('Successfully sent out a notification')160 return True161 logging.warning('Failed to send notification %s', str(message))162 return False163 def send_heartbeat(self):164 """Sends a heartbeat.165 @returns True if the heartbeat notification is successfully sent.166 Otherwise, False.167 """168 logging.info('Sending a heartbeat')169 event = cpcon.Heartbeat()170 # Don't sent local timestamp for now.171 data = event.SerializeToString()172 try:173 attributes = self._create_message_attributes(174 cpcon.MSG_MOBLAB_HEARTBEAT)175 message = self._create_message(data, attributes)176 except ValueError:177 logging.exception('Failed to create message.')178 return False179 return self._publish_notification(message)180 def send_event(self, event_type=None, event_data=None):181 """Sends an event notification to the remote console.182 @param event_type: The event type that is defined in the protobuffer183 file 'cloud_console.proto'.184 @param event_data: The event data.185 @returns True if the notification is successfully sent.186 Otherwise, False.187 """188 logging.info('Send an event.')189 if not event_type:190 logging.info('Failed to send event without a type.')191 return False192 event = cpcon.RemoteEventMessage()193 if event_data:194 event.data = event_data195 else:196 event.data = ''197 event.type = event_type198 data = event.SerializeToString()199 try:200 attributes = self._create_message_attributes(201 cpcon.MSG_MOBLAB_REMOTE_EVENT)202 message = self._create_message(data, attributes)203 except ValueError:204 logging.exception('Failed to create message.')205 return False...

Full Screen

Full Screen

Blogs

Check out the latest blogs from LambdaTest on this topic:

13 Best Java Testing Frameworks For 2023

The fact is not alien to us anymore that cross browser testing is imperative to enhance your application’s user experience. Enhanced knowledge of popular and highly acclaimed testing frameworks goes a long way in developing a new app. It holds more significance if you are a full-stack developer or expert programmer.

QA Innovation – Using the senseshaping concept to discover customer needs

QA Innovation - Using the senseshaping concept to discover customer needsQA testers have a unique role and responsibility to serve the customer. Serving the customer in software testing means protecting customers from application defects, failures, and perceived failures from missing or misunderstood requirements. Testing for known requirements based on documentation or discussion is the core of the testing profession. One unique way QA testers can both differentiate themselves and be innovative occurs when senseshaping is used to improve the application user experience.

Best 23 Web Design Trends To Follow In 2023

Having a good web design can empower business and make your brand stand out. According to a survey by Top Design Firms, 50% of users believe that website design is crucial to an organization’s overall brand. Therefore, businesses should prioritize website design to meet customer expectations and build their brand identity. Your website is the face of your business, so it’s important that it’s updated regularly as per the current web design trends.

Acquiring Employee Support for Change Management Implementation

Enterprise resource planning (ERP) is a form of business process management software—typically a suite of integrated applications—that assists a company in managing its operations, interpreting data, and automating various back-office processes. The introduction of a new ERP system is analogous to the introduction of a new product into the market. If the product is not handled appropriately, it will fail, resulting in significant losses for the business. Most significantly, the employees’ time, effort, and morale would suffer as a result of the procedure.

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful