Best Python code snippet using localstack_python
ScheduleMaintenanceWindow.py
Source:ScheduleMaintenanceWindow.py
...70 try:71 72 client = boto3.client('ssm')73 74 opsItem = client.get_ops_item(75 OpsItemId=opsItemId76 ) 77 78 payload = {}79 payload['stackId'] = opsItem['OpsItem']['OperationalData']['cloudformation:stack-id']['Value']80 payload['windowId'] = windowId81 payload['stage'] = "1"82 jsonStr = json.dumps(payload)83 accountId = payload['stackId'].split(":")[4]84 region = payload['stackId'].split(":")[3]85 86 ssm_doc = client.get_document(87 Name='ScheduleMaintenanceWindow'88 )89 90 response = client.register_task_with_maintenance_window(91 WindowId=windowId,92 Targets=[93 {94 'Key': 'InstanceIds',95 'Values': [96 'i-00000000000000000',97 ]98 },99 ],100 TaskArn='arn:aws:lambda:' + region + ':' + accountId + ':function:ManageUpdateProcess',101 ServiceRoleArn=json.loads(ssm_doc['Content'])['assumeRole'],102 TaskType='LAMBDA',103 TaskInvocationParameters={104 'Lambda': {105 'Payload': jsonStr.encode('utf-8')106 }107 },108 MaxConcurrency='1',109 MaxErrors='1',110 Name='Update-Stack'111 112 )113 114 print(response)115 116 except Exception as error:117 print(error)118def updateOpsItems(windowId, opsItemId, event):119 120 try:121 122 client = boto3.client('ssm')123 relatedOpsItems = getRelatedOpsItems(opsItemId)124 opsData = {}125 opsData['scheduledMaintenanceWindowDetails'] = {}126 opsData['scheduledMaintenanceWindowDetails']['Value'] = 'WindowId=' + windowId127 opsData['scheduledMaintenanceWindowDetails']['Value'] += '\nCronExpression=' + event['cronExpression']128 opsData['scheduledMaintenanceWindowDetails']['Value'] += '\nDuration=' + event['duration']129 opsData['scheduledMaintenanceWindowDetails']['Type'] = 'String'130 131 response = {}132 for opsItem in relatedOpsItems:133 response = client.update_ops_item(134 Status='InProgress',135 OperationalData=opsData,136 OpsItemId=opsItem['OpsItemId']137 )138 139 print(response)140 except Exception as error:141 print(error)142def getRelatedOpsItems(opsItemId):143 144 try:145 146 client = boto3.client('ssm')147 148 opsItem = client.get_ops_item(149 OpsItemId=opsItemId150 )151 152 stackId = opsItem['OpsItem']['OperationalData']['cloudformation:stack-id']['Value']153 stackName = opsItem['OpsItem']['OperationalData']['cloudformation:stack-name']['Value']154 155 OpsItemFilters=[156 {157 'Key': 'OperationalData',158 'Values': [159 "{\"key\":\"cloudformation:stack-name\",\"value\":\"" + stackName + "\"}",160 ],161 'Operator': 'Equal'162 },163 {164 'Key': 'Status',165 'Values': [166 "Open",167 "InProgress"168 ],169 'Operator': 'Equal'170 }171 ]172 173 relatedOpsItems = getOpsItems(OpsItemFilters)174 175 return relatedOpsItems176 177 except Exception as error:178 print(error)179def getOpsItems(filter):180 181 try:182 183 client = boto3.client('ssm')184 185 response = client.describe_ops_items(186 OpsItemFilters=filter187 )188 189 opItemIds = []190 for opsItem in response['OpsItemSummaries']:191 opItemIds.append({'OpsItemId': opsItem['OpsItemId']})192 193 return opItemIds194 195 except Exception as error:196 print(error)197 198def updateMaintenanceWindowTags(windowId, opsItemId):199 200 try:201 202 client = boto3.client('ssm')203 opsItem = client.get_ops_item(204 OpsItemId=opsItemId205 )206 207 stackId = opsItem['OpsItem']['OperationalData']['cloudformation:stack-id']['Value']208 stackName = opsItem['OpsItem']['OperationalData']['cloudformation:stack-name']['Value']209 response = client.add_tags_to_resource(210 ResourceType='MaintenanceWindow',211 ResourceId=windowId,212 Tags=[213 {214 'Key': 'cloudformation:stack-name',215 'Value': stackName 216 },217 {218 'Key': 'LastModifiedDate',219 'Value': str(datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"))220 }221 ]222 )223 224 print(response)225 except Exception as error:226 print(error)227def updateMaintenanceWindow(windowId, event):228 229 try:230 231 client = boto3.client("ssm")232 opsItem = client.get_ops_item(233 OpsItemId=event['opsItemId']234 )235 236 windowName = "mw-" + opsItem['OpsItem']['OperationalData']['cloudformation:stack-name']['Value']237 238 response = client.update_maintenance_window(239 WindowId=windowId,240 Name=windowName,241 Schedule="at(" + event['cronExpression'] + ")",242 ScheduleTimezone=event['timezone'],243 Duration=int(event['duration']),244 Cutoff=1,245 AllowUnassociatedTargets=True,246 Enabled=True,247 Replace=True248 )249 250 print(response)251 252 except Exception as error:253 print(error)254 255def createMaintenanceWindow(event):256 257 try:258 259 client = boto3.client("ssm")260 opsItem = client.get_ops_item(261 OpsItemId=event['opsItemId']262 )263 264 windowName = "mw-" + opsItem['OpsItem']['OperationalData']['cloudformation:stack-name']['Value']265 266 response = client.create_maintenance_window(267 Name=windowName,268 Schedule="at(" + event['cronExpression'] + ")",269 ScheduleTimezone=event['timezone'],270 Duration=int(event['duration']),271 Cutoff=1,272 AllowUnassociatedTargets=True273 )274 275 print(response)276 277 return response['WindowId']278 279 except Exception as error:280 print("Exception encountered while trying to create a maintenance window.")281 print(error)282 return False283 284#SSM Functions285def getOpsItem(OpsItemId, **kwargs):286 region = os.environ['AWS_REGION']287 if 'region' in kwargs:288 region = kwargs['region']289 session = boto3.session.Session()290 ssm_client = session.client('ssm', region)291 292 try:293 response = ssm_client.get_ops_item(294 OpsItemId=OpsItemId295 )296 return response297 except Exception as error:298 print("Exception encountered while getting opsItem [" + OpsItemId + "]: " + str(error))299 return False300def findActiveMaintenanceWindow(name, **kwargs):301 region = os.environ['AWS_REGION']302 if 'region' in kwargs:303 region = kwargs['region']304 session = boto3.session.Session()305 ssm_client = session.client('ssm', region)306 307 try:...
test_ssm.py
Source:test_ssm.py
...30 'type': 'post-item'}]},31 session_factory=factory, config={'region': 'us-east-1'})32 resources = p.run()33 client = factory().client('ssm', region_name='us-east-1')34 item = client.get_ops_item(35 OpsItemId=resources[0]['c7n:opsitem']).get('OpsItem')36 arn = p.resource_manager.get_arns(resources)[0]37 self.assertTrue(38 arn in item['OperationalData']['/aws/resources']['Value'])39 self.assertTrue(item['OperationalData']['/aws/dedup'])40 self.assertEqual(item['Title'], p.name)41 self.assertEqual(item['Description'], p.data['description'])42 def test_ops_item_filter(self):43 factory = self.replay_flight_data('test_ops_item_filter')44 p = self.load_policy({45 'name': 'checking-lambdas',46 'description': 'something good',47 'resource': 'aws.lambda',48 'source': 'config',49 'query': [50 {'clause': "resourceId = 'custodian-aws'"}],51 'filters': [{52 'type': 'ops-item',53 'priority': [3, 4, 5],54 'title': 'checking-lambdas',55 'source': 'Cloud Custodian',56 }]},57 session_factory=factory)58 resources = p.run()59 self.assertEqual(len(resources), 1)60 self.assertEqual(61 resources[0]['c7n:opsitems'],62 ['oi-9be57440dcb3'])63 def test_post_ops_item_update(self):64 factory = self.replay_flight_data('test_post_ops_item_update')65 p = self.load_policy({66 'name': 'checking-lambdas',67 'description': 'something good',68 'resource': 'aws.lambda',69 'source': 'config',70 'query': [71 {'clause': "resourceId = 'custodian-nuke-emr'"}],72 'actions': [{73 'type': 'post-item'}]},74 session_factory=factory)75 resources = p.run()76 self.assertEqual(len(resources), 1)77 client = factory().client('ssm', region_name='us-east-1')78 item = client.get_ops_item(79 OpsItemId=resources[0]['c7n:opsitem']).get('OpsItem')80 self.assertEqual(81 json.loads(item['OperationalData']['/aws/resources']['Value']),82 [{'arn': 'arn:aws:lambda:us-east-1::function:custodian-aws'},83 {'arn': 'arn:aws:lambda:us-east-1::function:custodian-nuke-emr'}])84 def test_update_ops_item(self):85 factory = self.replay_flight_data('test_update_ops_item')86 p = self.load_policy({87 'name': 'checking-lambdas',88 'description': 'something good',89 'resource': 'aws.ops-item',90 'query': [91 {'Key': 'Status', 'Operator': 'Equal', 'Values': ['Open']}92 ],93 'actions': [{94 'type': 'update',95 'topics': ['arn:aws:sns:us-west-2:644160558196:aws-command'],96 'status': 'Resolved',97 }]},98 config={'region': 'us-west-2'},99 session_factory=factory)100 resources = p.run()101 self.assertEqual(len(resources), 1)102 client = factory().client('ssm', region_name='us-west-2')103 if self.recording:104 time.sleep(5)105 item = client.get_ops_item(106 OpsItemId=resources[0]['OpsItemId'])['OpsItem']107 self.assertEqual(item['Status'], 'Resolved')108 self.assertEqual(109 item['Notifications'],110 [{'Arn': 'arn:aws:sns:us-west-2:644160558196:aws-command'}])111 def test_invalid_resource_query(self):112 self.assertRaises(113 PolicyValidationError, self.load_policy,114 {'name': 'value',115 'resource': 'aws.ops-item',116 'query': [117 {'Key': 'Status', 'Operator': 'Equals', 'Values': ['Open']}]},118 validate=True)119 def test_get_resources(self):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!