sysdig_ct_onboarding.py
Source:sysdig_ct_onboarding.py
...124 managementAccountId = context.invoked_function_arn.split(":")[4]125 regionName = context.invoked_function_arn.split(":")[3]126 127 cloudFormation_client = session.client('cloudformation')128 cloudFormation_client.describe_stack_set(StackSetName=stackSetName)129 logger.info('Stack set {} exist'.format(stackSetName))130 131 paginator = cloudFormation_client.get_paginator('list_stack_instances')132 pageIterator = paginator.paginate(StackSetName= stackSetName)133 stackSetList = []134 accountList = []135 regionList = []136 for page in pageIterator:137 if 'Summaries' in page:138 stackSetList.extend(page['Summaries'])139 for instance in stackSetList:140 accountList.append(instance['Account'])141 regionList.append(instance['Region'])142 regionList = list(set(regionList))143 accountList = list(set(accountList))144 logger.info("StackSet instances found in region(s): {}".format(regionList))145 logger.info("StackSet instances found in account(s): {}".format(accountList))146 147 try:148 if len(accountList) > 0:149 response = cloudFormation_client.delete_stack_instances(150 StackSetName=stackSetName,151 Accounts=accountList,152 Regions=regionList,153 RetainStacks=False)154 logger.info(response)155 156 status = cloudFormation_client.describe_stack_set_operation(157 StackSetName=stackSetName,158 OperationId=response['OperationId'])159 160 while status['StackSetOperation']['Status'] == 'RUNNING' and deleteWaitTime>0:161 time.sleep(deleteSleepTime)162 deleteWaitTime=deleteWaitTime-deleteSleepTime163 status = cloudFormation_client.describe_stack_set_operation(164 StackSetName=stackSetName,165 OperationId=response['OperationId'])166 logger.info("StackSet instance delete status {}".format(status))167 168 try:169 response = cloudFormation_client.delete_stack_set(StackSetName=stackSetName)170 logger.info("StackSet template delete status {}".format(response))171 except Exception as stackSetException:172 logger.warning("Problem occured while deleting, StackSet still exist : 
{}".format(stackSetException))173 174 except Exception as describeException:175 logger.error(describeException)176 except Exception as describeException:177 logger.error(describeException)178 return None179 180 return None #Generate random ID181def get_secret_value(secret_arn, key):182 secretClient = session.client('secretsmanager')183 try:184 secret_response = secretClient.get_secret_value(185 SecretId=secret_arn186 )187 if 'SecretString' in secret_response:188 secret = json.loads(secret_response['SecretString'])[key]189 return secret 190 191 except Exception as e:192 logger.error('Get Secret Failed: ' + str(e))193 194def generate_cft_params(keys, values):195 param_keys = ['ParameterKey', 'ParameterValue']196 param_vals = []197 cft_params = []198 for i in range(len(keys)):199 param_vals.append([keys[i], values[i]])200 for item in param_vals:201 cft_params.append(dict(zip(param_keys, item)))202 return cft_params203# This module perform the following:204# 1. Check if Sysdig stackset exist205# 2. If not exist, create new stackset206# 3. 
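# This module perform the following:
# 1. Check if Sysdig stackset exist
# 2. If not exist, create new stackset
# 3. Add Log Archive account as stackset instance
def ct_stackset_handler(regionName, managementAccountId, logArchiveAccount, auditAccount, cloudTrailSNSTopic, cloudTrailKMSArn):
    """Ensure the Sysdig StackSet exists and deploy one instance to the Log Archive account.

    Args:
        regionName: Control Tower home region; the StackSet instance is created there.
        managementAccountId: account that owns the StackSet; the administration role lives here.
        logArchiveAccount: account id that receives the StackSet instance.
        auditAccount: audit account id, passed through as a template parameter.
        cloudTrailSNSTopic: ARN of the Control Tower CloudTrail SNS topic (template parameter).
        cloudTrailKMSArn: ARN of the CloudTrail KMS key (template parameter).

    Returns:
        True when the StackSet exists (or was created) and the instance request was
        issued, or skipped because another StackSet operation is in flight; False on
        any error, which is logged.

    Reads the environment variables stackSetName, stackSetUrl, sysdigSecureEndpoint,
    sysdigSecureSecret and callbackSNS.
    """
    try:
        stackSetName = os.environ['stackSetName']
        stackSetUrl = os.environ['stackSetUrl']
        sysdigSecureEndpoint = os.environ['sysdigSecureEndpoint']
        sysdigSecureSecretArn = os.environ['sysdigSecureSecret']
        callbackSNS = os.environ['callbackSNS']

        sysdigSecureSecret = get_secret_value(sysdigSecureSecretArn, 'Token')
        if not sysdigSecureSecret:
            raise Exception('Error trying to access / read secret credentials')

        # Build the client before the describe try-block: the except branch below needs
        # it, and previously a failure constructing the client left the name unbound.
        cloudFormation_client = session.client('cloudformation')
        try:
            cloudFormation_client.describe_stack_set(StackSetName=stackSetName)
            logger.info('StackSet {} already exist'.format(stackSetName))
            helper.Data.update({"result": stackSetName})

        except cloudFormation_client.exceptions.StackSetNotFoundException:
            # Only "not found" means "create". Any other describe failure (throttling,
            # AccessDenied) now reaches the outer handler instead of being mistaken for
            # absence and triggering a doomed create_stack_set.
            logger.info('StackSet {} does not exist, creating it now.'.format(stackSetName))

            # NOTE(review): the API token is passed as an ordinary template parameter and
            # is returned by DescribeStackSet unless the template declares it NoEcho — confirm.
            # NOTE(review): AdministrationRoleARN hard-codes the 'aws' partition, whereas
            # ct_sns_topic_handler derives the partition from the caller identity — confirm
            # this is never deployed outside the commercial partition.
            cloudFormation_client.create_stack_set(
                StackSetName=stackSetName,
                Description='Sysdig for Cloud - AWS Control Tower Edition',
                TemplateURL=stackSetUrl,
                Parameters=generate_cft_params(
                    keys=['SysdigSecureEndpoint', 'SysdigSecureAPIToken', 'CloudBenchDeploy', 'CloudConnectorDeploy','ECRImageScanningDeploy','ECSImageScanningDeploy','LogArchiveAccount','AuditAccount','ExistentCloudTrailSNSTopic', 'CloudTrailKMS', 'CallbackSNS'],
                    values=[sysdigSecureEndpoint, sysdigSecureSecret, 'No', 'Yes', 'No', 'No', logArchiveAccount, auditAccount, cloudTrailSNSTopic, cloudTrailKMSArn, callbackSNS]),
                # The template creates IAM resources with explicit names, so the
                # stronger capability is required rather than CAPABILITY_IAM.
                Capabilities=[
                    'CAPABILITY_NAMED_IAM'
                ],
                AdministrationRoleARN='arn:aws:iam::' + managementAccountId + ':role/service-role/AWSControlTowerStackSetRole',
                ExecutionRoleName=sysdig_ct_assume_role)
            logger.info('StackSet {} created'.format(stackSetName))

        try:
            # check if there are any existing stackset operations
            cloudFormation_client.describe_stack_set(StackSetName=stackSetName)
            cloudFormationPaginator = cloudFormation_client.get_paginator('list_stack_set_operations')
            stackset_iterator = cloudFormationPaginator.paginate(
                StackSetName=stackSetName
            )
            # any() short-circuits on the first in-flight operation, matching the
            # original nested-break loop (no further pages are fetched).
            stackset_ready = not any(
                operation['Status'] in ('RUNNING', 'STOPPING')
                for page in stackset_iterator
                for operation in page.get('Summaries', [])
            )
            # launch stackset instance to log archive account on the CT main region
            if stackset_ready:
                response = cloudFormation_client.create_stack_instances(StackSetName=stackSetName, Accounts=[logArchiveAccount], Regions=[regionName])
                logger.info("StackSet instance created {}".format(response))
            else:
                logger.error("Unable to proceed, another StackSet operations underway, run stack update to retry")

        except cloudFormation_client.exceptions.StackSetNotFoundException as describeException:
            logger.error("Exception getting new stack set, {}".format(describeException))
            raise describeException

        return True

    except Exception as e:
        logger.error("CloudTrail StackSet Handler error: {}".format(e))
        return False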
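# This module perform the following:
# 1. Assume role to the Audit Account
# 2. If not exist, add topic policy to AWS CT CloudTrail SNS topic
def ct_sns_topic_handler(account, topic, log_archive):
    """Grant the Log Archive integration role sns:Subscribe on the CT CloudTrail topic.

    Args:
        account: Audit account id; the integration role there is assumed to edit the policy.
        topic: ARN of the Control Tower CloudTrail SNS topic.
        log_archive: Log Archive account id whose integration role becomes the allowed principal.

    Returns:
        True when the policy already carried the statement or was updated; False on
        any error (logged).
    """
    try:
        # Assume role to Audit account. The partition comes from the caller identity so
        # the ARN stays valid outside the commercial 'aws' partition.
        sts_client = session.client('sts')
        partition = sts_client.get_caller_identity()['Arn'].split(":")[1]
        response = sts_client.assume_role(
            RoleArn='arn:{}:iam::{}:role/{}'.format(
                partition, account, sysdig_ct_assume_role),
            # NOTE(review): RoleSessionName is capped at 64 characters by STS; a long
            # stackSetName value would make AssumeRole fail — confirm the deployed name.
            RoleSessionName=str(os.environ['stackSetName'] + '-' + account + 'CT-Integration')
        )

        credentials = response['Credentials']
        audit_session = boto3.Session(
            aws_access_key_id=credentials['AccessKeyId'],
            aws_secret_access_key=credentials['SecretAccessKey'],
            aws_session_token=credentials['SessionToken']
        )

        # Locate SNS topic policy
        sns_client = audit_session.client('sns')
        sns_topic_attributes = sns_client.get_topic_attributes(TopicArn=topic)
        logger.debug(sns_topic_attributes['Attributes']['Policy'])
        sns_topic_policy = json.loads(sns_topic_attributes['Attributes']['Policy'])
        # IAM allows 'Statement' to be a single object; normalise to a list so the
        # append below works. Statements may also legally omit 'Sid', hence .get().
        statements = sns_topic_policy['Statement']
        if isinstance(statements, dict):
            statements = [statements]
            sns_topic_policy['Statement'] = statements
        sysdig_ct_sid = next((item for item in statements if item.get('Sid') == sysdig_sns_sid), False)

        # Modify SNS topic policy
        if sysdig_ct_sid:
            logger.info("Found SNS Topic policy: {}".format(sysdig_ct_sid))
        else:
            logger.info("Add new SNS Topic policy to {}".format(topic))
            new_sid = {
                "Sid": sysdig_sns_sid,
                "Effect": "Allow",
                "Principal": {
                    # Same partition as the role assumed above; this was hard-coded to 'aws'.
                    "AWS": "arn:{}:iam::{}:role/{}".format(partition, log_archive, sysdig_ct_assume_role)
                },
                # Least privilege: the reader only subscribes; no Publish/SetTopicAttributes.
                "Action": "sns:Subscribe",
                "Resource": topic
            }
            logger.debug("New SID: {}".format(new_sid))
            statements.append(new_sid)
            logger.info("New SNS Topic Policy: {}".format(sns_topic_policy))
            # Read-modify-write of the whole policy: a concurrent editor's change
            # between get and set would be overwritten.
            sns_client.set_topic_attributes(
                TopicArn=topic,
                AttributeName='Policy',
                AttributeValue=json.dumps(sns_topic_policy))
            logger.info("SNS Topic updated : {}".format(topic))

        return True
    except Exception as e:
        logger.error("SNS Topic Handler error: {}".format(e))
        return False


# This module perform the following:
# 1. Check if AWS CT CloudTrail uses KMS encryption
# 2. Ifexist, add permission for Fargate task to decrypt using CloudTrail KMS key
def ct_cloudtrail_kms_handler(allowed_role):
    """Allow *allowed_role* to kms:Decrypt with the Control Tower CloudTrail KMS key, if one is set.

    Args:
        allowed_role: ARN of the principal that must read the KMS-encrypted CloudTrail logs.

    Returns:
        The CloudTrail KMS key ARN when the baseline StackSet defines one (whether or
        not the key policy needed changing); False when no key is configured or on
        any error (logged).
    """
    try:
        # Check StackSet param
        cloudFormation_client = session.client('cloudformation')
        ct_cloudtrail_stackset = cloudFormation_client.describe_stack_set(StackSetName=sysdig_ct_cloudtrail_stackset)
        logger.info('AWS CT CloudTrail Baseline StackSet found: {}'.format(sysdig_ct_cloudtrail_stackset))

        ct_cloudtrail_stackset_kms = next((item for item in ct_cloudtrail_stackset['StackSet']['Parameters'] if item['ParameterKey'] == 'KMSKeyArn'), False)
        if not ct_cloudtrail_stackset_kms or ct_cloudtrail_stackset_kms['ParameterValue'] == 'NONE':
            # Previously this branch fell through to `return ct_cloudtrail_kms_key` with
            # the name unbound, producing a spurious "Handler error" log before False.
            logger.info('AWS CT CloudTrail KMS Key not found, skipping')
            return False

        ct_cloudtrail_kms_key = ct_cloudtrail_stackset_kms['ParameterValue']
        logger.info('AWS CT CloudTrail KMS Key found: {}'.format(ct_cloudtrail_kms_key))

        # Add KMS key policy
        kms_client = session.client('kms')
        kms_policy = kms_client.get_key_policy(
            KeyId=ct_cloudtrail_kms_key,
            PolicyName='default')
        logger.debug(kms_policy['Policy'])
        ct_kms_policy = json.loads(kms_policy['Policy'])
        statements = ct_kms_policy['Statement']
        if isinstance(statements, dict):
            statements = [statements]
            ct_kms_policy['Statement'] = statements
        sysdig_ct_kms_sid = next((item for item in statements if item.get('Sid') == sysdig_kms_sid), False)

        # Modify KMS Key policy
        if sysdig_ct_kms_sid:
            logger.info("No changes made, found KMS Key policy: {}".format(sysdig_ct_kms_sid))
            logger.warning("If you redeploy the stack, you need to manually delete the old KMS policy or replace it")
        else:
            logger.info("Add new KMS Key policy to {}".format(ct_cloudtrail_kms_key))
            new_sid = {
                "Sid": sysdig_kms_sid,
                "Effect": "Allow",
                "Principal": {
                    "AWS": allowed_role
                },
                # Decrypt only: the log reader never needs to encrypt or administer the key.
                "Action": "kms:Decrypt",
                # In a key policy "Resource" must be "*" and denotes this key alone.
                "Resource": "*"
            }
            logger.debug("New SID: {}".format(new_sid))
            statements.append(new_sid)
            logger.info("New KMS key Policy: {}".format(ct_kms_policy))

            # put_key_policy replaces the whole policy (read-modify-write, no versioning).
            kms_client.put_key_policy(
                KeyId=ct_cloudtrail_kms_key,
                PolicyName='default',
                Policy=json.dumps(ct_kms_policy),
                # Keep the lockout safety check so a bad edit can never leave the key unmanageable.
                BypassPolicyLockoutSafetyCheck=False
            )
            logger.info("KMS key policy updated : {}".format(ct_cloudtrail_kms_key))
        return ct_cloudtrail_kms_key
    except Exception as e:
        logger.error("CloudTrail KMS Key Handler error: {}".format(e))
        return False


# This module perform the following:
# 1. Check if AWS CT CloudTrail uses KMS encryption
def ct_cloudtrail_kms_finder():
    """Return the CloudTrail KMS key ARN from the CT baseline StackSet, or False.

    Read-only counterpart of ct_cloudtrail_kms_handler: it inspects the
    ``KMSKeyArn`` parameter and never touches the key policy.

    Returns:
        The key ARN when the parameter is present and not 'NONE'; False when it
        is absent, 'NONE', or the StackSet lookup fails (logged).
    """
    try:
        # Check StackSet param
        cloudFormation_client = session.client('cloudformation')
        ct_cloudtrail_stackset = cloudFormation_client.describe_stack_set(StackSetName=sysdig_ct_cloudtrail_stackset)
        logger.info('AWS CT CloudTrail Baseline StackSet found: {}'.format(sysdig_ct_cloudtrail_stackset))

        kms_parameter = next((item for item in ct_cloudtrail_stackset['StackSet']['Parameters'] if item['ParameterKey'] == 'KMSKeyArn'), False)
        if not kms_parameter or kms_parameter['ParameterValue'] == 'NONE':
            logger.info('AWS CT CloudTrail KMS Key not found')
            return False

        ct_cloudtrail_kms_key = kms_parameter['ParameterValue']
        logger.info('AWS CT CloudTrail KMS Key found: {}'.format(ct_cloudtrail_kms_key))
        return ct_cloudtrail_kms_key
    except Exception as e:
        logger.error("CloudTrail KMS Key Handler error: {}".format(e))
        return False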
Ifexist, find the log archive and audit account id409def ct_core_account_handler(regionName):410 try:411 # Check StackSet param412 cloudFormation_client = session.client('cloudformation')413 ct_cloudtrail_stackset = cloudFormation_client.describe_stack_set(StackSetName=sysdig_ct_cloudtrail_stackset)414 logger.info('AWS CT CloudTrail Baseline StackSet found: {}'.format(sysdig_ct_cloudtrail_stackset))415 416 # Find audit and log archive account417 auditAccount = next((item['ParameterValue'] for item in ct_cloudtrail_stackset['StackSet']['Parameters'] if item['ParameterKey'] == 'SecurityAccountId'), False)418 logger.debug("Audit Account: {}".format(auditAccount))419 420 logArchiveAccount = next((item['ParameterValue'].split('-')[3] for item in ct_cloudtrail_stackset['StackSet']['Parameters'] if item['ParameterKey'] == 'AuditBucketName'), False)421 logger.debug("Log Archive Account: {}".format(logArchiveAccount))422 423 # Find CloudTrail SNS topic name424 cloudTrailSNSTopic = next((item['ParameterValue'] for item in ct_cloudtrail_stackset['StackSet']['Parameters'] if item['ParameterKey'] == 'AllConfigTopicName'), False)425 cloudTrailSNSTopic = 'arn:aws:sns:' + regionName + ':' + auditAccount + ':' + cloudTrailSNSTopic426 427 logger.info("Audit & Log Archive account found: {} {}".format(auditAccount, logArchiveAccount))...
cloudformation.py
Source:cloudformation.py
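def describe_stack_set(self, stack_set_name):
    """Return the ``StackSet`` description dict for *stack_set_name*.

    Args:
        stack_set_name: name or id of the StackSet to describe.

    Raises:
        Whatever ``self.client.describe_stack_set`` raises (for example the
        modeled StackSetNotFoundException); callers decide how to handle it.
    """
    response = self.client.describe_stack_set(StackSetName=stack_set_name)
    return response["StackSet"]

def list_stack_sets(self, status):
    """Return summaries of every StackSet in *status*, following pagination.

    Args:
        status: StackSet status filter accepted by ListStackSets.

    Returns:
        A list of StackSet summary dicts collected from all pages. The
        previous implementation returned only the first page, so accounts
        with many StackSets were silently truncated.
    """
    paginator = self.client.get_paginator("list_stack_sets")
    summaries = []
    for page in paginator.paginate(Status=status):
        summaries.extend(page.get("Summaries", []))
    return summaries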
aws_stack_set_snapshot.py
Source:aws_stack_set_snapshot.py
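def list_operations(cfn, stack_set):
    """Return every operation recorded for *stack_set*, following pagination.

    Args:
        cfn: boto3 CloudFormation client (must expose ``get_paginator``).
        stack_set: StackSet summary/description dict; only ``StackSetId`` is read.

    Returns:
        ``{"Operations": [...]}`` with the summaries from all pages. Reading only
        the first page, as before, silently dropped older operations from the snapshot.
    """
    paginator = cfn.get_paginator("list_stack_set_operations")
    operations = []
    for page in paginator.paginate(StackSetName=stack_set["StackSetId"]):
        operations.extend(page.get("Summaries", []))
    return {"Operations": operations}


def describe_stack_set(cfn, stack_set):
    """Return the full ``StackSet`` description for *stack_set* (looked up by ``StackSetId``)."""
    return cfn.describe_stack_set(StackSetName=stack_set["StackSetId"])["StackSet"]


def dump_snapshot_to_json(snapshot):
    """Write *snapshot* to stdout as JSON.

    ``datetime`` and ``date`` values (the default encoder rejects both) are
    emitted as ISO-8601 strings; any other unsupported type still raises
    ``TypeError`` from ``json``.
    """
    class DateEncoder(json.JSONEncoder):
        def default(self, obj):
            # datetime.datetime subclasses datetime.date, so one check covers both.
            if isinstance(obj, datetime.date):
                return obj.isoformat()
            return json.JSONEncoder.default(self, obj)

    json.dump(snapshot, sys.stdout, cls=DateEncoder)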