Best Python code snippet using localstack_python
transit-gateway.py
Source:transit-gateway.py
# ... (excerpt begins inside an earlier function; only its closing lines are shown)
#         DryRun=False
#     )
#     return response


# Delete transit gateway
def delete_transit_gateway(transit_gateway_id):
    """Delete the transit gateway ``transit_gateway_id`` via the EC2 API (us-east-1).

    Returns the response of the ``delete_transit_gateway`` call unchanged.
    """
    client = boto3.client('ec2', region_name='us-east-1')
    response = client.delete_transit_gateway(
        TransitGatewayId=transit_gateway_id,
        DryRun=False
    )
    return response


# Disassociate transit gateway multicast domain
def disassociate_transit_gateway_multicast_domain(tgw_multicast_domain_id, tgw_attachment_id, subnet_id):
    """Disassociate one subnet from a transit gateway multicast domain (us-east-1).

    ``subnet_id`` is sent as a single-element ``SubnetIds`` list together with
    the multicast domain id and the attachment id. Returns the API response
    unchanged.
    """
    client = boto3.client('ec2', region_name='us-east-1')
    response = client.disassociate_transit_gateway_multicast_domain(
        TransitGatewayMulticastDomainId=tgw_multicast_domain_id,
        TransitGatewayAttachmentId=tgw_attachment_id,
        SubnetIds=[
            subnet_id,
        ],
        DryRun=False
    )
    return response


# Register transit gateway multicast group members
def register_transit_gateway_multicast_group_members(tgw_multicast_domain_id, group_address, network_interface3, network_interface4):
    """Register two network interfaces as members of a multicast group (us-east-1).

    ``group_address`` is passed as ``GroupIpAddress`` and both interface ids
    are passed together in ``NetworkInterfaceIds``. Returns the API response
    unchanged.
    """
    client = boto3.client('ec2', region_name='us-east-1')
    response = client.register_transit_gateway_multicast_group_members(
        TransitGatewayMulticastDomainId=tgw_multicast_domain_id,
        GroupIpAddress=group_address,
        NetworkInterfaceIds=[
            network_interface3, network_interface4,
        ],
        DryRun=False
    )
    return response


# Register transit gateway multicast group sources
def register_transit_gateway_multicast_group_sources(tgw_multicast_domain_id, nic1id, group_address):
    """Register one network interface as a source of a multicast group (us-east-1).

    Note the parameter order: the interface id comes before the group address.
    Returns the API response unchanged.
    """
    client = boto3.client('ec2', region_name='us-east-1')
    response = client.register_transit_gateway_multicast_group_sources(
        TransitGatewayMulticastDomainId=tgw_multicast_domain_id,
        GroupIpAddress=group_address,
        NetworkInterfaceIds=[
            nic1id,
        ],
        DryRun=False
    )
    return response


# Delete transit gateway route table
def delete_transit_gateway_route_table(rtb_id):
    """Delete the transit gateway route table ``rtb_id`` via the EC2 API (us-east-1).

    Returns the response of the ``delete_transit_gateway_route_table`` call
    unchanged.
    """
    client = boto3.client('ec2', region_name='us-east-1')
    response = client.delete_transit_gateway_route_table(
        TransitGatewayRouteTableId=rtb_id,
        DryRun=False
    )
    return response


def _require_cli_args(action, arg_names):
    """Exit with a usage message unless ``sys.argv`` holds every argument of ``action``.

    ``arg_names`` lists, in order, the positional arguments expected after the
    action word; it sizes the check and provides the names shown in the message.
    """
    # sys.argv[0] is the script name and sys.argv[1] is the action itself.
    if len(sys.argv) < 2 + len(arg_names):
        placeholders = " ".join("<{}>".format(name) for name in arg_names)
        sys.exit("usage: {} {} {}".format(sys.argv[0], action, placeholders))


if __name__ == '__main__':
    # Fail with a readable message instead of an IndexError when no action is given.
    if len(sys.argv) < 2:
        sys.exit("usage: {} <apply|destroy> <arguments...>".format(sys.argv[0]))
    action = sys.argv[1]

    if action == "apply":
        # Validate the argument count up front so nothing is created in AWS
        # before discovering that a later argument is missing.
        _require_cli_args(action, (
            "vpc_id", "subnet_id", "processor1nic", "processor2nic", "processor3nic",
        ))
        vpc_id, subnet_id, processor1nic, processor2nic, processor3nic = sys.argv[2:7]

        # NOTE(review): the fixed sleeps below presumably give each resource
        # time to become usable before the next call - confirm the durations.

        # Create transit gateway and collect its id from the response
        tgw_response = create_transit_gateway()
        tgw_id = tgw_response['TransitGateway']['TransitGatewayId']
        time.sleep(120)

        # Create transit gateway attachment and collect its id from the response
        tgw_attachment_response = create_transit_gateway_vpc_attachment(tgw_id, vpc_id, subnet_id)
        tgw_attachment_id = tgw_attachment_response['TransitGatewayVpcAttachment']['TransitGatewayAttachmentId']
        time.sleep(90)

        # Create transit gateway route table and collect its id from the response
        tgw_rt_response = create_transit_gateway_rt(tgw_id)
        tgw_rt_id = tgw_rt_response['TransitGatewayRouteTable']['TransitGatewayRouteTableId']
        time.sleep(90)

        # Associate transit gateway route table
        associate_transit_gateway_route_table(tgw_rt_id, tgw_attachment_id)
        time.sleep(60)

        # Enable transit gateway route table propagation
        enable_transit_gateway_route_table_propagation(tgw_rt_id, tgw_attachment_id)
        time.sleep(30)

        # Create transit gateway route
        create_transit_gateway_route(tgw_rt_id, "10.123.112.0/24", tgw_attachment_id)
        time.sleep(60)

        # Create transit gateway multicast domain and collect its id from the response
        tgw_multicast_domain_response = create_transit_gateway_multicast_domain(tgw_id)
        tgw_multicast_domain_id = tgw_multicast_domain_response['TransitGatewayMulticastDomain']['TransitGatewayMulticastDomainId']
        time.sleep(90)

        # Associate transit gateway multicast domain
        associate_transit_gateway_multicast_domain(tgw_multicast_domain_id, tgw_attachment_id, subnet_id)
        time.sleep(60)

        # Register transit gateway multicast group members 1
        register_transit_gateway_multicast_group_members(tgw_multicast_domain_id, "224.3.2.2", processor2nic, processor3nic)
        time.sleep(60)

        # Register transit gateway multicast group members 2
        register_transit_gateway_multicast_group_members(tgw_multicast_domain_id, "224.3.2.3", processor2nic, processor3nic)
        time.sleep(60)

        # Register transit gateway multicast group sources 1
        register_transit_gateway_multicast_group_sources(tgw_multicast_domain_id, processor1nic, "224.3.2.3")

        # Register transit gateway multicast group sources 2
        register_transit_gateway_multicast_group_sources(tgw_multicast_domain_id, processor1nic, "224.3.2.2")

    elif action == "destroy":
        _require_cli_args(action, (
            "transit_gateway_multicast_domain_id", "transit_gateway_attachment_id",
            "transit_gateway_id", "subnet_id", "rtb_id",
        ))
        # rtb_id is not used in the visible part of this branch; presumably the
        # truncated remainder passes it to delete_transit_gateway_route_table.
        (transit_gateway_multicast_domain_id, transit_gateway_attachment_id,
         transit_gateway_id, subnet_id, rtb_id) = sys.argv[2:7]

        # Disassociate transit gateway multicast domain from the subnet
        disassociate_transit_gateway_multicast_domain(transit_gateway_multicast_domain_id, transit_gateway_attachment_id, subnet_id)
        time.sleep(120)

        # Delete transit gateway multicast domain
        delete_transit_gateway_multicast_domain(transit_gateway_multicast_domain_id)
        time.sleep(120)

        # Delete transit gateway VPC attachment
        delete_transit_gateway_vpc_attachment(transit_gateway_attachment_id)
        time.sleep(120)

        # Delete transit gateway
        delete_transit_gateway(transit_gateway_id)
        time.sleep(30)

        # Delete transit gateway route table
        # ... (excerpt is truncated here; the remaining statements are not shown)
test_transit_gateway.py
Source:test_transit_gateway.py
...25 states=[AttachmentState.AVAILABLE],26 )27 assert len(peers) == 028 # clean-up29 tgw.ec2_client.delete_transit_gateway(30 TransitGatewayId=sample_tgw1["TransitGateway"]["TransitGatewayId"]31 )32 def test__fail__client_error(self):33 """fail with client error"""34 tgw = TGWPeering()35 stubber = Stubber(tgw.ec2_client)36 stubber.add_client_error(37 "describe_transit_gateway_peering_attachments",38 service_error_code="InternalException",39 service_message="this is test error",40 )41 stubber.activate()42 peers = tgw.get_tgw_peers(43 tgw_id="",44 states=[AttachmentState.AVAILABLE],45 )46 assert len(peers) == 047 stubber.deactivate()48@pytest.mark.BDD49@mock_ec250class TestCreatePeeringAttachment:51 """BDD test class for TGW create peering attachment methods"""52 owner = "123456789012"53 def test__success(self):54 """success"""55 tgw = TGWPeering()56 # mock setup57 sample_tgw1 = tgw.ec2_client.create_transit_gateway()58 sample_tgw2 = tgw.ec2_client.create_transit_gateway()59 sample_peer = TGWPeer(60 transit_gateway=sample_tgw2["TransitGateway"]["TransitGatewayId"],61 aws_region=environ.get("AWS_DEFAULT_REGION"),62 )63 attachment = tgw.create_tgw_peering_attachment(64 tgw_id=sample_tgw1["TransitGateway"]["TransitGatewayId"],65 peer=sample_peer,66 peer_account_id=self.owner,67 )68 sample_peer.attachment_id = attachment["TransitGatewayAttachmentId"]69 # test attachment is tagged with solution identifier70 assert attachment["Tags"][0]["Key"] == "SolutionId"71 assert attachment["Tags"][0]["Value"] == environ.get("SOLUTION_ID")72 peers = tgw.get_tgw_peers(73 tgw_id=sample_tgw1["TransitGateway"]["TransitGatewayId"],74 states=[AttachmentState.AVAILABLE],75 )76 assert isinstance(peers[0], TGWPeer)77 assert peers[0] == sample_peer78 # clean-up79 tgw.ec2_client.delete_transit_gateway_peering_attachment(80 TransitGatewayAttachmentId=attachment["TransitGatewayAttachmentId"]81 )82 tgw.ec2_client.delete_transit_gateway(83 
TransitGatewayId=sample_tgw1["TransitGateway"]["TransitGatewayId"]84 )85 tgw.ec2_client.delete_transit_gateway(86 TransitGatewayId=sample_tgw2["TransitGateway"]["TransitGatewayId"]87 )88 def test__fail__client_error(self):89 """fail with client error"""90 tgw = TGWPeering()91 stubber = Stubber(tgw.ec2_client)92 _err_code = "InternalException"93 _message = "test error"94 stubber.add_client_error(95 "create_transit_gateway_peering_attachment",96 service_error_code=_err_code,97 service_message=_message,98 )99 stubber.activate()...
transit_gateway_tasks.py
Source:transit_gateway_tasks.py
# ... (excerpt begins inside an earlier task; only its closing lines are shown)
#     elif task:
#         task.status = FAILED
#     doosradb.session.commit()


@celery.task(name="delete_transit_gateway", bind=True)
def task_delete_transit_gateway(self, gateway_id):
    """Celery task: delete a transit gateway and record the outcome on its IBMTask.

    The gateway is looked up by ``gateway_id`` and the IBMTask record by the
    id of the current Celery request. When no IBMTask record is found the
    delete is not attempted; the session is committed either way.
    """
    gateway = TransitGateway.query.get(gateway_id)
    request_id = str(self.request.id).strip()
    tracked_task = doosradb.session.query(IBMTask).filter_by(id=request_id).first()

    if tracked_task:
        # The status mirrors the truthiness of the delete helper's return value.
        deleted = delete_transit_gateway(gateway)
        tracked_task.status = SUCCESS if deleted else FAILED

    doosradb.session.commit()


@celery.task(name="delete_transit_gateway_connection", bind=True)
def task_delete_transit_gateway_connection(self, connection_id):
    transit_gateway_connection = TransitGatewayConnection.query.get(connection_id)
    task = (
        doosradb.session.query(IBMTask)
        .filter_by(id=str(self.request.id).strip())
        .first()
    )
    if task and delete_transit_gateway_connection(transit_gateway_connection):
        task.status = SUCCESS
    # ... (excerpt is truncated here; the rest of this task is not shown)
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!