Best Python code snippet using molecule_python
gpu-finder.py
Source:gpu-finder.py
#!/usr/bin/env python
# Copyright 2015 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Example of using the Compute Engine API to create and delete instances.
Creates a new compute engine instance and uses it to apply a caption to
an image.
    https://cloud.google.com/compute/docs/tutorials/python-guide
For more information, see the README.md under /compute.
"""
import time
import json
import googleapiclient.discovery


def check_gpu_config(config):
    """Validate that the requested GPU count matches an a2 machine type.

    Machine types that do not start with ``a2`` are not checked. For a2
    types the GPU count is read from the trailing ``-<N>g`` component of the
    machine type name (e.g. ``a2-highgpu-4g`` -> 4).

    Args:
        config: Parsed configuration; reads ``config['instance_config']``
            keys ``machine_type`` and ``number_of_gpus``.

    Raises:
        ValueError: If the a2 machine type name does not end in ``-<N>g``.
        Exception: If ``number_of_gpus`` differs from the count encoded in
            the machine type name.
    """
    instance_config = config['instance_config']
    machine_type = instance_config['machine_type']
    if not machine_type.startswith('a2'):
        return
    number_of_gpus_requested = instance_config['number_of_gpus']
    # Parse the last dash-separated component instead of slicing at a fixed
    # offset from 'highgpu', which produced garbage for other a2 families.
    suffix = machine_type.rsplit('-', 1)[-1]
    if not (suffix.endswith('g') and suffix[:-1].isdecimal()):
        raise ValueError(
            f"Cannot determine the number of GPUs from machine type {machine_type!r}")
    if number_of_gpus_requested != int(suffix[:-1]):
        raise Exception("Please match the number of GPUs parameter with the correct machine type in the config file")


def get_zone_info(compute, project):
    """Return every zone of the project whose status is ``UP``.

    Args:
        compute: Compute Engine API client exposing ``zones()``.
        project: Project ID passed to ``zones().list``.

    Returns:
        A list of ``{'region': ..., 'zone': ...}`` dicts, where the region is
        the zone name with its last two characters removed.
    """
    zone_list = []
    request = compute.zones().list(project=project)
    while request is not None:
        response = request.execute()
        # A page may come back without 'items'; treat that as an empty page.
        for zone in response.get('items', []):
            if zone['status'] == 'UP':
                zone_list.append({
                    'region': zone['name'][:-2],
                    'zone': zone['name'],
                })
        request = compute.zones().list_next(previous_request=request,
                                            previous_response=response)
    return zone_list


def check_machine_type_and_accelerator(compute, project, machine_type, gpu_type, zones):
    """Find the zones in which the requested machine type is offered.

    Args:
        compute: Compute Engine API client exposing ``machineTypes()``.
        project: Project ID.
        machine_type: Machine type name to look for.
        gpu_type: Accelerator type compared with the first entry of a
            machine's ``accelerators`` list.
        zones: Iterable of dicts with ``zone`` and ``region`` keys.

    Returns:
        A list with one dict per matching machine type, holding
        ``machine_type``, ``region``, ``zone``, ``guest_cpus`` and
        ``description``; ``accelerators`` is added when the machine lists
        accelerators whose first entry matches ``gpu_type``.

    Raises:
        Exception: If the machine type is not found in any zone.
    """
    available_zones = []
    for zone in zones:
        request = compute.machineTypes().list(project=project, zone=zone['zone'])
        while request is not None:
            response = request.execute()
            # A page may come back without 'items'; treat that as an empty page.
            for machine in response.get('items', []):
                if machine['name'] != machine_type:
                    continue
                zones_with_instances = {
                    'machine_type': machine['name'],
                    'region': zone['region'],
                    'zone': zone['zone'],
                    'guest_cpus': machine['guestCpus'],
                    'description': machine['description'],
                }
                if 'accelerators' in machine and machine['accelerators'][0]['guestAcceleratorType'] == gpu_type:
                    zones_with_instances['accelerators'] = machine['accelerators']
                available_zones.append(zones_with_instances)
            request = compute.machineTypes().list_next(previous_request=request, previous_response=response)
    if not available_zones:
        raise Exception(f"No machine types of {machine_type} are available")
    return available_zones


def get_accelerator_quota(compute, project, config, zone, requested_gpus):
    """Keep the zones whose accelerator type allows the requested GPU count.

    Args:
        compute: Compute Engine API client exposing ``acceleratorTypes()``.
        project: Project ID.
        config: Parsed configuration; reads ``instance_config.gpu_type`` and
            (for the error message) ``instance_config.machine_type``.
        zone: Iterable of dicts as returned by
            ``check_machine_type_and_accelerator``.
        requested_gpus: Number of GPUs wanted per instance.

    Returns:
        A list of dicts describing each zone where the configured GPU type
        has ``maximumCardsPerInstance >= requested_gpus``.

    Raises:
        Exception: If no zone qualifies.
    """
    gpu_type = config['instance_config']['gpu_type']
    accelerator_list = []
    for candidate in zone:
        request = compute.acceleratorTypes().list(project=project, zone=candidate['zone'])
        while request is not None:
            response = request.execute()
            for accelerator in response.get('items', []):
                if accelerator['name'] != gpu_type:
                    continue
                max_cards = accelerator['maximumCardsPerInstance']
                if requested_gpus <= max_cards:
                    accelerator_list.append({
                        "region": candidate['region'],
                        "zone": candidate['zone'],
                        "machine_type": candidate['machine_type'],
                        "guest_cpus": candidate['guest_cpus'],
                        "name": accelerator['name'],
                        "description": accelerator['description'],
                        "maximum number of GPUs per instance": max_cards,
                    })
                    print(f"{requested_gpus} GPUs requested per instance, {candidate['zone']} has {accelerator['name']} GPUs with a maximum of {max_cards} per instance")
                else:
                    print(
                        f"{requested_gpus} GPUs requested per instance, {candidate['zone']} doesn't have enough GPUs, with a maximum of {max_cards} per instance")
            request = compute.acceleratorTypes().list_next(previous_request=request, previous_response=response)
    if not accelerator_list:
        raise Exception(f"No accelerator types of {config['instance_config']['gpu_type']} are available with {config['instance_config']['machine_type']} in any zone, or wrong number of GPUs requested")
    return accelerator_list


def _wait_for_zone_operation(compute, project, zone, operation_name, poll_interval=1):
    """Poll a zone operation until its status is ``DONE`` and return it.

    Sleeps ``poll_interval`` seconds between polls so the API is not queried
    in a tight loop.
    """
    while True:
        result = compute.zoneOperations().get(
            project=project,
            zone=zone,
            operation=operation_name).execute()
        if result['status'] == 'DONE':
            print("done.")
            return result
        time.sleep(poll_interval)


def _build_instance_body(project, compute_config, zone, instance_name, source_disk_image):
    """Build the ``instances().insert`` request body for one instance."""
    instance_config = compute_config['instance_config']
    return {
        'name': instance_name,
        'machineType': f"zones/{zone}/machineTypes/{instance_config['machine_type']}",
        # Specify the boot disk and the image to use as a source.
        'disks': [
            {
                'kind': 'compute#attachedDisk',
                'type': 'PERSISTENT',
                'boot': True,
                'mode': 'READ_WRITE',
                'autoDelete': True,
                'deviceName': instance_config['name'],
                'initializeParams': {
                    'sourceImage': source_disk_image,
                    'diskType': f"projects/{project}/zones/{zone}/diskTypes/{instance_config['disk_type']}",
                    'diskSizeGb': instance_config['disk_size'],
                    'labels': {}
                },
                "diskEncryptionKey": {}
            }
        ],
        'canIpForward': False,
        'guestAccelerators': [
            {
                'acceleratorCount': instance_config['number_of_gpus'],
                'acceleratorType': f"zones/{zone}/acceleratorTypes/{instance_config['gpu_type']}"
            }
        ],
        'tags': {
            "items": instance_config['firewall_rules']
        },
        # Specify a network interface with NAT to access the public
        # internet.
        'networkInterfaces': [{
            'kind': 'compute#networkInterface',
            'network': instance_config['network_interfaces']['network'],
            'accessConfigs': [
                {
                    'kind': 'compute#accessConfig',
                    'name': 'External NAT',
                    'type': 'ONE_TO_ONE_NAT',
                    'networkTier': 'PREMIUM'
                }
            ],
            'aliasIpRanges': []
        }],
        'description': '',
        'labels': {},
        'scheduling': {
            'preemptible': False,
            'onHostMaintenance': 'TERMINATE',
            'automaticRestart': True,
            'nodeAffinities': []
        },
        'deletionProtection': False,
        'reservationAffinity': {
            'consumeReservationType': 'ANY_RESERVATION'
        },
        # Allow the instance to access cloud storage and logging.
        'serviceAccounts': [{
            'email': instance_config['identity_and_api_access']['service_account_email'],
            # NOTE(review): the configured value is wrapped in a list as-is;
            # presumably 'scopes' is a single scope string - confirm.
            'scopes': [
                instance_config['identity_and_api_access']['scopes']
            ]
        }],
        'shieldedInstanceConfig': {
            'enableSecureBoot': False,
            'enableVtpm': True,
            'enableIntegrityMonitoring': True
        },
        'confidentialInstanceConfig': {
            'enableConfidentialCompute': False
        },
        # Metadata is readable from the instance and allows you to
        # pass configuration from deployment scripts to instances.
        'metadata': {
            'kind': 'compute#metadata',
            'items': [],
        }
    }


def create_instance(compute, project, config, zone_list):
    """Create the configured number of instances, trying region after region.

    Within a region, instances are created zone by zone. When an insert
    operation finishes with ``QUOTA_EXCEEDED`` or
    ``ZONE_RESOURCE_POOL_EXHAUSTED_WITH_DETAILS`` the remaining zones of that
    region are skipped and the next region is tried.

    Args:
        compute: Compute Engine API client.
        project: Project ID.
        config: Parsed configuration; reads ``number_of_instances`` and
            ``instance_config``.
        zone_list: Candidate zones, dicts with at least ``region`` and
            ``zone`` keys.

    Returns:
        A list of ``{"name": ..., "zone": ...}`` dicts, one per instance that
        was created (possibly fewer than requested).

    Raises:
        Exception: If an insert operation ends with any other error.
    """
    compute_config = config
    wanted = compute_config['number_of_instances']
    regions_to_try = list({v['region'] for v in zone_list})
    created_instances = []
    instances = 0
    regions_attempted = 0
    print(f"There are {len(regions_to_try)} regions to try that match the GPU type and machine type configuration.")
    for region in regions_to_try:
        print(f"Attempting to create instances in {region}")
        zones = [z for z in zone_list if z['region'] == region]
        print(f"There are {len(zones)} zones to try in {region}")
        zones_attempted = 0
        move_regions = False
        for zone_config in zones:
            zone_name = zone_config['zone']
            for _ in range(wanted):
                print(f"Creating instance number {instances+1} of {compute_config['number_of_instances']} in {zone_config['zone']}, zone {zones_attempted+1} out of {len(zones)} attempted.")
                image_response = compute.images().getFromFamily(
                    project=compute_config['instance_config']['image_project'],
                    family=compute_config['instance_config']['image_family']).execute()
                instance_name = compute_config['instance_config']['name'] + '-' + str(instances+1) + '-' + zone_name
                body = _build_instance_body(
                    project, compute_config, zone_name, instance_name,
                    image_response['selfLink'])
                print(f"Creating instance {instance_name}.")
                operation = compute.instances().insert(
                    project=project,
                    zone=zone_name,
                    body=body).execute()
                print('Waiting for operation to finish...')
                result = _wait_for_zone_operation(
                    compute, project, zone_name, operation['name'])
                if 'error' in result:
                    error_results = result['error']['errors']
                    if error_results[0]['code'] in ('QUOTA_EXCEEDED', 'ZONE_RESOURCE_POOL_EXHAUSTED_WITH_DETAILS'):
                        # Capacity problem: report it and give up on this region.
                        move_regions = True
                        print(Exception(result['error']))
                    else:
                        raise Exception(result['error'])
                else:
                    instances += 1
                    move_regions = False
                    print(f"Success: {instance_name} created")
                    print(f"{instances} created, {compute_config['number_of_instances']-instances} more to create")
                    created_instances.append({
                        "name": instance_name,
                        "zone": zone_name,
                    })
                if instances >= wanted:
                    print("Reached the desired number of instances")
                    break
                elif move_regions:
                    print(f"Quota exceeded in region {region}, moving to next region")
                    break
            if instances >= wanted or move_regions:
                break
            zones_attempted += 1
        regions_attempted += 1
        if instances >= wanted:
            break
        elif regions_attempted >= len(regions_to_try):
            print(f"All regions attempted, there are not enough resources to create the desired {compute_config['number_of_instances']} instances, {instances} created")
            break
    return created_instances


def delete_instance(compute, project, instance_details):
    """Delete every instance in ``instance_details`` and wait for each one.

    Args:
        compute: Compute Engine API client.
        project: Project ID.
        instance_details: List of ``{"name": ..., "zone": ...}`` dicts as
            returned by ``create_instance``.

    Raises:
        Exception: If a delete operation finishes with an error.
    """
    print(f"Deleting {len(instance_details)} instances.")
    for instance in instance_details:
        zone = instance["zone"]
        name = instance["name"]
        print(f"Deleting instance {name}.")
        operation = compute.instances().delete(
            project=project,
            zone=zone,
            instance=name).execute()
        print('Waiting for operation to finish...')
        result = _wait_for_zone_operation(compute, project, zone, operation['name'])
        if 'error' in result:
            raise Exception(result['error'])


def create_instance_test(compute, project, config, zone, requested_gpus):
    """Print every accelerator type available in the given zones.

    ``config`` and ``requested_gpus`` are accepted but not used.
    """
    for candidate in zone:
        request = compute.acceleratorTypes().list(project=project, zone=candidate['zone'])
        while request is not None:
            response = request.execute()
            for accelerator in response.get('items', []):
                print(accelerator)
            # Advance to the next page; without this the loop never terminates.
            request = compute.acceleratorTypes().list_next(previous_request=request, previous_response=response)


def main(gpu_config, wait=True):
    """Find zones matching the GPU configuration, create instances, then delete them.

    Args:
        gpu_config: Parsed configuration with ``project_id`` and
            ``instance_config`` entries.
        wait: When true, block on ``input()`` before deleting the instances
            that were created; when false the instances are left running.
    """
    compute = googleapiclient.discovery.build('compute', 'v1')
    project_id = gpu_config["project_id"]
    instance_config = gpu_config["instance_config"]
    if instance_config["zone"]:
        print(f"Processing selected zones from {gpu_config['instance_config']['zone']}")
        zone_info = get_zone_info(compute, project_id)
        compute_zones = [z for z in zone_info if z['zone'] in instance_config['zone']]
    else:
        print("Processing all zones")
        compute_zones = get_zone_info(compute, project_id)
    check_gpu_config(gpu_config)
    available_zones = check_machine_type_and_accelerator(
        compute, project_id, instance_config["machine_type"],
        instance_config["gpu_type"], compute_zones)
    accelerators = get_accelerator_quota(
        compute, project_id, gpu_config, available_zones,
        instance_config["number_of_gpus"])
    available_regions = list({v['region'] for v in available_zones})
    if available_regions:
        print(f"Machine type {gpu_config['instance_config']['machine_type']} is available in the following regions: {available_regions}")
        instance_details = create_instance(compute, project_id, gpu_config, accelerators)
        if wait:
            print("hit enter to delete instances")
            input()
            delete_instance(compute, project_id, instance_details)
    else:
        print(f"No regions available with the instance configuration {gpu_config['instance_config']['machine_type']} machine type and {gpu_config['instance_config']['gpu_type']} GPU type")


if __name__ == '__main__':
    with open('gpu-config.json', 'r') as f:
        gpu_config = json.load(f)
    ...  # NOTE(review): the rest of this entry point is truncated in the source listing.
sfn-sagemakerprocessingjob.py
Source:sfn-sagemakerprocessingjob.py
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this
# software and associated documentation files (the "Software"), to deal in the Software
# without restriction, including without limitation the rights to use, copy, modify,
# merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# Author - Govindhi Venkatachalapathy govindhi@amazon.com

import argparse
import os
import time
import boto3
import datetime
import yaml
import sys
from sagemaker.network import NetworkConfig
from sagemaker.processing import Processor
from stepfunctions.inputs import ExecutionInput
from stepfunctions.steps.sagemaker import ProcessingStep
from stepfunctions.workflow import Workflow
from stepfunctions.steps import Chain
from sagemaker.processing import ProcessingOutput, ProcessingInput
from sagemaker import Session
from stepfunctions.steps.states import Choice
from stepfunctions.steps.states import Catch
from stepfunctions.steps.states import State
from stepfunctions.steps.choice_rule import ChoiceRule

INSTANCE_CFG_FILE = 'config.yml'


def getFailedState(id):
    """Return a ``Fail`` state whose state id is ``id``."""
    return State(state_id=id, state_type="Fail")


def createSFNProcessingJob():
    """Build, (re)create and optionally execute the notebook workflow.

    Reads the module-level ``args`` (parsed command line) and ``accountid``
    set in the ``__main__`` section. One ``ProcessingStep`` is created per
    ``.ipynb`` file found under ``<workspace>/src/notebooks`` (sorted by
    name); the steps are chained into a Step Functions workflow. An existing
    state machine with the same name is deleted first (best effort), then
    the workflow is created and, when ``args.execute`` is set, executed.

    Optional overrides are read from ``<workspace>/config.yml``:
    ``instance_type``, ``volume_size``, ``max_runtime``, ``security_groups``,
    ``subnets``, ``container_endpoint``, ``s3_input``, ``s3_output`` and
    ``container_output``.

    Returns:
        A status string: the creation message, or - after an execution - the
        ``ProcessingJobStatus`` of the execution output or a failure message.
    """
    sfn_steps = []
    result = ''
    sec_group = None
    subnet_id = None
    instance_type = "ml.c4.2xlarge"
    volume_size = 5
    max_runtime = 7200

    # Create Session object
    sm_session = Session()

    instance_config = {}
    ecr_repo = "%s.dkr.ecr.%s.amazonaws.com" % (accountid, args.region)

    cntr_endpoint = "python3 -u convert_execute_notebook.py"
    cntr_image = "%s/%s" % (ecr_repo, args.cntrimage)
    s3_input = ''
    s3_output = "s3://%s" % sm_session.default_bucket()
    processing_inputs = []
    cntr_arg_required = True
    container_output_path = '/opt/ml/processing/output'

    instance_config_file = os.path.join(args.workspace, INSTANCE_CFG_FILE)
    if os.path.exists(instance_config_file):
        with open(instance_config_file, 'r') as filerd:
            # NOTE(review): yaml.load with FullLoader is kept as-is; if this
            # file can come from an untrusted source, prefer yaml.safe_load.
            # An empty file parses to None, so fall back to an empty mapping.
            instance_config = yaml.load(filerd, Loader=yaml.FullLoader) or {}
        print("Instance Config:%s" % instance_config)
        instance_type = instance_config.get('instance_type', instance_type)
        volume_size = instance_config.get('volume_size', volume_size)
        max_runtime = instance_config.get('max_runtime', max_runtime)
        if 'security_groups' in instance_config:
            print("Security Config:%s" % instance_config['security_groups'])
            sec_group = instance_config['security_groups']
        if 'subnets' in instance_config:
            print("Subnets Config:%s" % instance_config['subnets'])
            subnet_id = instance_config['subnets']
        if 'container_endpoint' in instance_config:
            # A custom entrypoint receives no "-n <notebook>" argument.
            cntr_endpoint = instance_config['container_endpoint']
            cntr_arg_required = False
        s3_input = instance_config.get('s3_input', s3_input)
        s3_output = instance_config.get('s3_output', s3_output)
        container_output_path = instance_config.get('container_output', container_output_path)

    nw_config = None
    if sec_group and subnet_id:
        print("There are security group %s and subnet_id %s" % (sec_group, subnet_id))
        nw_config = NetworkConfig(security_group_ids=sec_group, subnets=subnet_id)

    print("Network Config:%s" % nw_config)
    sagemaker_role = "arn:aws:iam::%s:role/sagemaker-role" % accountid
    processor_object = Processor(role=sagemaker_role,
                                 image_uri=cntr_image,
                                 instance_count=1,
                                 instance_type=instance_type,
                                 volume_size_in_gb=volume_size,
                                 max_runtime_in_seconds=max_runtime,
                                 sagemaker_session=sm_session,
                                 network_config=nw_config
                                 )
    print(processor_object)

    output = ProcessingOutput(source=container_output_path, destination=s3_output, output_name='output_data', s3_upload_mode="Continuous")
    if s3_input:
        procinput = ProcessingInput(source=s3_input, destination="/opt/ml/processing/input", input_name="input_data")
        processing_inputs = [procinput]

    # Create steps - ProcessingSteps
    # Get the list of notebooks from workspace
    print(args.workspace)
    notebook_workspace = os.path.join(args.workspace, 'src/notebooks')
    print(notebook_workspace)

    try:
        notebooks_list = sorted(
            nbfile for nbfile in os.listdir(notebook_workspace)
            if nbfile.endswith(".ipynb"))
    except OSError:
        # The notebooks directory may be missing or unreadable; fall back to
        # the default notebook name.
        notebooks_list = ["test.ipynb"]

    workflow_input = {}
    workflow_tags = [
        {'key': 'Application', 'value': 'SFN-Sagemaker'}]

    for job_id, nbfile in enumerate(notebooks_list, start=1):
        job_name = os.path.splitext(nbfile)[0]
        currentDT = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        processing_job_name = "%s-%s" % (job_name, currentDT)
        print(processing_job_name)
        input_name = "PreprocessingJobName%s" % str(job_id)
        execution_input = ExecutionInput(
            schema={
                input_name: processing_job_name
            }
        )
        workflow_input[input_name] = processing_job_name

        print("Input name:%s" % input_name)
        print(execution_input)
        step_kwargs = dict(
            state_id=job_name,
            processor=processor_object,
            job_name=execution_input[input_name],
            inputs=processing_inputs,
            outputs=[output],
            container_entrypoint=cntr_endpoint.split(' '),
        )
        if cntr_arg_required:
            cntr_arg = '-n %s' % nbfile
            step_kwargs['container_arguments'] = cntr_arg.split(' ')
        processing_step = ProcessingStep(**step_kwargs)
        print(processing_step)
        # Goto next state when the current state is completed.
        # NOTE(review): this Choice state is built but never added to the
        # chain below, and next_step is a file name rather than a state -
        # it looks like dead code; confirm before removing.
        if job_id < len(notebooks_list) - 1:
            step_state = Choice(job_name)
            step_state.add_choice(rule=ChoiceRule.StringEquals(variable=processing_step.output()["ProcessingJobStatus"], value="Completed"),
                                  next_step=notebooks_list[job_id])

        catch_state_processing = Catch(error_equals=['States.TaskFailed'],
                                       next_step=getFailedState("%s-fail" % job_name)
                                       )
        processing_step.add_catch(catch_state_processing)
        sfn_steps.append(processing_step)

    # Create Chain of steps
    workflow_graph = Chain(sfn_steps)
    print(workflow_graph)
    # Create Workflow object
    print(workflow_input)
    workflow_execution_input = ExecutionInput(
        schema=workflow_input
    )
    workflow_name = args.workflowname
    if not args.workflowname:
        workflow_name = "sfn-sm-workflow"
    workflow = Workflow(name=workflow_name,
                        definition=workflow_graph,
                        role='arn:aws:iam::%s:role/stepfunctions-role' % accountid,
                        client=None,
                        tags=workflow_tags)
    print(workflow)
    print(workflow.definition.to_json(pretty=True))
    try:
        print("Deleting the workflow:%s" % workflow_name)
        client = boto3.client('stepfunctions')
        # Build the ARN for the region this client actually talks to instead
        # of hard-coding us-east-1.
        sm_arn = "arn:aws:states:%s:%s:stateMachine:%s" % (
            client.meta.region_name, accountid, workflow_name)
        client.delete_state_machine(stateMachineArn=sm_arn)
        waited = 0
        max_time_for_deletion = 600  # 10 mins
        print("Please wait while the existing state machine is getting deleted...")
        while waited <= max_time_for_deletion:
            response = client.describe_state_machine(stateMachineArn=sm_arn)
            if response['status'] == 'DELETING':
                waited += 30
                time.sleep(30)
            else:
                break
        # Addition wait - Give some time for complete deletion
        time.sleep(60)
        print("Deleted the workflow(%s) successfully" % workflow_name)
    except Exception as e:
        # Deliberate best effort: deletion may fail because the workflow does
        # not exist (already deleted); report it and carry on.
        print("Probably the Statemachine %s has been deleted:%s" % (workflow_name, e))
    state_machine_arn = workflow.create()
    result = 'Workflow %s created' % state_machine_arn

    if args.execute:
        # Execute workflow
        execution = workflow.execute(inputs=workflow_input)
        print(execution)
        time.sleep(120)
        execution_output = execution.get_output(wait=True)
        print(execution_output)
        if execution_output:
            result = execution_output.get("ProcessingJobStatus")
        else:
            result = "Failure in execution of step functions. Please check the console for details"
    # NOTE(review): the nesting of this print is ambiguous in the flattened
    # source listing; it is placed at function level here - confirm.
    print(result)
    return result


def file_exist_valid(cfile):
    """Return ``cfile`` if it exists, else raise ``argparse.ArgumentTypeError``."""
    if not os.path.exists(cfile):
        raise argparse.ArgumentTypeError("{0} does not exist".format(cfile))
    return cfile


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Orechestrator for deploying ETL Notebooks using Step Functions and Sagemaker Processing Job')
    parser.add_argument('-w', '--workspace', required=True, help='Provide the workspace dir where the notebooks are located')
    parser.add_argument('-f', '--workflowname', required=False, help='Provide a workflow name. If not provided, default will be provided')
    parser.add_argument('-k', '--accesskey', required=False, help='AWS Access Key Id')
    parser.add_argument('-s', '--secretaccess', required=False, help='AWS Secret Access Key')
    parser.add_argument('-r', '--region', required=False, help='AWS Region')
    parser.add_argument('-e', '--execute', action="store_true",
                        help="Use this option to execute the step functions workflow after creation. "
                             "Do not specify this option if workflow has to be just created.")
    parser.add_argument('-i', '--cntrimage', required=True, help='Container image to be used in Sagemaker Processing job')
    args = parser.parse_args()

    # Fall back to the AWS environment variables for anything not passed on
    # the command line.
    if not args.accesskey:
        args.accesskey = os.environ["AWS_ACCESS_KEY_ID"]
    if not args.secretaccess:
        args.secretaccess = os.environ["AWS_SECRET_ACCESS_KEY"]
    if not args.region:
        args.region = os.environ["AWS_DEFAULT_REGION"]

    client = boto3.client('sts')
    accountid = client.get_caller_identity()["Account"]
    response = createSFNProcessingJob()
    print(response)
instance_config_test.py
Source:instance_config_test.py
#!/usr/bin/python
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unittest for instance_config.py module."""
from google_compute_engine.instance_setup import instance_config
from google_compute_engine.test_compat import mock
from google_compute_engine.test_compat import unittest


def _default_option_calls():
    """Return the SetOption calls expected for the options set in setUp.

    The calls are listed sorted by section name, then by option name.
    """
    return [
        mock.call.set('first', 'a', 'false', overwrite=False),
        mock.call.set('second', 'b', 'true', overwrite=False),
        mock.call.set('third', 'c', '1', overwrite=False),
        mock.call.set('third', 'd', '2', overwrite=False),
        mock.call.set('third', 'e', '3', overwrite=False),
    ]


class InstanceConfigTest(unittest.TestCase):
    """Tests for instance_config.InstanceConfig with its collaborators mocked."""

    def setUp(self):
        """Replace the InstanceConfig class-level settings with test values."""
        class_settings = {
            'instance_config': 'config',
            'instance_config_distro': 'distro',
            'instance_config_template': 'template',
            'instance_config_script': '/tmp/test.py',
            'instance_config_header': '%s %s',
            'instance_config_options': {
                'third': {
                    'e': '3',
                    'c': '1',
                    'd': '2',
                },
                'first': {
                    'a': 'false',
                },
                'second': {
                    'b': 'true',
                },
            },
        }
        for attribute, value in class_settings.items():
            setattr(instance_config.InstanceConfig, attribute, value)

    @mock.patch('google_compute_engine.instance_setup.instance_config.os.path.exists')
    @mock.patch('google_compute_engine.instance_setup.instance_config.config_manager.ConfigManager.SetOption')
    @mock.patch('google_compute_engine.instance_setup.instance_config.config_manager.ConfigManager.__init__')
    def testInstanceConfig(self, mock_init, mock_set, mock_exists):
        """With no config files on disk, only the default options are set."""
        mocks = mock.Mock()
        for attached, label in (
                (mock_init, 'init'), (mock_set, 'set'), (mock_exists, 'exists')):
            mocks.attach_mock(attached, label)
        mock_exists.return_value = False

        instance_config.InstanceConfig()

        expected_calls = [
            mock.call.init(
                config_file='template', config_header='/tmp/test.py template'),
            mock.call.exists('config'),
            mock.call.exists('distro'),
        ]
        expected_calls.extend(_default_option_calls())
        self.assertEqual(mocks.mock_calls, expected_calls)

    @mock.patch('google_compute_engine.instance_setup.instance_config.os.path.exists')
    @mock.patch('google_compute_engine.instance_setup.instance_config.parser')
    @mock.patch('google_compute_engine.instance_setup.instance_config.config_manager.ConfigManager.SetOption')
    @mock.patch('google_compute_engine.instance_setup.instance_config.config_manager.ConfigManager.__init__')
    def testInstanceConfigExists(self, mock_init, mock_set, mock_parser, mock_exists):
        """With both config files present, their options are set before the defaults."""

        def fake_items(key):
            # One synthetic option per section, derived from the section name.
            return {'key: %s' % key: 'value: %s' % key}

        mock_config = mock.create_autospec(instance_config.parser.SafeConfigParser)
        mock_config.read = mock.Mock()
        mock_config.sections = mock.Mock()
        mock_config.sections.return_value = ['a', 'b']
        mock_config.items = fake_items
        mock_parser.SafeConfigParser.return_value = mock_config

        mocks = mock.Mock()
        for attached, label in (
                (mock_init, 'init'), (mock_set, 'set'),
                (mock_parser, 'parser'), (mock_exists, 'exists')):
            mocks.attach_mock(attached, label)
        mock_exists.return_value = True

        instance_config.InstanceConfig()

        expected_calls = [
            mock.call.init(config_file='template', config_header='/tmp/test.py template'),
            mock.call.exists('config'),
            mock.call.parser.SafeConfigParser(),
            mock.call.parser.SafeConfigParser().read('config'),
            mock.call.parser.SafeConfigParser().sections(),
            mock.call.exists('distro'),
            mock.call.parser.SafeConfigParser(),
            mock.call.parser.SafeConfigParser().read('distro'),
            mock.call.parser.SafeConfigParser().sections(),
            mock.call.set('a', 'key: a', 'value: a', overwrite=False),
            mock.call.set('b', 'key: b', 'value: b', overwrite=False),
            mock.call.set('a', 'key: a', 'value: a', overwrite=False),
            mock.call.set('b', 'key: b', 'value: b', overwrite=False),
        ]
        expected_calls.extend(_default_option_calls())
        self.assertEqual(mocks.mock_calls, expected_calls)

    @mock.patch('google_compute_engine.instance_setup.instance_config.config_manager.ConfigManager.WriteConfig')
    def testWriteConfig(self, mock_write):
        """WriteConfig delegates to ConfigManager.WriteConfig with the config path."""
        config_under_test = instance_config.InstanceConfig()
        instance_config.InstanceConfig.WriteConfig(config_under_test)
        mock_write.assert_called_once_with(config_file='config')


if __name__ == '__main__':
    ...  # body truncated in the source listing
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!