Best Python code snippet using localstack_python
message_action.py
Source:message_action.py
1# Standard Library2import json3import os4# Third Party5import boto36# First Party7from smdebug.core.logger import get_logger8# action :9# {name:'sms' or 'email', 'endpoint':'phone or emailid'}10class MessageAction:11 def __init__(self, rule_name, message_type, message_endpoint):12 self._topic_name = "SMDebugRules"13 self._logger = get_logger()14 if message_type == "sms" or message_type == "email":15 self._protocol = message_type16 else:17 self._protocol = None18 self._logger.info(19 f"Unsupported message type:{message_type} in MessageAction. Returning"20 )21 return22 self._message_endpoint = message_endpoint23 # Below 2 is to help in tests24 self._last_send_mesg_response = None25 self._last_subscription_response = None26 env_region_name = os.getenv("AWS_REGION", "us-east-1")27 self._sns_client = boto3.client("sns", region_name=env_region_name)28 self._topic_arn = self._create_sns_topic_if_not_exists()29 self._subscribe_mesgtype_endpoint()30 self._logger.info(31 f"Registering messageAction with protocol:{self._protocol} endpoint:{self._message_endpoint} and topic_arn:{self._topic_arn} region:{env_region_name}"32 )33 self._rule_name = rule_name34 def _create_sns_topic_if_not_exists(self):35 try:36 topic = self._sns_client.create_topic(Name=self._topic_name)37 self._logger.info(38 f"topic_name: {self._topic_name} , creating topic returned response:{topic}"39 )40 if topic:41 return topic["TopicArn"]42 except Exception as e:43 self._logger.info(44 f"Caught exception while creating topic:{self._topic_name} exception is: \n {e}"45 )46 return None47 def _check_subscriptions(self, topic_arn, protocol, endpoint):48 try:49 next_token = "random"50 subs = self._sns_client.list_subscriptions()51 sub_array = subs["Subscriptions"]52 while next_token is not None:53 for sub_dict in sub_array:54 proto = sub_dict["Protocol"]55 ep = sub_dict["Endpoint"]56 topic = sub_dict["TopicArn"]57 if proto == protocol and topic == topic_arn and ep == endpoint:58 self._logger.info(f"Existing Subscription found: {sub_dict}")59 return True60 if "NextToken" in subs:61 next_token = subs["NextToken"]62 subs = self._sns_client.list_subscriptions(NextToken=next_token)63 sub_array = subs["Subscriptions"]64 continue65 else:66 next_token = None67 except Exception as e:68 self._logger.info(69 f"Caught exception while list subscription topic:{self._topic_name} exception is: \n {e}"70 )71 return False72 def _subscribe_mesgtype_endpoint(self):73 response = None74 try:75 if self._topic_arn and self._protocol and self._message_endpoint:76 filter_policy = {}77 if self._protocol == "sms":78 filter_policy["phone_num"] = [self._message_endpoint]79 else:80 filter_policy["email"] = [self._message_endpoint]81 if not self._check_subscriptions(82 self._topic_arn, self._protocol, self._message_endpoint83 ):84 response = self._sns_client.subscribe(85 TopicArn=self._topic_arn,86 Protocol=self._protocol, # sms or email87 Endpoint=self._message_endpoint, # phone number or email addresss88 Attributes={"FilterPolicy": json.dumps(filter_policy)},89 ReturnSubscriptionArn=False, # True means always return ARN90 )91 else:92 response = f"Subscription exists for topic:{self._topic_arn}, protocol:{self._protocol}, endpoint:{self._message_endpoint}"93 except Exception as e:94 self._logger.info(95 f"Caught exception while subscribing endpoint on topic:{self._topic_arn} exception is: \n {e}"96 )97 self._logger.info(f"response for sns subscribe is {response} ")98 self._last_subscription_response = response99 def _send_message(self, message):100 response = None101 message = f"SMDebugRule:{self._rule_name} fired. {message}"102 try:103 if self._protocol == "sms":104 msg_attributes = {105 "phone_num": {"DataType": "String", "StringValue": self._message_endpoint}106 }107 else:108 msg_attributes = {109 "email": {"DataType": "String", "StringValue": self._message_endpoint}110 }111 response = self._sns_client.publish(112 TopicArn=self._topic_arn,113 Message=message,114 Subject=f"SMDebugRule:{self._rule_name} fired",115 # MessageStructure='json',116 MessageAttributes=msg_attributes,117 )118 except Exception as e:119 self._logger.info(120 f"Caught exception while publishing message on topic:{self._topic_arn} exception is: \n {e}"121 )122 self._logger.info(f"Response of send message:{response}")123 self._last_send_mesg_response = response124 return response125 def invoke(self, message):...
sns.py
Source:sns.py
1class SNSTopic(object):2 """ Helper for SNS Topic creation/management.3 Working with SNS topics is kind of annoying by default. This provides a4 more simple interface for working with them.5 """6 def __init__(self, connection, topic_name):7 self._conn = connection8 self._topic_name = topic_name9 self._topic_arn = None10 def _setup_topic(self):11 if self._topic_arn:12 return13 conn = self._conn14 response = conn.create_topic(self._topic_name)['CreateTopicResponse']15 self._topic_arn = response['CreateTopicResult']['TopicArn']16 def publish(self, *args, **kwargs):17 self._setup_topic()18 return self._conn.publish(self._topic_arn, *args, **kwargs)19 def subscribe_sqs_queue(self, *args, **kwargs):20 self._setup_topic()21 response = self._conn.subscribe_sqs_queue(self._topic_arn,22 *args, **kwargs)23 response = response['SubscribeResponse']24 result = response['SubscribeResult']...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!