How to use the delete_event_bus method in localstack

Best Python code snippets using localstack_python

svceventbridge.py

Source:svceventbridge.py Github

copy

Full Screen

...27 )28 return response['EventBusArn']29 except botocore.exceptions.ClientError as e:30 raise RdqError(self._utils.fail(e, op, 'EventBusName', eventBusName))31 def delete_event_bus(self, eventBusName):32 op = 'delete_event_bus'33 try:34 self._client.delete_event_bus(35 Name=eventBusName36 )37 return True38 except botocore.exceptions.ClientError as e:39 if self._utils.is_resource_not_found(e): return False40 raise RdqError(self._utils.fail(e, op, 'EventBusName', eventBusName))41 42 def put_permission_conditional(self, eventBusName, sid, action, condition):43 op = 'put_permission'44 try:45 self._client.put_permission(46 EventBusName=eventBusName,47 Principal = '*',48 Action = action,49 StatementId = sid,50 Condition = condition51 )52 except botocore.exceptions.ClientError as e:53 raise RdqError(self._utils.fail(e, op, 'EventBusName', eventBusName, "Sid", sid))54 def put_permission_principal(self, eventBusName, sid, action, principal):55 op = 'put_permission'56 try:57 self._client.put_permission(58 EventBusName=eventBusName,59 Principal = principal,60 Action = action,61 StatementId = sid62 )63 except botocore.exceptions.ClientError as e:64 raise RdqError(self._utils.fail(e, op, 'EventBusName', eventBusName, "Sid", sid))65 def list_rules(self, eventBusName):66 op = "list_rules"67 try:68 paginator = self._client.get_paginator(op)69 page_iterator = paginator.paginate(EventBusName=eventBusName)70 rules = []71 for page in page_iterator:72 items = page["Rules"]73 for item in items:74 rules.append(item)75 return rules76 except botocore.exceptions.ClientError as e:77 if self._utils.is_resource_not_found(e): return []78 raise RdqError(self._utils.fail(e, op, 'EventBusName', eventBusName))79 def describe_rule(self, eventBusName, ruleName):80 op = 'describe_rule'81 try:82 response = self._client.describe_rule(83 EventBusName=eventBusName,84 Name=ruleName85 )86 return response87 except botocore.exceptions.ClientError as e:88 if self._utils.is_resource_not_found(e): return 
None89 raise RdqError(self._utils.fail(e, op, 'EventBusName', eventBusName, 'RuleName', ruleName))90 # Allow events:PutRule91 def put_rule_arn(self, eventBusName, ruleName, rq, tags):92 op = 'put_rule'93 try:94 response = self._client.put_rule(95 EventBusName=eventBusName,96 Name=ruleName,97 Description=rq['Description'],98 EventPattern=rq['EventPattern'],99 Tags=tags.toList()100 )101 return response['RuleArn']102 except botocore.exceptions.ClientError as e:103 raise RdqError(self._utils.fail(e, op, 'EventBusName', eventBusName, 'RuleName', ruleName))104 def delete_rule(self, eventBusName, ruleName):105 op = 'delete_rule'106 try:107 self._client.delete_rule(108 EventBusName=eventBusName,109 Name=ruleName110 )111 return True112 except botocore.exceptions.ClientError as e:113 if self._utils.is_resource_not_found(e): return False114 raise RdqError(self._utils.fail(e, op, 'EventBusName', eventBusName, 'RuleName', ruleName))115 def list_targets_by_rule(self, eventBusName, ruleName):116 op = "list_targets_by_rule"117 try:118 paginator = self._client.get_paginator(op)119 page_iterator = paginator.paginate(EventBusName=eventBusName, Rule=ruleName)120 targets = []121 for page in page_iterator:122 items = page["Targets"]123 for item in items:124 targets.append(item)125 return targets126 except botocore.exceptions.ClientError as e:127 if self._utils.is_resource_not_found(e): return []128 raise RdqError(self._utils.fail(e, op, 'EventBusName', eventBusName, 'RuleName', ruleName))129 def list_target_ids(self, eventBusName, ruleName):130 exTargets = self.list_targets_by_rule(eventBusName, ruleName)131 targetIds = []132 for target in exTargets:133 targetIds.append(target['Id'])134 return targetIds135 def find_target(self, eventBusName, ruleName, targetId):136 exTargets = self.list_targets_by_rule(eventBusName, ruleName)137 for target in exTargets:138 if target['Id'] == targetId:139 return target140 return None141 def put_targets(self, eventBusName, ruleName, targets):142 op = 
'put_targets'143 if len(targets) == 0: return144 try:145 self._client.put_targets(146 EventBusName=eventBusName,147 Rule=ruleName,148 Targets=targets149 )150 except botocore.exceptions.ClientError as e:151 raise RdqError(self._utils.fail(e, op, 'EventBusName', eventBusName, 'RuleName', ruleName))152 def remove_targets(self, eventBusName, ruleName, targetIds):153 op = 'remove_targets'154 if len(targetIds) == 0: return155 try:156 self._client.remove_targets(157 EventBusName=eventBusName,158 Rule=ruleName,159 Ids=targetIds160 )161 except botocore.exceptions.ClientError as e:162 raise RdqError(self._utils.fail(e, op, 'EventBusName', eventBusName, 'RuleName', ruleName))163 def declareEventBusArn(self, eventBusName, tags):164 exEventBus = self.describe_event_bus(eventBusName)165 if not exEventBus:166 return self.create_event_bus_arn(eventBusName, tags)167 return exEventBus['Arn']168 def declareEventBusRuleArn(self, eventBusName, ruleName, ruleDescription, eventPatternMap, tags):169 db = DeltaBuild()170 db.putRequired('Description', ruleDescription)171 db.putRequiredJson('EventPattern', eventPatternMap)172 rq = db.required()173 exRule = self.describe_rule(eventBusName, ruleName)174 if exRule:175 exArn = exRule['Arn']176 db.loadExisting(exRule)177 db.normaliseExistingJson('EventPattern')178 delta = db.delta()179 if delta:180 exArn = self.put_rule_arn(eventBusName, ruleName, rq, tags)181 return exArn182 def declareEventBusTarget(self, eventBusName, ruleName, targetId, targetArn, maxAgeSeconds):183 db = DeltaBuild()184 db.putRequired('Id', targetId)185 db.putRequired('Arn', targetArn)186 db.putRequired('RetryPolicy.MaximumEventAgeInSeconds', maxAgeSeconds)187 rq = db.required()188 exTarget = self.find_target(eventBusName, ruleName, targetId)189 if exTarget:190 db.loadExisting(exTarget)191 delta = db.delta()192 if delta:193 self.put_targets(eventBusName, ruleName, [rq])194 return delta195 def declareEventBusPublishPermissionForAccount(self, eventBusName, accountId):196 sid = 
"pub-{}".format(accountId)197 action = 'events:PutEvents'198 self.put_permission_principal(eventBusName, sid, action, accountId)199 def declareEventBusPublishPermissionForOrganization(self, eventBusName, organizationId):200 sid = "pub-{}".format(organizationId)201 action = 'events:PutEvents'202 condition = {203 'Type': 'StringEquals',204 'Key': 'aws:PrincipalOrgID',205 'Value': organizationId206 }207 self.put_permission_conditional(eventBusName, sid, action, condition)208 def removeEventBus(self, eventBusName):209 rules = self.list_rules(eventBusName)210 for rule in rules:211 ruleName = rule['Name']212 targetIds = self.list_target_ids(eventBusName, ruleName)213 self.remove_targets(eventBusName, ruleName, targetIds)214 self.delete_rule(eventBusName, ruleName)...

Full Screen

Full Screen

test_cloudwatch_events.py

Source:test_cloudwatch_events.py Github

copy

Full Screen

...19 """success"""20 cwe = CloudWatchEvents()21 cwe.cwe_client.create_event_bus(Name=self.event_bus_name)22 cwe.put_permission(self.correct_principal, self.event_bus_name)23 cwe.cwe_client.delete_event_bus(Name=self.event_bus_name) # clean-up24 def test__fail__resource_not_found(self):25 """fail with ResourceNotFoundException"""26 cwe = CloudWatchEvents()27 with pytest.raises(cwe.cwe_client.exceptions.ResourceNotFoundException):28 cwe.put_permission(self.correct_principal, self.event_bus_name)29 def test__fail__client_error(self):30 """fail with InvalidParameterValue"""31 cwe = CloudWatchEvents()32 cwe.cwe_client.create_event_bus(Name=self.event_bus_name)33 with pytest.raises(ClientError) as err:34 cwe.put_permission(self.wrong_principal, self.event_bus_name)35 assert err.value.response["Error"]["Code"] == "InvalidParameterValue"36 cwe.cwe_client.delete_event_bus(Name=self.event_bus_name) # clean-up37@pytest.mark.TDD38@mock_events39class TestClassRemovePermission:40 """TDD test class for CloudWatch Events remove permission calls"""41 correct_principal = "123456789012"42 statement_id = "my-statement-01"43 event_bus_name = "my-event-bus"44 def test__success(self):45 """success"""46 cwe = CloudWatchEvents()47 cwe.cwe_client.create_event_bus(Name=self.event_bus_name) # mock setup48 cwe.put_permission(self.correct_principal, self.event_bus_name)49 cwe.remove_permission(self.statement_id, self.event_bus_name)50 cwe.cwe_client.delete_event_bus(Name=self.event_bus_name) # clean-up51 def test__fail__resource_not_found_eventbus(self):52 """fail with ResourceNotFoundException"""53 cwe = CloudWatchEvents()54 assert (55 cwe.remove_permission(self.statement_id, self.event_bus_name)56 is None57 )58 def test__fail__resource_not_found_policy(self):59 """fail with ResourceNotFoundException"""60 cwe = CloudWatchEvents()61 cwe.cwe_client.create_event_bus(Name=self.event_bus_name)62 assert (63 cwe.remove_permission("my-statement-02", self.event_bus_name)64 is None65 )66 
cwe.cwe_client.delete_event_bus(Name=self.event_bus_name) # clean-up67 def test__fail__client_error(self):68 """fail with ClientError"""69 cwe = CloudWatchEvents()70 stubber = Stubber(cwe.cwe_client)71 stubber.add_client_error(72 "remove_permission",73 service_error_code="InternalException",74 service_message="this is test error",75 )76 stubber.activate()77 with pytest.raises(ClientError):78 cwe.remove_permission(self.statement_id, self.event_bus_name)79 stubber.deactivate()80@pytest.mark.TDD81@mock_events82class TestClassDescribe:83 """TDD test class for CloudWatch Events describe calls"""84 event_bus_name = "my-event-bus"85 def test__success(self):86 """success"""87 cwe = CloudWatchEvents()88 cwe.cwe_client.create_event_bus(Name=self.event_bus_name)89 resp = cwe.describe_event_bus(event_bus_name=self.event_bus_name)90 assert resp["Name"] == self.event_bus_name91 _region = os.environ.get("AWS_DEFAULT_REGION")92 assert re.match(93 fr"(arn:aws:events:{_region}:)\d{{12}}:(event-bus/{self.event_bus_name})",94 resp["Arn"],95 )96 assert resp["ResponseMetadata"]["HTTPStatusCode"] == 20097 cwe.cwe_client.delete_event_bus(Name=self.event_bus_name) # clean-up98 def test__fail__resource_not_found(self):99 """fail with ResourceNotFoundException"""100 cwe = CloudWatchEvents()101 with pytest.raises(cwe.cwe_client.exceptions.ResourceNotFoundException):...

Full Screen

Full Screen

rmEvtBridge.py

Source:rmEvtBridge.py Github

copy

Full Screen

...17 print(f"Targets removed for Rule: {name}")18 19 client.delete_rule(Name=name, EventBusName=event_bus_name, Force=True)20 print(f"Deleted Rule: {name}")21 client.delete_event_bus(Name=event_bus_name)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful