How to use describe_instances method in localstack

Best Python code snippet using localstack_python

load_balancer.py

Source:load_balancer.py Github

copy

Full Screen

...44 },45 ],46 )47time.sleep(120)48existing_instances = ec2.describe_instances()49pub_ip={}50for i in (existing_instances["Reservations"]):51 if ("Tags" in list(i["Instances"][0].keys())):52 for tag in (i["Instances"][0]["Tags"]):53 if(tag["Value"]=="tirta"):54 status = i["Instances"][0]["State"]["Name"]55 if status == "running":56 pub_ip[(i["Instances"][0]["InstanceId"])]= i["Instances"][0]["PublicIpAddress"]57print("As publics IP que estao rodando sao: ", pub_ip)58@app.route('/', defaults={'path': ''})59@app.route('/<path:path>')60def catch_all(path):61 global pub_ip62 ip = random.choice(list(pub_ip.values()))63 return redirect("http://" + ip + ":5000/" + path,code=307)64#https://www.saltycrane.com/blog/2008/09/simplistic-python-thread-example/65def timeout():66 print("Deu ruim... timeout")67 global id_atual68 global pub_ip69 global quant70 print("a id que falhou eh",id_atual)71 existing_instances = ec2.describe_instances()72 ec2.terminate_instances(InstanceIds=[id_atual])73 74 75 76 existing_instances = ec2.describe_instances()77 for i in range(len(existing_instances["Reservations"])):78 for tag in (existing_instances["Reservations"][i]["Instances"]):79 if(tag["InstanceId"]==id_atual):80 status=existing_instances["Reservations"][i]["Instances"][0]["State"]["Name"]81 while (status!="terminated"):82 existing_instances = ec2.describe_instances()83 status=existing_instances["Reservations"][i]["Instances"][0]["State"]["Name"]84 time.sleep(8)85 continue86 pub_ip={}87 existing_instances = ec2.describe_instances()88 for i in (existing_instances["Reservations"]):89 if ("Tags" in list(i["Instances"][0].keys())):90 for tag in (i["Instances"][0]["Tags"]):91 if(tag["Value"]=="tirta"):92 status = i["Instances"][0]["State"]["Name"]93 if status == "running":94 pub_ip[(i["Instances"][0]["InstanceId"])]= i["Instances"][0]["PublicIpAddress"]95 if (len(list(pub_ip.values())))>int(quant):96 existing_instances = ec2.describe_instances()97 id_extra=random.choice(list(pub_ip.keys()))98 ec2.terminate_instances(InstanceIds=[id_extra])99 existing_instances = ec2.describe_instances()100 for i in range(len(existing_instances["Reservations"])):101 for tag in (existing_instances["Reservations"][i]["Instances"]):102 if(tag["InstanceId"]==id_atual):103 status=existing_instances["Reservations"][i]["Instances"][0]["State"]["Name"]104 while (status!="terminated"):105 existing_instances = ec2.describe_instances()106 status=existing_instances["Reservations"][i]["Instances"][0]["State"]["Name"]107 time.sleep(8)108 continue109 while(len(list(pub_ip.values()))<int(quant)):110 print("Creating Instance")111 ec2_service.create_instances(ImageId='ami-0ac019f4fcb7cb7e6', MinCount=1, MaxCount=1,112 InstanceType='t2.micro',113 KeyName='teste',114 SecurityGroups=['aps'],115 UserData="""#!/bin/bash116 cd home/ubuntu/117 git clone https://github.com/eduardotp1/projeto_cloud.git118 sudo apt-get -y update119 sudo apt-get install -y python3-pip120 sudo pip3 install flask121 sudo pip3 install flask_restful122 sudo pip3 install boto3123 cd projeto_cloud/124 python3 app.py125 """,126 TagSpecifications=[127 {128 'ResourceType': 'instance',129 'Tags': [130 {131 'Key': 'Owner',132 'Value': 'tirta'133 },134 ]135 },136 ],137 )138 time.sleep(120)139 pub_ip={}140 existing_instances = ec2.describe_instances()141 for i in (existing_instances["Reservations"]):142 if ("Tags" in list(i["Instances"][0].keys())):143 for tag in (i["Instances"][0]["Tags"]):144 if(tag["Value"]=="tirta"):145 status = i["Instances"][0]["State"]["Name"]146 if status == "running":147 pub_ip[(i["Instances"][0]["InstanceId"])]= i["Instances"][0]["PublicIpAddress"]148 149 existing_instances = ec2.describe_instances()150 for i in (existing_instances["Reservations"]):151 if ("Tags" in list(i["Instances"][0].keys())):152 for tag in (i["Instances"][0]["Tags"]):153 if(tag["Value"]=="tirta"):154 status = i["Instances"][0]["State"]["Name"]155 if status == "running":156 pub_ip[(i["Instances"][0]["InstanceId"])]= i["Instances"][0]["PublicIpAddress"]157 health()158if (len(list(pub_ip.values()))!=0):159 id_atual = list(pub_ip.values())[0]160def health():161 global pub_ip162 global id_atual163 global quant164 while(True):165 if (len(list(pub_ip.keys()))<int(quant)):166 print("Creating Instance")167 ec2_service.create_instances(ImageId='ami-0ac019f4fcb7cb7e6', MinCount=1, MaxCount=1,168 InstanceType='t2.micro',169 KeyName='teste',170 SecurityGroups=['aps'],171 UserData="""#!/bin/bash172 cd home/ubuntu/173 git clone https://github.com/eduardotp1/projeto_cloud.git174 sudo apt-get -y update175 sudo apt-get install -y python3-pip176 sudo pip3 install flask177 sudo pip3 install flask_restful178 sudo pip3 install boto3179 cd projeto_cloud/180 python3 app.py181 """,182 TagSpecifications=[183 {184 'ResourceType': 'instance',185 'Tags': [186 {187 'Key': 'Owner',188 'Value': 'tirta'189 },190 ]191 },192 ],193 )194 time.sleep(120)195 elif (len(list(pub_ip.values()))>int(quant)):196 existing_instances = ec2.describe_instances()197 id_extra=random.choice(list(pub_ip.keys()))198 ec2.terminate_instances(InstanceIds=[id_extra])199 existing_instances = ec2.describe_instances()200 for i in range(len(existing_instances["Reservations"])):201 for tag in (existing_instances["Reservations"][i]["Instances"]):202 if(tag["InstanceId"]==id_atual):203 status=existing_instances["Reservations"][i]["Instances"][0]["State"]["Name"]204 while (status!="terminated"):205 existing_instances = ec2.describe_instances()206 status=existing_instances["Reservations"][i]["Instances"][0]["State"]["Name"]207 time.sleep(8)208 continue209 pub_ip={}210 existing_instances = ec2.describe_instances()211 for i in (existing_instances["Reservations"]):212 if ("Tags" in list(i["Instances"][0].keys())):213 for tag in (i["Instances"][0]["Tags"]):214 if(tag["Value"]=="tirta"):215 status = i["Instances"][0]["State"]["Name"]216 if status == "running":217 pub_ip[(i["Instances"][0]["InstanceId"])]= i["Instances"][0]["PublicIpAddress"]218 for key, values in pub_ip.items():219 id_atual=key220 t = Timer(50.0,timeout)221 t.start()222 try:223 r=requests.get('http://' + values + ':5000/healthcheck')224 except:...

Full Screen

Full Screen

test_aws.py

Source:test_aws.py Github

copy

Full Screen

1"""Test the AWS module"""2# pylint: disable=redefined-outer-name,no-self-use,unused-argument,protected-access,missing-docstring,invalid-name,line-too-long3from collections import namedtuple4import json5try:6 from unittest.mock import patch7except ImportError:8 from mock import patch9import pytest10from aws_ssh import aws, errors11SessionVars = namedtuple('SessionVars', 'session instance client describe_instances')12@pytest.fixture13def session_vars():14 with patch('boto3.session.Session') as session_mock:15 instance = session_mock.return_value16 client = instance.client17 describe_instances = client.return_value.describe_instances18 yield SessionVars(session_mock, instance, client, describe_instances)19def test_get_session():20 with patch('boto3.session.Session') as session_mock:21 aws.get_session('foobar')22 session_mock.assert_called_with(profile_name='foobar')23def test_get_instance_info(session_vars):24 session_vars.describe_instances.return_value = get_sample_response()25 info = aws.get_instance_info('foobar', 'test-', 'name')26 session_vars.session.assert_called_with(profile_name='foobar')27 session_vars.client.assert_called_with('ec2')28 assert session_vars.describe_instances.called29 assert info['PublicIpAddress'] == '52.90.39.59'30def test_get_instance_info_empty(session_vars):31 session_vars.describe_instances.return_value = get_sample_response(0)32 with pytest.raises(errors.NoInstanceFoundError):33 aws.get_instance_info('foobar', 'test-', 'name')34def test_get_instance_info_multiple(session_vars):35 session_vars.describe_instances.return_value = get_sample_response(2)36 with pytest.raises(errors.TooManyInstancesError):37 aws.get_instance_info('foobar', 'test-', 'name')38def get_sample_response(instance_count=1):39 response = {'Reservations': [{'Instances': []}]}40 if instance_count == 0:41 response['Reservations'] = []42 return response43 response['Reservations'][0]['Instances'] = [json.loads(SAMPLE_INSTANCE_BODY) for _ in range(instance_count)]44 return response45SAMPLE_INSTANCE_BODY = """46{"InstanceId": "i-0958008e", "ImageId": "ami-d05e75b8", "State": {"Code": 16, "Name": "running"}, "PrivateDnsName": "ip-10-0-0-186.ec2.internal", "PublicDnsName": "ec2-52.90.39.59.compute-1.amazonaws.com", "StateTransitionReason": "", "KeyName": "project", "AmiLaunchIndex": 0, "ProductCodes": [], "InstanceType": "m3.large", "LaunchTime": "2016-05-12T19:38:00.000Z", "Placement": {"AvailabilityZone": "us-east-1a", "GroupName": "", "Tenancy": "default"}, "Monitoring": {"State": "disabled"}, "SubnetId": "subnet-8dadbffa", "VpcId": "vpc-e821f18c", "PrivateIpAddress": "10.0.0.186", "PublicIpAddress": "52.90.39.59", "Architecture": "x86_64", "RootDeviceType": "ebs", "RootDeviceName": "/dev/sda1", "BlockDeviceMappings": [{"DeviceName": "/dev/sda1", "Ebs": {"VolumeId": "vol-3f4df594", "Status": "attached", "AttachTime": "2016-05-12T19:38:01.000Z", "DeleteOnTermination": true}}], "VirtualizationType": "hvm", "ClientToken": "project-Comput-1HKOY7086GV2G", "Tags": [{"Key": "POC", "Value": "Aru Sahni"}, {"Key": "Name", "Value": "project-compute"}, {"Key": "aws:cloudformation:logical-id", "Value": "ComputeInstance"}, {"Key": "Class", "Value": "project-compute"}, {"Key": "Project", "Value": "project"}, {"Key": "Stack", "Value": "project-cloudformation"}, {"Key": "aws:cloudformation:stack-id", "Value": "arn:aws:cloudformation:us-east-1:577354146450:stack/project/7ab2d640-66e6-11e5-955a-500150b34c7c"}, {"Key": "aws:cloudformation:stack-name", "Value": "project"}], "SecurityGroups": [{"GroupName": "project-SecurityGroupComputeWeb-W8OZK5PH7KRN", "GroupId": "sg-5457982f"}, {"GroupName": "project-SecurityGroupHGHQ-G86ETFB3FG4O", "GroupId": "sg-de9476b8"}], "SourceDestCheck": true, "Hypervisor": "xen", "NetworkInterfaces": [{"NetworkInterfaceId": "eni-800518c0", "SubnetId": "subnet-8dadbffa", "VpcId": "vpc-e821f18c", "Description": "Primary network interface", "OwnerId": "577354146450", "Status": "in-use", "MacAddress": "0a:1e:c9:33:d8:15", "PrivateIpAddress": "10.0.0.186", "PrivateDnsName": "ip-10-0-0-186.ec2.internal", "SourceDestCheck": true, "Groups": [{"GroupName": "project-SecurityGroupComputeWeb-W8OZK5PH7KRN", "GroupId": "sg-5457982f"}, {"GroupName": "project-SecurityGroupHGHQ-G86ETFB3FG4O", "GroupId": "sg-de9476b8"}], "Attachment": {"AttachmentId": "eni-attach-f61a4b0b", "DeviceIndex": 0, "Status": "attached", "AttachTime": "2016-05-12T19:38:00.000Z", "DeleteOnTermination": true}, "Association": {"PublicIp": "52.90.39.59", "PublicDnsName": "ec2-52.90.39.59.compute-1.amazonaws.com", "IpOwnerId": "amazon"}, "PrivateIpAddresses": [{"PrivateIpAddress": "10.0.0.186", "PrivateDnsName": "ip-10-0-0-186.ec2.internal", "Primary": true, "Association": {"PublicIp": "52.90.39.59", "PublicDnsName": "ec2-52.90.39.59.compute-1.amazonaws.com", "IpOwnerId": "amazon"}}], "Ipv6Addresses": []}], "IamInstanceProfile": {"Arn": "arn:aws:iam::577354146450:instance-profile/project-ComputeInstanceProfile-J4Q37C5EEWSI", "Id": "AIPAIGVRQT3L5JVKDWKCS"}, "EbsOptimized": false}...

Full Screen

Full Screen

test_policy_gen.py

Source:test_policy_gen.py Github

copy

Full Screen

...15 stub.add_response('describe_images', {}, {})16 policy_gen = PolicyGenerator()17 policy_gen.record()18 stub.activate()19 ec2.describe_instances()20 ec2.describe_images()21 policy = json.loads(policy_gen.generate())22 assert 'Statement' in policy23 assert len(policy['Statement']) == 124 assert len(policy['Statement'][0]['Action']) == 225 assert 'ec2:DescribeInstances' in policy['Statement'][0]['Action']26 assert 'ec2:DescribeImages' in policy['Statement'][0]['Action']27 def test_records_policy_from_multiple_clients(self):28 ec2, ec2_stub = self.gen_client_and_stub('ec2')29 rds, rds_stub = self.gen_client_and_stub('rds')30 s3, s3_stub = self.gen_client_and_stub('s3')31 ec2_stub.add_response('describe_instances', {}, {})32 rds_stub.add_response('describe_db_instances', {}, {})33 s3_stub.add_response('list_buckets', {}, {})34 policy_gen = PolicyGenerator()35 policy_gen.record()36 ec2_stub.activate()37 rds_stub.activate()38 s3_stub.activate()39 ec2.describe_instances()40 rds.describe_db_instances()41 s3.list_buckets()42 policy = json.loads(policy_gen.generate())43 assert len(policy['Statement'][0]['Action']) == 344 assert 'ec2:DescribeInstances' in policy['Statement'][0]['Action']45 assert 's3:ListBuckets' in policy['Statement'][0]['Action']46 assert 'rds:DescribeDBInstances' in policy['Statement'][0]['Action']47 def test_actions_in_policy_are_unique(self):48 ec2, stub = self.gen_client_and_stub('ec2')49 for i in range(10):50 stub.add_response('describe_instances', {}, {})51 policy_gen = PolicyGenerator()52 policy_gen.record()53 stub.activate()54 for i in range(10):55 ec2.describe_instances()56 policy = json.loads(policy_gen.generate())57 assert len(policy['Statement'][0]['Action']) == 158 assert 'ec2:DescribeInstances' in policy['Statement'][0]['Action']59 def test_stub_can_be_activated_before_policy_gen(self):60 ec2, stub = self.gen_client_and_stub('ec2')61 stub.add_response('describe_instances', {}, {})62 stub.activate()63 policy_gen = PolicyGenerator()64 policy_gen.record()65 ec2.describe_instances()66 policy = json.loads(policy_gen.generate())67 assert len(policy['Statement'][0]['Action']) == 168 assert 'ec2:DescribeInstances' in policy['Statement'][0]['Action']69 @mock_ec270 def test_policy_is_recorded_when_not_stubbed(self):71 session = boto3.Session(region_name='eu-west-1')72 ec2 = session.client('ec2')73 policy_gen = PolicyGenerator()74 policy_gen.record()75 ec2.describe_instances()76 policy = json.loads(policy_gen.generate())77 assert len(policy['Statement'][0]['Action']) == 1...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful