Best Python code snippet using localstack_python
test_vpc_peering_connection.py
Source:test_vpc_peering_connection.py
# Copyright (c) 2014 Skytap http://skytap.com/
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.

from tests.unit import mock, unittest
from tests.unit import AWSMockServiceTestCase

from boto.vpc import VpcPeeringConnection, VPCConnection, Subnet


class TestDescribeVpcPeeringConnections(AWSMockServiceTestCase):
    """Parse a DescribeVpcPeeringConnections response holding two items."""

    DESCRIBE_VPC_PEERING_CONNECTIONS = b"""<?xml version="1.0" encoding="UTF-8"?>
<DescribeVpcPeeringConnectionsResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/">
    <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
    <vpcPeeringConnectionSet>
        <item>
            <vpcPeeringConnectionId>pcx-111aaa22</vpcPeeringConnectionId>
            <requesterVpcInfo>
                <ownerId>777788889999</ownerId>
                <vpcId>vpc-1a2b3c4d</vpcId>
                <cidrBlock>172.31.0.0/16</cidrBlock>
            </requesterVpcInfo>
            <accepterVpcInfo>
                <ownerId>111122223333</ownerId>
                <vpcId>vpc-aa22cc33</vpcId>
            </accepterVpcInfo>
            <status>
                <code>pending-acceptance</code>
                <message>Pending Acceptance by 111122223333</message>
            </status>
            <expirationTime>2014-02-17T16:00:50.000Z</expirationTime>
        </item>
        <item>
            <vpcPeeringConnectionId>pcx-444bbb88</vpcPeeringConnectionId>
            <requesterVpcInfo>
                <ownerId>1237897234</ownerId>
                <vpcId>vpc-2398abcd</vpcId>
                <cidrBlock>172.30.0.0/16</cidrBlock>
            </requesterVpcInfo>
            <accepterVpcInfo>
                <ownerId>98654313</ownerId>
                <vpcId>vpc-0983bcda</vpcId>
            </accepterVpcInfo>
            <status>
                <code>pending-acceptance</code>
                <message>Pending Acceptance by 98654313</message>
            </status>
            <expirationTime>2015-02-17T16:00:50.000Z</expirationTime>
        </item>
    </vpcPeeringConnectionSet>
</DescribeVpcPeeringConnectionsResponse>"""

    connection_class = VPCConnection

    def default_body(self):
        """Return the canned XML served as the mocked HTTP response body."""
        return self.DESCRIBE_VPC_PEERING_CONNECTIONS

    def test_get_vpc_peering_connections(self):
        """Both items in the response are parsed with all of their fields."""
        self.set_http_response(status_code=200)
        api_response = self.service_connection.get_all_vpc_peering_connections(
            ['pcx-111aaa22', 'pcx-444bbb88'],
            filters=[('status-code', ['pending-acceptance'])])

        self.assertEqual(len(api_response), 2)

        # Every returned object must match one of the two fixture items; the
        # else-branch asserts the id so an unexpected third id fails loudly.
        for vpc_peering_connection in api_response:
            if vpc_peering_connection.id == 'pcx-111aaa22':
                self.assertEqual(vpc_peering_connection.id, 'pcx-111aaa22')
                self.assertEqual(vpc_peering_connection.status_code, 'pending-acceptance')
                self.assertEqual(vpc_peering_connection.status_message, 'Pending Acceptance by 111122223333')
                self.assertEqual(vpc_peering_connection.requester_vpc_info.owner_id, '777788889999')
                self.assertEqual(vpc_peering_connection.requester_vpc_info.vpc_id, 'vpc-1a2b3c4d')
                self.assertEqual(vpc_peering_connection.requester_vpc_info.cidr_block, '172.31.0.0/16')
                self.assertEqual(vpc_peering_connection.accepter_vpc_info.owner_id, '111122223333')
                self.assertEqual(vpc_peering_connection.accepter_vpc_info.vpc_id, 'vpc-aa22cc33')
                self.assertEqual(vpc_peering_connection.expiration_time, '2014-02-17T16:00:50.000Z')
            else:
                self.assertEqual(vpc_peering_connection.id, 'pcx-444bbb88')
                self.assertEqual(vpc_peering_connection.status_code, 'pending-acceptance')
                self.assertEqual(vpc_peering_connection.status_message, 'Pending Acceptance by 98654313')
                self.assertEqual(vpc_peering_connection.requester_vpc_info.owner_id, '1237897234')
                self.assertEqual(vpc_peering_connection.requester_vpc_info.vpc_id, 'vpc-2398abcd')
                self.assertEqual(vpc_peering_connection.requester_vpc_info.cidr_block, '172.30.0.0/16')
                self.assertEqual(vpc_peering_connection.accepter_vpc_info.owner_id, '98654313')
                self.assertEqual(vpc_peering_connection.accepter_vpc_info.vpc_id, 'vpc-0983bcda')
                self.assertEqual(vpc_peering_connection.expiration_time, '2015-02-17T16:00:50.000Z')


class TestCreateVpcPeeringConnection(AWSMockServiceTestCase):
    """Parse the object returned by a CreateVpcPeeringConnection call."""

    CREATE_VPC_PEERING_CONNECTION = b"""<CreateVpcPeeringConnectionResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/">
    <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
    <vpcPeeringConnection>
        <vpcPeeringConnectionId>pcx-73a5401a</vpcPeeringConnectionId>
        <requesterVpcInfo>
            <ownerId>777788889999</ownerId>
            <vpcId>vpc-1a2b3c4d</vpcId>
            <cidrBlock>10.0.0.0/28</cidrBlock>
        </requesterVpcInfo>
        <accepterVpcInfo>
            <ownerId>123456789012</ownerId>
            <vpcId>vpc-a1b2c3d4</vpcId>
        </accepterVpcInfo>
        <status>
            <code>initiating-request</code>
            <message>Initiating Request to 123456789012</message>
        </status>
        <expirationTime>2014-02-18T14:37:25.000Z</expirationTime>
        <tagSet/>
    </vpcPeeringConnection>
</CreateVpcPeeringConnectionResponse>"""

    connection_class = VPCConnection

    def default_body(self):
        """Return the canned XML served as the mocked HTTP response body."""
        return self.CREATE_VPC_PEERING_CONNECTION

    def test_create_vpc_peering_connection(self):
        """The created connection exposes every field from the response."""
        self.set_http_response(status_code=200)
        vpc_peering_connection = self.service_connection.create_vpc_peering_connection(
            'vpc-1a2b3c4d', 'vpc-a1b2c3d4', '123456789012')

        self.assertEqual(vpc_peering_connection.id, 'pcx-73a5401a')
        self.assertEqual(vpc_peering_connection.status_code, 'initiating-request')
        self.assertEqual(vpc_peering_connection.status_message, 'Initiating Request to 123456789012')
        self.assertEqual(vpc_peering_connection.requester_vpc_info.owner_id, '777788889999')
        self.assertEqual(vpc_peering_connection.requester_vpc_info.vpc_id, 'vpc-1a2b3c4d')
        self.assertEqual(vpc_peering_connection.requester_vpc_info.cidr_block, '10.0.0.0/28')
        self.assertEqual(vpc_peering_connection.accepter_vpc_info.owner_id, '123456789012')
        self.assertEqual(vpc_peering_connection.accepter_vpc_info.vpc_id, 'vpc-a1b2c3d4')
        self.assertEqual(vpc_peering_connection.expiration_time, '2014-02-18T14:37:25.000Z')


class TestDeleteVpcPeeringConnection(AWSMockServiceTestCase):
    """Delete a peering connection through the service connection."""

    DELETE_VPC_PEERING_CONNECTION = b"""<DeleteVpcPeeringConnectionResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/">
    <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
    <return>true</return>
</DeleteVpcPeeringConnectionResponse>"""

    connection_class = VPCConnection

    def default_body(self):
        """Return the canned XML served as the mocked HTTP response body."""
        return self.DELETE_VPC_PEERING_CONNECTION

    def test_delete_vpc_peering_connection(self):
        """A ``<return>true</return>`` response yields ``True``."""
        self.set_http_response(status_code=200)
        # assertEqual, not the assertEquals alias (removed in Python 3.12).
        self.assertEqual(self.service_connection.delete_vpc_peering_connection('pcx-12345678'), True)


class TestDeleteVpcPeeringConnectionShortForm(unittest.TestCase):
    """Delete a peering connection via the ``delete()`` method of the object."""

    DESCRIBE_VPC_PEERING_CONNECTIONS = b"""<?xml version="1.0" encoding="UTF-8"?>
<DescribeVpcPeeringConnectionsResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/">
    <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
    <vpcPeeringConnectionSet>
        <item>
            <vpcPeeringConnectionId>pcx-111aaa22</vpcPeeringConnectionId>
            <requesterVpcInfo>
                <ownerId>777788889999</ownerId>
                <vpcId>vpc-1a2b3c4d</vpcId>
                <cidrBlock>172.31.0.0/16</cidrBlock>
            </requesterVpcInfo>
            <accepterVpcInfo>
                <ownerId>111122223333</ownerId>
                <vpcId>vpc-aa22cc33</vpcId>
            </accepterVpcInfo>
            <status>
                <code>pending-acceptance</code>
                <message>Pending Acceptance by 111122223333</message>
            </status>
            <expirationTime>2014-02-17T16:00:50.000Z</expirationTime>
        </item>
    </vpcPeeringConnectionSet>
</DescribeVpcPeeringConnectionsResponse>"""

    DELETE_VPC_PEERING_CONNECTION = b"""<DeleteVpcPeeringConnectionResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/">
    <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
    <return>true</return>
</DeleteVpcPeeringConnectionResponse>"""

    def test_delete_vpc_peering_connection(self):
        """``delete()`` on a fetched object issues DeleteVpcPeeringConnection."""
        vpc_conn = VPCConnection(aws_access_key_id='aws_access_key_id',
                                 aws_secret_access_key='aws_secret_access_key')

        # First stub: the describe call that yields the object under test.
        mock_response = mock.Mock()
        mock_response.read.return_value = self.DESCRIBE_VPC_PEERING_CONNECTIONS
        mock_response.status = 200
        vpc_conn.make_request = mock.Mock(return_value=mock_response)
        vpc_peering_connections = vpc_conn.get_all_vpc_peering_connections()
        self.assertEqual(1, len(vpc_peering_connections))
        vpc_peering_connection = vpc_peering_connections[0]

        # Second stub: a fresh mock, so call_args_list only records the delete.
        mock_response = mock.Mock()
        mock_response.read.return_value = self.DELETE_VPC_PEERING_CONNECTION
        mock_response.status = 200
        vpc_conn.make_request = mock.Mock(return_value=mock_response)
        self.assertEqual(True, vpc_peering_connection.delete())

        # call_args_list[0][0] is the positional-args tuple, so these are
        # exact-element membership checks, not substring checks.
        self.assertIn('DeleteVpcPeeringConnection', vpc_conn.make_request.call_args_list[0][0])
        self.assertNotIn('DeleteVpc', vpc_conn.make_request.call_args_list[0][0])


class TestRejectVpcPeeringConnection(AWSMockServiceTestCase):
    """Reject a peering connection through the service connection."""

    REJECT_VPC_PEERING_CONNECTION = b"""<RejectVpcPeeringConnectionResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/">
    <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
    <return>true</return>
</RejectVpcPeeringConnectionResponse>"""

    connection_class = VPCConnection

    def default_body(self):
        """Return the canned XML served as the mocked HTTP response body."""
        return self.REJECT_VPC_PEERING_CONNECTION

    def test_reject_vpc_peering_connection(self):
        """A ``<return>true</return>`` response yields ``True``."""
        self.set_http_response(status_code=200)
        self.assertEqual(self.service_connection.reject_vpc_peering_connection('pcx-12345678'), True)


class TestAcceptVpcPeeringConnection(AWSMockServiceTestCase):
    """Parse the object returned by an AcceptVpcPeeringConnection call."""

    ACCEPT_VPC_PEERING_CONNECTION = b"""<AcceptVpcPeeringConnectionResponse xmlns="http://ec2.amazonaws.com/doc/2014-05-01/">
    <requestId>7a62c49f-347e-4fc4-9331-6e8eEXAMPLE</requestId>
    <vpcPeeringConnection>
        <vpcPeeringConnectionId>pcx-1a2b3c4d</vpcPeeringConnectionId>
        <requesterVpcInfo>
            <ownerId>123456789012</ownerId>
            <vpcId>vpc-1a2b3c4d</vpcId>
            <cidrBlock>10.0.0.0/28</cidrBlock>
        </requesterVpcInfo>
        <accepterVpcInfo>
            <ownerId>777788889999</ownerId>
            <vpcId>vpc-111aaa22</vpcId>
            <cidrBlock>10.0.1.0/28</cidrBlock>
        </accepterVpcInfo>
        <status>
            <code>active</code>
            <message>Active</message>
        </status>
        <tagSet/>
    </vpcPeeringConnection>
</AcceptVpcPeeringConnectionResponse>"""

    connection_class = VPCConnection

    def default_body(self):
        """Return the canned XML served as the mocked HTTP response body."""
        return self.ACCEPT_VPC_PEERING_CONNECTION

    def test_accept_vpc_peering_connection(self):
        """The accepted connection exposes every field from the response."""
        self.set_http_response(status_code=200)
        vpc_peering_connection = self.service_connection.accept_vpc_peering_connection('pcx-1a2b3c4d')

        self.assertEqual(vpc_peering_connection.id, 'pcx-1a2b3c4d')
        self.assertEqual(vpc_peering_connection.status_code, 'active')
        self.assertEqual(vpc_peering_connection.status_message, 'Active')
        self.assertEqual(vpc_peering_connection.requester_vpc_info.owner_id, '123456789012')
        self.assertEqual(vpc_peering_connection.requester_vpc_info.vpc_id, 'vpc-1a2b3c4d')
        self.assertEqual(vpc_peering_connection.requester_vpc_info.cidr_block, '10.0.0.0/28')
        self.assertEqual(vpc_peering_connection.accepter_vpc_info.owner_id, '777788889999')
        self.assertEqual(vpc_peering_connection.accepter_vpc_info.vpc_id, 'vpc-111aaa22')
        self.assertEqual(vpc_peering_connection.accepter_vpc_info.cidr_block, '10.0.1.0/28')


if __name__ == '__main__':
    # NOTE(review): the excerpt is truncated here; presumably this runs
    # unittest.main() -- confirm against the full file.
    ...
ec2_vpc_peer_describe.py
Source:ec2_vpc_peer_describe.py
try:
    # NOTE(review): the excerpt starts mid-guard. The import that belongs at
    # the top of this ``try`` (presumably boto3) is not visible -- restore it
    # from the full file.
    HAS_BOTO3 = True
except ImportError:
    HAS_BOTO3 = False

# Filter names accepted by EC2 DescribeVpcPeeringConnections.
_REQUESTER_VPC = 'requester-vpc-info.vpc-id'
_ACCEPTER_VPC = 'accepter-vpc-info.vpc-id'


def _describe_peerings(vpc, status, vpc_filters):
    """Return peering connections matching ``status`` and the VPC filters.

    :param vpc: client exposing ``describe_vpc_peering_connections``.
    :param status: value for the ``status-code`` filter.
    :param vpc_filters: iterable of ``(filter_name, vpc_id)`` pairs, emitted
        in order ahead of the status filter.
    :returns: the ``VpcPeeringConnections`` list from the response.
    """
    filters = [{'Name': name, 'Values': [value]} for name, value in vpc_filters]
    filters.append({'Name': 'status-code', 'Values': [status]})
    result = vpc.describe_vpc_peering_connections(Filters=filters)
    return result['VpcPeeringConnections']


def get_peer_connection(vpc, status, vpc_id, peer_vpc_id):
    """Return the single peering between two VPCs, in either direction.

    The pair is looked up with ``vpc_id`` as requester first; only when that
    finds nothing is the reverse direction queried.

    :returns: the connection dict when exactly one connection matches,
        otherwise ``None`` (no match, or an ambiguous multiple match).
    """
    matches = _describe_peerings(
        vpc, status, [(_REQUESTER_VPC, vpc_id), (_ACCEPTER_VPC, peer_vpc_id)])
    if not matches:
        matches = _describe_peerings(
            vpc, status, [(_REQUESTER_VPC, peer_vpc_id), (_ACCEPTER_VPC, vpc_id)])
    return matches[0] if len(matches) == 1 else None


def list_peer_connections(vpc, status, vpc_id):
    """Return every peering connection of ``vpc_id`` with the given status.

    Both roles are always queried. Falling back to the accepter side only
    when the requester side is empty would silently drop accepter-side
    connections for a VPC that plays both roles.

    :returns: requester-side connections followed by accepter-side ones.
    """
    as_requester = _describe_peerings(vpc, status, [(_REQUESTER_VPC, vpc_id)])
    as_accepter = _describe_peerings(vpc, status, [(_ACCEPTER_VPC, vpc_id)])
    return list(as_requester) + list(as_accepter)


def main():
    """Module entry point: build the argument spec and the Ansible module."""
    argument_spec = ec2_argument_spec()
    argument_spec.update(dict(
        vpc_id=dict(),
        peer_vpc_id=dict(),
        status=dict(choices=['pending-acceptance', 'failed', 'expired', 'provisioning', 'active', 'deleted', 'rejected'], default='active')
        ),
    )
    module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=True)
    if not HAS_BOTO3:
        # NOTE(review): the excerpt is truncated here; the remainder of
        # main() is not visible.
        ...
peering.py
Source:peering.py
...27 i_am_plural = 'VPC peering connections'28 def __init__(self, accounts=None, debug=False):29 super(Peering, self).__init__(accounts=accounts, debug=debug)30 @record_exception()31 def describe_vpc_peering_connections(self, **kwargs):32 from security_monkey.common.sts_connect import connect33 conn = connect(kwargs['account_name'], 'boto3.ec2.client', region=kwargs['region'],34 assumed_role=kwargs['assumed_role'])35 peering_info = self.wrap_aws_rate_limited_call(36 conn.describe_vpc_peering_connections)37 return peering_info38 def slurp(self):39 """40 :returns: item_list - list of vpc peerings.41 :returns: exception_map - A dict where the keys are a tuple containing the42 location of the exception and the value is the actual exception43 """44 self.prep_for_slurp()45 @iter_account_region(index=self.index, accounts=self.accounts, service_name='ec2')46 def slurp_items(**kwargs):47 item_list = []48 exception_map = {}49 kwargs['exception_map'] = exception_map50 app.logger.debug("Checking {}/{}/{}".format(self.index,51 kwargs['account_name'], kwargs['region']))52 peering_info = self.describe_vpc_peering_connections(**kwargs)53 if peering_info:54 all_peerings = peering_info.get('VpcPeeringConnections', [])55 app.logger.debug("Found {} {}".format(56 len(all_peerings), self.i_am_plural))57 for peering in all_peerings:58 connection_id = peering['VpcPeeringConnectionId']59 tags = peering.get('Tags')60 peering_name = None61 if tags:62 peering_name = tags[0].get('Value', None)63 if not (peering_name is None):64 peering_name = "{0} ({1})".format(65 peering_name.encode('utf-8', 'ignore'), connection_id)66 else:...
Learn automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!