Best Python code snippet using localstack_python
register.py
Source:register.py
"""Register AWS accounts with New Relic after a CloudFormation stack set operation.

The handler receives SNS records whose message maps a stack set name to
``{'OperationId': ...}``.  For each entry the operation status is looked up:
operations still in progress are re-published to the registration topic,
succeeded operations have each succeeded account linked through NerdGraph,
and failed/stopped operations are sent to the dead-letter queue.

Environment variables read by this module: ``newRelicRegisterSNS``,
``newRelicDLQ``, ``newRelicSecret``, ``newRelicAccId``, ``nerdGraphEndPoint``.
"""
import json
import logging
import os
import time

import boto3
import botocore
import requests
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logging.getLogger('boto3').setLevel(logging.CRITICAL)
logging.getLogger('botocore').setLevel(logging.CRITICAL)

session = boto3.Session()

# Upper bound, in seconds, for every NerdGraph HTTP call.  Without a timeout
# ``requests`` waits indefinitely on an unresponsive endpoint.  The value is a
# conservative choice; tune it to the function's configured timeout.
NERDGRAPH_TIMEOUT_SECONDS = 30

# Stack set operation statuses that mean "not finished yet".
_IN_PROGRESS_STATUSES = ('RUNNING', 'STOPPING', 'QUEUED')
# Stack set operation statuses that are routed to the dead-letter queue.
_FAILED_STATUSES = ('FAILED', 'STOPPED')


def message_processing(messages):
    """Decode each SNS record and check the stack set operations it names.

    Args:
        messages: Iterable of records; each must carry a JSON document at
            ``record['Sns']['Message']``.
    """
    for message in messages:
        payload = json.loads(message['Sns']['Message'])
        stackset_check(payload)


def stackset_check(messages):
    """Act on the current status of each stack set operation in ``messages``.

    Args:
        messages: Mapping of stack set name to a dict containing
            ``'OperationId'``.

    Raises:
        KeyError: If ``newRelicRegisterSNS`` or ``newRelicDLQ`` is not set in
            the environment, or an entry has no ``'OperationId'``.

    Errors raised while handling a single stack set are logged and do not
    stop processing of the remaining entries.
    """
    cloudformation_client = session.client('cloudformation')
    sqs_client = session.client('sqs')
    sns_client = session.client('sns')
    register_topic_arn = os.environ['newRelicRegisterSNS']
    dlq_url = os.environ['newRelicDLQ']

    for stackSetName, params in messages.items():
        operation_id = params['OperationId']
        logger.info("Checking stack set instances: {} {}".format(stackSetName, operation_id))
        try:
            stackset_status = cloudformation_client.describe_stack_set_operation(
                StackSetName=stackSetName,
                OperationId=operation_id,
            )
            if 'StackSetOperation' not in stackset_status:
                continue

            status = stackset_status['StackSetOperation']['Status']
            if status in _IN_PROGRESS_STATUSES:
                logger.info("Stackset operation still running")
                _requeue_operation(sns_client, register_topic_arn, stackSetName, operation_id)
            elif status == 'SUCCEEDED':
                logger.info("Start registration")
                _register_succeeded_accounts(cloudformation_client, stackSetName, operation_id)
            elif status in _FAILED_STATUSES:
                logger.warning("Stackset operation failed/stopped")
                _send_to_dlq(sqs_client, dlq_url, stackSetName, operation_id)
        except Exception as e:
            logger.error(e)


def _requeue_operation(sns_client, topic_arn, stack_set_name, operation_id):
    """Wait 20 seconds, then re-publish the operation to the registration topic."""
    message_body = {stack_set_name: {'OperationId': operation_id}}
    try:
        logger.info("Sleep and wait for 20 seconds")
        time.sleep(20)
        sns_response = sns_client.publish(
            TopicArn=topic_arn,
            Message=json.dumps(message_body),
        )
        logger.info("Re-queued for registration: {}".format(sns_response))
    except Exception as sns_exception:
        logger.error("Failed to send queue for registration: {}".format(sns_exception))


def _register_succeeded_accounts(cloudformation_client, stack_set_name, operation_id):
    """Register every account whose stack instance succeeded in the operation."""
    secret_arn = os.environ['newRelicSecret']
    newrelic_account_id = os.environ['newRelicAccId']

    access_key = get_secret_value(secret_arn)
    if not access_key:
        # Check the key BEFORE talking to NerdGraph: without it every call
        # below would be an unauthenticated request.
        logger.error("New Relic access key unavailable, skipping registration")
        return

    integration_list = newrelic_get_schema(access_key)

    paginator = cloudformation_client.get_paginator('list_stack_set_operation_results')
    pages = paginator.paginate(StackSetName=stack_set_name, OperationId=operation_id)
    for page in pages:
        for operation in page.get('Summaries', []):
            # Equality, not ``in ('SUCCEEDED')``: parentheses alone do not make
            # a tuple, so the old test was a substring match on a string.
            if operation['Status'] != 'SUCCEEDED':
                continue
            target_account = operation['Account']
            try:
                newrelic_registration(
                    target_account, access_key, newrelic_account_id, integration_list)
            except Exception as registration_exception:
                # One failing account must not block the remaining accounts.
                logger.error("Registration failed for account {}: {}".format(
                    target_account, registration_exception))


def _send_to_dlq(sqs_client, queue_url, stack_set_name, operation_id):
    """Send the failed/stopped operation to the dead-letter queue."""
    message_body = {stack_set_name: {'OperationId': operation_id}}
    try:
        sqs_response = sqs_client.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(message_body),
        )
        logger.info("Sent to DLQ: {}".format(sqs_response))
    except Exception as sqs_exception:
        logger.error("Failed to send to DLQ: {}".format(sqs_exception))


def get_secret_value(secret_arn):
    """Return the ``AccessKey`` field of a Secrets Manager secret.

    Args:
        secret_arn: Identifier passed as ``SecretId`` to Secrets Manager.

    Returns:
        The value stored under ``'AccessKey'`` in the JSON ``SecretString``,
        or ``None`` if the secret has no ``SecretString`` or the lookup
        fails (the failure is logged).
    """
    secret_client = session.client('secretsmanager')
    try:
        secret_response = secret_client.get_secret_value(SecretId=secret_arn)
        if 'SecretString' in secret_response:
            return json.loads(secret_response['SecretString'])['AccessKey']
    except Exception as e:
        logger.error('Get Secret Failed: ' + str(e))
    return None


def _cloud_link_errors(link_response):
    """Return the ``errors`` list of a cloudLinkAccount response, or ``[]``."""
    try:
        return link_response['data']['cloudLinkAccount']['errors'] or []
    except (KeyError, TypeError):
        return []


def newrelic_registration(aws_account_id, access_key, newrelic_account_id, newrelic_integration_list):
    """Link an AWS account to New Relic and configure its integrations.

    Posts a ``cloudLinkAccount`` mutation to the NerdGraph endpoint; when that
    returns HTTP 200 and a linked account id, posts a
    ``cloudConfigureIntegration`` mutation enabling every name in
    ``newrelic_integration_list`` for the linked account.

    Args:
        aws_account_id: AWS account to link; also used as the link name.
        access_key: Value sent in the ``API-Key`` header.
        newrelic_account_id: New Relic account id used in both mutations and
            in the IAM role name.
        newrelic_integration_list: Integration field names to enable.  When
            empty or ``None`` the account is linked but no integration is
            configured.

    Raises:
        KeyError: If ``nerdGraphEndPoint`` is not set in the environment.
        requests.RequestException: If the link request itself fails.
    """
    role_arn = 'arn:aws:iam::{}:role/NewRelicIntegrationRole_{}'.format(aws_account_id, newrelic_account_id)
    nerdGraphEndPoint = os.environ['nerdGraphEndPoint']

    link_payload = '''
    mutation
    {{
        cloudLinkAccount(accountId: {0}, accounts:
        {{
            aws: [
            {{
                name: "{1}",
                arn: "{2}"
            }}]
        }})
        {{
            linkedAccounts
            {{
                id name authLabel
            }}
            errors
            {{
                type
                message
                linkedAccountId
            }}
        }}
    }}
    '''.format(newrelic_account_id, aws_account_id, role_arn)
    logger.debug('NerdGraph link account payload : {}'.format(json.dumps(link_payload)))

    response = requests.post(
        nerdGraphEndPoint,
        headers={'API-Key': access_key},
        verify=True,
        data=link_payload,
        timeout=NERDGRAPH_TIMEOUT_SECONDS,
    )
    logger.info('NerdGraph response code : {}'.format(response.status_code))
    logger.info('NerdGraph response : {}'.format(response.text))
    if response.status_code != 200:
        return

    link_response = json.loads(response.text)
    try:
        link_account_id = link_response['data']['cloudLinkAccount']['linkedAccounts'][0]['id']

        if not newrelic_integration_list:
            logger.warning(
                'No NerdGraph integrations available, AWS Account {} linked without integrations'.format(
                    aws_account_id))
            return

        service_payload = [
            '{0}: [{{ linkedAccountId: {1} }}]'.format(service, link_account_id)
            for service in newrelic_integration_list
        ]

        integration_payload = '''
        mutation
        {{
            cloudConfigureIntegration (
                accountId: {0},
                integrations:
                {{
                    aws:
                    {{
                        {1}
                    }}
                }}
            )
            {{
                integrations
                {{
                    id
                    name
                    service
                    {{
                        id
                        name
                    }}
                }}
                errors
                {{
                    type
                    message
                }}
            }}
        }}
        '''.format(newrelic_account_id, '\n'.join(service_payload))
        logger.debug('NerdGraph integration payload : {}'.format(json.dumps(integration_payload)))
        integration_response = requests.post(
            nerdGraphEndPoint,
            headers={'API-Key': access_key},
            verify=True,
            data=integration_payload,
            timeout=NERDGRAPH_TIMEOUT_SECONDS,
        )
        logger.info('NerdGraph integration response code : {}'.format(integration_response.status_code))
        logger.info('NerdGraph integration response : {}'.format(integration_response.text))

    except Exception as create_exception:
        # Read the errors defensively: indexing the response again here could
        # raise inside the handler and hide the original failure.
        errors = _cloud_link_errors(link_response)
        if errors:
            logger.warning('NerdGraph error messages : {}'.format(errors))
            for error in errors:
                message = error.get('message') if isinstance(error, dict) else str(error)
                if 'AWS account is already linked ' in (message or ''):
                    logger.warning('AWS Account {} already linked, skipping'.format(aws_account_id))
        else:
            logger.error('Exception {}'.format(create_exception))


def newrelic_get_integration(access_key, newrelic_account_id):
    """Return the AWS service slugs NerdGraph reports for an account.

    Args:
        access_key: Value sent in the ``API-Key`` header.
        newrelic_account_id: New Relic account id to query.

    Returns:
        A list of ``slug`` strings, or ``None`` if the request or parsing
        fails (the failure is logged).

    Raises:
        KeyError: If ``nerdGraphEndPoint`` is not set in the environment.
    """
    nerdGraphEndPoint = os.environ['nerdGraphEndPoint']
    service_query = '''
    {{
        actor
        {{
            account(id: {0})
            {{
                cloud
                {{
                    provider(slug: "aws")
                    {{
                        services
                        {{
                            slug
                        }}
                    }}
                }}
            }}
        }}
    }}
    '''.format(newrelic_account_id)

    try:
        response = requests.post(
            nerdGraphEndPoint,
            headers={'API-Key': access_key},
            verify=True,
            data=service_query,
            timeout=NERDGRAPH_TIMEOUT_SECONDS,
        )
        services = json.loads(response.text)['data']['actor']['account']['cloud']['provider']['services']
        newrelic_integration_list = [service['slug'] for service in services]
        logger.info('NerdGraph AWS available integrations : {}'.format(newrelic_integration_list))
        return newrelic_integration_list
    except Exception as e:
        logger.error(e)
    return None


def newrelic_get_schema(access_key):
    """Return the field names of ``CloudAwsIntegrationsInput`` from the schema.

    Args:
        access_key: Value sent in the ``API-Key`` header.

    Returns:
        A list of input field names, or ``None`` if the request fails, the
        response cannot be parsed, or the type is missing from the schema
        (the failure is logged).

    Raises:
        KeyError: If ``nerdGraphEndPoint`` is not set in the environment.
    """
    nerdGraphEndPoint = os.environ['nerdGraphEndPoint']
    schema_query = '''
    {
        __schema
        {
            types
            {
                name
                inputFields
                {
                    name
                }
            }
        }
    }
    '''
    try:
        response = requests.post(
            nerdGraphEndPoint,
            headers={'API-Key': access_key},
            verify=True,
            data=schema_query,
            timeout=NERDGRAPH_TIMEOUT_SECONDS,
        )
        schema = json.loads(response.text)
        logger.info(schema)
        schema_types = schema['data']['__schema']['types']
        matching_fields = [
            schema_type['inputFields']
            for schema_type in schema_types
            if schema_type['name'] == 'CloudAwsIntegrationsInput'
        ]
        if not matching_fields:
            logger.error('CloudAwsIntegrationsInput not found in NerdGraph schema')
            return None
        newrelic_integration_list = [field['name'] for field in matching_fields[0]]
        logger.info('NerdGraph AWS available integrations : {}'.format(newrelic_integration_list))
        return newrelic_integration_list
    except Exception as e:
        logger.error(e)
    return None


# NOTE(review): the source this was taken from is cut off inside the except
# clause below; the visible text is kept verbatim and the handler body still
# needs to be restored from the full file.
def lambda_handler(event, context):
    logger.info(json.dumps(event))
    try:
        if 'Records' in event:
            message_processing(event['Records'])
    except Exception as e:...
client.py
Source:client.py
...
def list_stack_instances(self, StackSetName: str, NextToken: str = None, MaxResults: int = None, StackInstanceAccount: str = None, StackInstanceRegion: str = None) -> Dict:
    """Placeholder with no implementation; accepts the arguments and returns ``None``."""
    return None

def list_stack_resources(self, StackName: str, NextToken: str = None) -> Dict:
    """Placeholder with no implementation; accepts the arguments and returns ``None``."""
    return None

def list_stack_set_operation_results(self, StackSetName: str, OperationId: str, NextToken: str = None, MaxResults: int = None) -> Dict:
    """Placeholder with no implementation; accepts the arguments and returns ``None``."""
    return None

def list_stack_set_operations(self, StackSetName: str, NextToken: str = None, MaxResults: int = None) -> Dict:
    """Placeholder with no implementation; accepts the arguments and returns ``None``."""
    return None

def list_stack_sets(self, NextToken: str = None, MaxResults: int = None, Status: str = None) -> Dict:
    """Placeholder with no implementation; accepts the arguments and returns ``None``."""
    return None

def list_stacks(self, NextToken: str = None, StackStatusFilter: List = None) -> Dict:
    """Placeholder with no implementation; accepts the arguments and returns ``None``."""
    return None

def set_stack_policy(self, StackName: str, StackPolicyBody: str = None, StackPolicyURL: str = None):
    """Placeholder with no implementation; accepts the arguments and returns ``None``."""
    return None

def signal_resource(self, StackName: str, LogicalResourceId: str, UniqueId: str, Status: str):
    """Placeholder with no implementation; accepts the arguments and returns ``None``."""
    return None

def stop_stack_set_operation(self, StackSetName: str, OperationId: str) -> Dict:
    """Placeholder with no implementation; accepts the arguments and returns ``None``."""
    return None

# NOTE(review): the excerpt is cut off after this signature; kept verbatim.
def update_stack(self, StackName: str, TemplateBody: str = None, TemplateURL: str = None, UsePreviousTemplate: bool = None, StackPolicyDuringUpdateBody: str = None, StackPolicyDuringUpdateURL: str = None, Parameters: List = None, Capabilities: List = None, ResourceTypes: List = None, RoleARN: str = None, RollbackConfiguration: Dict = None, StackPolicyBody: str = None, StackPolicyURL: str = None, NotificationARNs: List = None, Tags: List = None, ClientRequestToken: str = None) -> Dict:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!