Best Python code snippet using localstack_python
test_amazon_driver.py
Source:test_amazon_driver.py
1""" AmazonDriver for Compute2 based on BaseDriver for Compute Resource3"""4import mock5from botocore.exceptions import ClientError6from calplus.tests import base7from calplus.v1.compute.drivers.amazon import AmazonDriver8fake_config_driver = {9 'driver_name': 'AMAZON1',10 'aws_access_key_id': 'fake_id',11 'aws_secret_access_key': 'fake_key',12 'region_name': 'us-east-1',13 'endpoint_url': 'http://localhost:8788',14 'limit': {15 }16}17fake_error_code = {18 'ResponseMetadata': {19 'HTTPStatusCode': 400,20 'RequestId': 'req-a4570d1a-8319-4d6f-8077-7044d72ef449',21 'HTTPHeaders': {22 'date': 'Sat, 24 Sep 2016 03:09:39 GMT',23 'content-length': '250',24 'content-type': 'text/xml'25 }26 },27 'Error': {28 'Message': "The subnet ID 'fake_id' does not exist",29 'Code': 'InvalidSubnetID.NotFound'30 }31}32fake_describe_return = {33 'Reservations': [34 {35 'Groups': [],36 'Instances': [37 {38 'AmiLaunchIndex': 0,39 'ImageId': 'ami-2624df94',40 'InstanceId': 'i-72827591',41 'InstanceType': 'm1.ec2api-alt',42 'KernelId': 'aki-512bee3f',43 'KeyName': '',44 'LaunchTime': 'fake_time',45 'NetworkInterfaces': [46 {47 'Attachment': {48 'AttachTime': 'fake_time',49 'AttachmentId': 'eni-attach-ce1477aa',50 'DeleteOnTermination': True,51 'DeviceIndex': 0,52 'Status': 'attached'53 },54 'Description': '',55 'Groups': [56 {57 'GroupId': 'sg-9ed2e7ec',58 'GroupName': 'default'59 }60 ],61 'MacAddress': 'fa:16:3e:27:af:a2',62 'NetworkInterfaceId': 'eni-ce1477aa',63 'OwnerId': '3b91bb4e974a4729b3596f8cebb9b559',64 'PrivateIpAddress': '10.10.10.76',65 'PrivateIpAddresses': [66 {67 'Primary': True,68 'PrivateIpAddress': '10.10.10.76'69 }70 ],71 'SourceDestCheck': True,72 'Status': 'in-use',73 'SubnetId': 'subnet-b3a91954',74 'VpcId': 'vpc-9ed2e7ec'75 }76 ],77 'Placement': {78 'AvailabilityZone': ''79 },80 'PrivateDnsName': 'r-atxu9l73-0',81 'PrivateIpAddress': '10.10.10.76',82 'PublicDnsName': '',83 'RamdiskId': 'ari-b7e05ed6',84 'RootDeviceName': '/dev/vda',85 'RootDeviceType': 'instance-store',86 'SecurityGroups': [87 {88 'GroupId': 'sg-9ed2e7ec',89 'GroupName': 'default'90 }91 ],92 'SourceDestCheck': True,93 'State': {94 'Code': 0,95 'Name': 'error'96 },97 'SubnetId': 'subnet-b3a91954',98 'VpcId': 'vpc-9ed2e7ec'99 }100 ],101 'OwnerId': '3b91bb4e974a4729b3596f8cebb9b559',102 'ReservationId': 'r-atxu9l73'103 }104 ],105 'ResponseMetadata': {106 'HTTPHeaders': {107 'content-length': '2971',108 'content-type': 'text/xml',109 'date': 'Wed, 07 Dec 2016 16:17:11 GMT'110 },111 'HTTPStatusCode': 200,112 'RequestId': 'req-646be08c-9104-4855-a0d9-62013ba9566d'113 }114}115class FakeInstance(object):116 """In fact, this class is boto3.resources.factory.ec2.Instance117 """118 def __init__(self):119 super(FakeInstance, self).__init__()120 def terminate(self):121 pass122 def stop(self):123 pass124 def start(self):125 pass126 def reboot(self):127 pass128class AmazonDriverTest(base.TestCase):129 """docstring for AmazonDriverTest"""130 def setUp(self):131 super(AmazonDriverTest, self).setUp()132 self.fake_driver = AmazonDriver(fake_config_driver)133 def test_create_successfully(self):134 self.mock_object(135 self.fake_driver.resource, 'create_instances',136 mock.Mock(return_value=mock.Mock))137 self.fake_driver.create(138 'fake_image_id',139 'fake_flavor_id',140 'fake_net_id',141 'fake_name'142 )143 self.fake_driver.resource.create_instances.\144 assert_called_once_with(145 ImageId='fake_image_id',146 MinCount=1,147 MaxCount=1,148 InstanceType='fake_flavor_id',149 SubnetId='fake_net_id',150 IamInstanceProfile={151 'Arn': '',152 'Name': 'fake_name'153 }154 )155 def test_create_without_instance_name(self):156 self.mock_object(157 self.fake_driver.resource, 'create_instances',158 mock.Mock(return_value=mock.Mock))159 self.fake_driver.create(160 'fake_image_id',161 'fake_flavor_id',162 'fake_net_id'163 )164 self.fake_driver.resource.create_instances. \165 assert_called_once_with(166 ImageId='fake_image_id',167 MinCount=1,168 MaxCount=1,169 InstanceType='fake_flavor_id',170 SubnetId='fake_net_id',171 IamInstanceProfile={172 'Arn': '',173 'Name': mock.ANY174 }175 )176 def test_create_unable_to_create_instance(self):177 self.mock_object(178 self.fake_driver.resource, 'create_instances',179 mock.Mock(side_effect=ClientError(180 fake_error_code,181 'operation_name'182 ))183 )184 self.assertRaises(ClientError, self.fake_driver.create,185 'fake_image_id',186 'fake_flavor_id',187 'fake_net_id',188 'fake_name')189 self.fake_driver.resource.create_instances. \190 assert_called_once_with(191 ImageId='fake_image_id',192 MinCount=1,193 MaxCount=1,194 InstanceType='fake_flavor_id',195 SubnetId='fake_net_id',196 IamInstanceProfile={197 'Arn': '',198 'Name': mock.ANY199 }200 )201 def test_show_successfully(self):202 self.mock_object(203 self.fake_driver.client, 'describe_instances',204 mock.Mock(return_value=fake_describe_return))205 self.fake_driver.show('fake_id')206 self.fake_driver.client.describe_instances. \207 assert_called_once_with(InstanceIds=['fake_id'])208 def test_show_unable_to_show(self):209 self.mock_object(210 self.fake_driver.client, 'describe_instances',211 mock.Mock(side_effect=ClientError(212 fake_error_code,213 'operation_name'214 ))215 )216 self.assertRaises(ClientError,217 self.fake_driver.show, 'fake_id')218 self.fake_driver.client.describe_instances. \219 assert_called_once_with(InstanceIds=['fake_id'])220 def test_list_successfully(self):221 self.mock_object(222 self.fake_driver.client, 'describe_instances',223 mock.Mock(return_value=fake_describe_return))224 self.fake_driver.list()225 self.fake_driver.client.describe_instances. \226 assert_called_once_with()227 def test_list_unable_to_list(self):228 self.mock_object(229 self.fake_driver.client, 'describe_instances',230 mock.Mock(side_effect=ClientError(231 fake_error_code,232 'operation_name'233 ))234 )235 self.assertRaises(ClientError,236 self.fake_driver.list)237 self.fake_driver.client.describe_instances. \238 assert_called_once_with()239 def test_delete_successfully(self):240 self.mock_object(241 self.fake_driver.resource, 'Instance',242 mock.Mock(return_value=FakeInstance()))243 self.fake_driver.delete('fake_id')244 def test_delete_unable_to_list(self):245 self.mock_object(246 self.fake_driver.resource, 'Instance',247 mock.Mock(side_effect=ClientError(248 fake_error_code,249 'operation_name'250 ))251 )252 self.assertRaises(ClientError,253 self.fake_driver.delete, 'fake_id')254 def test_shutdown_successfully(self):255 self.mock_object(256 self.fake_driver.resource, 'Instance',257 mock.Mock(return_value=FakeInstance()))258 self.fake_driver.shutdown('fake_id')259 def test_shutdown_unable_to_list(self):260 self.mock_object(261 self.fake_driver.resource, 'Instance',262 mock.Mock(side_effect=ClientError(263 fake_error_code,264 'operation_name'265 ))266 )267 self.assertRaises(ClientError,268 self.fake_driver.shutdown, 'fake_id')269 def test_start_successfully(self):270 self.mock_object(271 self.fake_driver.resource, 'Instance',272 mock.Mock(return_value=FakeInstance()))273 self.fake_driver.start('fake_id')274 def test_start_unable_to_list(self):275 self.mock_object(276 self.fake_driver.resource, 'Instance',277 mock.Mock(side_effect=ClientError(278 fake_error_code,279 'operation_name'280 ))281 )282 self.assertRaises(ClientError,283 self.fake_driver.start, 'fake_id')284 def test_reboot_successfully(self):285 self.mock_object(286 self.fake_driver.resource, 'Instance',287 mock.Mock(return_value=FakeInstance()))288 self.fake_driver.reboot('fake_id')289 def test_reboot_unable_to_list(self):290 self.mock_object(291 self.fake_driver.resource, 'Instance',292 mock.Mock(side_effect=ClientError(293 fake_error_code,294 'operation_name'295 ))296 )297 self.assertRaises(ClientError,298 self.fake_driver.reboot, 'fake_id')299 def test_add_nic_successfully(self):300 self.mock_object(301 self.fake_driver.client, 'attach_network_interface',302 mock.Mock(return_value=mock.Mock))303 self.fake_driver.add_nic('fake_id', 'fake_net_id')304 self.fake_driver.client.attach_network_interface. \305 assert_called_once_with('fake_id', 'fake_net_id', 1)306 def test_add_nic_unable_to_add(self):307 self.mock_object(308 self.fake_driver.client, 'attach_network_interface',309 mock.Mock(side_effect=ClientError(310 fake_error_code,311 'operation_name'312 )))313 self.assertRaises(ClientError,314 self.fake_driver.add_nic, 'fake_id', 'fake_net_id')315 self.fake_driver.client.attach_network_interface. \316 assert_called_once_with('fake_id', 'fake_net_id', 1)317 def test_delete_nic_successfully(self):318 self.mock_object(319 self.fake_driver.client, 'detach_network_interface',320 mock.Mock(return_value=mock.Mock))321 self.fake_driver.delete_nic('fake_id', 'fake_attachment_id')322 self.fake_driver.client.detach_network_interface. \323 assert_called_once_with('fake_attachment_id')324 def test_delete_nic_unable_to_delete(self):325 self.mock_object(326 self.fake_driver.client, 'detach_network_interface',327 mock.Mock(side_effect=ClientError(328 fake_error_code,329 'operation_name'330 )))331 self.assertRaises(ClientError,332 self.fake_driver.delete_nic, 'fake_id', 'fake_attachment_id')333 self.fake_driver.client.detach_network_interface. \334 assert_called_once_with('fake_attachment_id')335 def test_list_nic_successfully(self):336 self.mock_object(337 self.fake_driver.client, 'describe_instances',338 mock.Mock(return_value=fake_describe_return))339 self.fake_driver.list_nic('fake_id')340 self.fake_driver.client.describe_instances. \341 assert_called_once_with(InstanceIds=['fake_id'])342 def test_list_nic_unable_to_list(self):343 self.mock_object(344 self.fake_driver.client, 'describe_instances',345 mock.Mock(side_effect=ClientError(346 fake_error_code,347 'operation_name'348 )))349 self.assertRaises(ClientError,350 self.fake_driver.list_nic, 'fake_id')351 self.fake_driver.client.describe_instances. \352 assert_called_once_with(InstanceIds=['fake_id'])353 def test_list_ip_successfully(self):354 self.mock_object(355 self.fake_driver.client, 'describe_instances',356 mock.Mock(return_value=fake_describe_return))357 self.fake_driver.list_ip('fake_id')358 self.fake_driver.client.describe_instances. \359 assert_called_once_with(InstanceIds=['fake_id'])360 def test_list_ip_unable_to_delete(self):361 self.mock_object(362 self.fake_driver.client, 'describe_instances',363 mock.Mock(side_effect=ClientError(364 fake_error_code,365 'operation_name'366 )))367 self.assertRaises(ClientError,368 self.fake_driver.list_ip, 'fake_id')369 self.fake_driver.client.describe_instances. \...
aws.py
Source:aws.py
...49 status = get_network_interface_state(network_interface)50 return status == 'attached'51 except KeyError:52 return False53def detach_network_interface(network_interface):54 def get_attachment_id(boto_client, interface):55 response = boto_client.describe_network_interfaces(56 NetworkInterfaceIds=[57 interface58 ]59 )60 return response['NetworkInterfaces'][0]['Attachment']['AttachmentId']61 client = boto3.client('ec2')62 client.detach_network_interface(63 AttachmentId=get_attachment_id(client, network_interface)64 )65def ensure_network_interface_is_detached(network_interface):66 try:67 while get_network_interface_state(network_interface) != 'detached':68 time.sleep(1)69 except KeyError:70 pass71def attach_network_interface(network_interface, instance_id):72 client = boto3.client('ec2')73 for _ in xrange(10):74 try:75 client.attach_network_interface(76 NetworkInterfaceId=network_interface,77 InstanceId=instance_id,78 DeviceIndex=DEVICE_INDEX79 )80 except ClientError:81 time.sleep(1)82def configure_local_interface(local_interface, ip, netmask):83 cmd = [84 'ifconfig',85 local_interface,86 'inet',87 ip,88 'netmask',89 netmask90 ]91 for _ in xrange(10):92 os.environ['PATH'] = "%s:/sbin" % os.environ['PATH']93 env = os.environ94 proc = Popen(cmd, env=env)95 proc.communicate()96 if proc.returncode:97 time.sleep(1)98 else:99 return100def aws_notify_master(cfg):101 """The function moves network interface to local instance and brings it up.102 Steps:103 - Detach network interface if attached to anywhere.104 - Attach the network interface to the local instance.105 - Configure IP address on this instance106 :param cfg: config object107 """108 try:109 os.environ['AWS_ACCESS_KEY_ID'] = cfg.get('aws', 'aws_access_key_id')110 os.environ['AWS_SECRET_ACCESS_KEY'] = cfg.get('aws',111 'aws_secret_access_key')112 os.environ['AWS_DEFAULT_REGION'] = cfg.get('aws', 'aws_default_region')113 except NoOptionError:114 LOG.error('aws_access_key_id, aws_secret_access_key and '115 'aws_default_region must be defined in '116 'aws section of the config file.')117 exit(-1)118 instance_id = get_my_instance_id()119 try:120 ip = cfg.get('proxysql', 'virtual_ip')121 netmask = cfg.get('proxysql', 'virtual_netmask')122 network_interface = get_network_interface(ip)123 if network_interface_attached(network_interface):124 detach_network_interface(network_interface)125 local_interface = "eth%d" % DEVICE_INDEX126 ensure_local_interface_is_gone(local_interface)127 ensure_network_interface_is_detached(network_interface)128 attach_network_interface(network_interface, instance_id)129 configure_local_interface(local_interface, ip, netmask)130 except NoOptionError as err:131 LOG.error('virtual_ip and virtual_netmask must be defined in '132 'proxysql section of the config file.')...
test_aws.py
Source:test_aws.py
...25def test__network_interface_attached_if_side_effect_is_key_error(mocker):26 mock_get_interface_state = mocker.patch('proxysql_tools.aws.aws.get_network_interface_state')27 mock_get_interface_state.side_effect = KeyError28 assert not network_interface_attached('1.1.1.1')29def test__detach_network_interface(mocker):30 mock_boto3 = mocker.patch('proxysql_tools.aws.aws.boto3')31 mock_ec2 = MagicMock()32 mock_boto3.client.return_value = mock_ec233 detach_network_interface('stub')...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!