How to use _topic_arn method in localstack

Best Python code snippet using localstack_python

message_action.py

Source: message_action.py Github

copy

Full Screen

1# Standard Library2import json3import os4# Third Party5import boto36# First Party7from smdebug.core.logger import get_logger8# action :9# {name:'sms' or 'email', 'endpoint':'phone or emailid'}10class MessageAction:11 def __init__(self, rule_name, message_type, message_endpoint):12 self._topic_name = "SMDebugRules"13 self._logger = get_logger()14 if message_type == "sms" or message_type == "email":15 self._protocol = message_type16 else:17 self._protocol = None18 self._logger.info(19 f"Unsupported message type:{message_type} in MessageAction. Returning"20 )21 return22 self._message_endpoint = message_endpoint23 # Below 2 is to help in tests24 self._last_send_mesg_response = None25 self._last_subscription_response = None26 env_region_name = os.getenv("AWS_REGION", "us-east-1")27 self._sns_client = boto3.client("sns", region_name=env_region_name)28 self._topic_arn = self._create_sns_topic_if_not_exists()29 self._subscribe_mesgtype_endpoint()30 self._logger.info(31 f"Registering messageAction with protocol:{self._protocol} endpoint:{self._message_endpoint} and topic_arn:{self._topic_arn} region:{env_region_name}"32 )33 self._rule_name = rule_name34 def _create_sns_topic_if_not_exists(self):35 try:36 topic = self._sns_client.create_topic(Name=self._topic_name)37 self._logger.info(38 f"topic_name: {self._topic_name} , creating topic returned response:{topic}"39 )40 if topic:41 return topic["TopicArn"]42 except Exception as e:43 self._logger.info(44 f"Caught exception while creating topic:{self._topic_name} exception is: \n {e}"45 )46 return None47 def _check_subscriptions(self, topic_arn, protocol, endpoint):48 try:49 next_token = "random"50 subs = self._sns_client.list_subscriptions()51 sub_array = subs["Subscriptions"]52 while next_token is not None:53 for sub_dict in sub_array:54 proto = sub_dict["Protocol"]55 ep = sub_dict["Endpoint"]56 topic = sub_dict["TopicArn"]57 if proto == protocol and topic == topic_arn and ep == endpoint:58 self._logger.info(f"Existing Subscription found: {sub_dict}")59 return True60 if "NextToken" in subs:61 next_token = subs["NextToken"]62 subs = self._sns_client.list_subscriptions(NextToken=next_token)63 sub_array = subs["Subscriptions"]64 continue65 else:66 next_token = None67 except Exception as e:68 self._logger.info(69 f"Caught exception while list subscription topic:{self._topic_name} exception is: \n {e}"70 )71 return False72 def _subscribe_mesgtype_endpoint(self):73 response = None74 try:75 if self._topic_arn and self._protocol and self._message_endpoint:76 filter_policy = {}77 if self._protocol == "sms":78 filter_policy["phone_num"] = [self._message_endpoint]79 else:80 filter_policy["email"] = [self._message_endpoint]81 if not self._check_subscriptions(82 self._topic_arn, self._protocol, self._message_endpoint83 ):84 response = self._sns_client.subscribe(85 TopicArn=self._topic_arn,86 Protocol=self._protocol, # sms or email87 Endpoint=self._message_endpoint, # phone number or email addresss88 Attributes={"FilterPolicy": json.dumps(filter_policy)},89 ReturnSubscriptionArn=False, # True means always return ARN90 )91 else:92 response = f"Subscription exists for topic:{self._topic_arn}, protocol:{self._protocol}, endpoint:{self._message_endpoint}"93 except Exception as e:94 self._logger.info(95 f"Caught exception while subscribing endpoint on topic:{self._topic_arn} exception is: \n {e}"96 )97 self._logger.info(f"response for sns subscribe is {response} ")98 self._last_subscription_response = response99 def _send_message(self, message):100 response = None101 message = f"SMDebugRule:{self._rule_name} fired. {message}"102 try:103 if self._protocol == "sms":104 msg_attributes = {105 "phone_num": {"DataType": "String", "StringValue": self._message_endpoint}106 }107 else:108 msg_attributes = {109 "email": {"DataType": "String", "StringValue": self._message_endpoint}110 }111 response = self._sns_client.publish(112 TopicArn=self._topic_arn,113 Message=message,114 Subject=f"SMDebugRule:{self._rule_name} fired",115 # MessageStructure='json',116 MessageAttributes=msg_attributes,117 )118 except Exception as e:119 self._logger.info(120 f"Caught exception while publishing message on topic:{self._topic_arn} exception is: \n {e}"121 )122 self._logger.info(f"Response of send message:{response}")123 self._last_send_mesg_response = response124 return response125 def invoke(self, message):...

Full Screen

Full Screen

sns.py

Source: sns.py Github

copy

Full Screen

1class SNSTopic(object):2 """ Helper for SNS Topic creation/​management.3 Working with SNS topics is kind of annoying by default. This provides a4 more simple interface for working with them.5 """6 def __init__(self, connection, topic_name):7 self._conn = connection8 self._topic_name = topic_name9 self._topic_arn = None10 def _setup_topic(self):11 if self._topic_arn:12 return13 conn = self._conn14 response = conn.create_topic(self._topic_name)['CreateTopicResponse']15 self._topic_arn = response['CreateTopicResult']['TopicArn']16 def publish(self, *args, **kwargs):17 self._setup_topic()18 return self._conn.publish(self._topic_arn, *args, **kwargs)19 def subscribe_sqs_queue(self, *args, **kwargs):20 self._setup_topic()21 response = self._conn.subscribe_sqs_queue(self._topic_arn,22 *args, **kwargs)23 response = response['SubscribeResponse']24 result = response['SubscribeResult']...

Full Screen

Full Screen

Blogs

Check out the latest blogs from LambdaTest on this topic:

13 Best Java Testing Frameworks For 2023

The fact is not alien to us anymore that cross browser testing is imperative to enhance your application’s user experience. Enhanced knowledge of popular and highly acclaimed testing frameworks goes a long way in developing a new app. It holds more significance if you are a full-stack developer or expert programmer.

QA Innovation – Using the senseshaping concept to discover customer needs

QA Innovation - Using the senseshaping concept to discover customer needsQA testers have a unique role and responsibility to serve the customer. Serving the customer in software testing means protecting customers from application defects, failures, and perceived failures from missing or misunderstood requirements. Testing for known requirements based on documentation or discussion is the core of the testing profession. One unique way QA testers can both differentiate themselves and be innovative occurs when senseshaping is used to improve the application user experience.

Best 23 Web Design Trends To Follow In 2023

Having a good web design can empower business and make your brand stand out. According to a survey by Top Design Firms, 50% of users believe that website design is crucial to an organization’s overall brand. Therefore, businesses should prioritize website design to meet customer expectations and build their brand identity. Your website is the face of your business, so it’s important that it’s updated regularly as per the current web design trends.

Acquiring Employee Support for Change Management Implementation

Enterprise resource planning (ERP) is a form of business process management software—typically a suite of integrated applications—that assists a company in managing its operations, interpreting data, and automating various back-office processes. The introduction of a new ERP system is analogous to the introduction of a new product into the market. If the product is not handled appropriately, it will fail, resulting in significant losses for the business. Most significantly, the employees’ time, effort, and morale would suffer as a result of the procedure.

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful