Best Python code snippet using localstack_python
Worker.py
Source:Worker.py
#!/usr/bin/env python3
import boto3
import time
from botocore.exceptions import ClientError
from distutils.util import strtobool
from SSMDelegate import SSMDelegate
import botocore
import Utils
import logging
import yaml
from redo import retriable, retry  # https://github.com/mozilla-releng/redo

__author__ = "Gary Silverman"

logger = logging.getLogger('Orchestrator')  # The Module Name


class Worker(object):
    """Base class holding the state shared by the start and stop workers."""

    SNS_SUBJECT_PREFIX_WARNING = "Warning:"
    SNS_SUBJECT_PREFIX_INFORMATIONAL = "Info:"

    def __init__(self, workloadRegion, instance, ec2_client, dryRunFlag, snsInit):
        """Store the collaborators and try to obtain a boto3 EC2 resource.

        A failure to create the EC2 resource is logged, not raised; in that
        case ``self.ec2Resource`` stays ``None``.
        """
        self.workloadRegion = workloadRegion
        self.instance = instance
        self.dryRunFlag = dryRunFlag
        self.ec2_client = ec2_client
        self.snsInit = snsInit
        self.instanceStateMap = {
            "pending": 0,
            "running": 16,
            "shutting-down": 32,
            "terminated": 48,
            "stopping": 64,
            "stopped": 80
        }
        # Define the attribute up front so it exists even when the lookup fails.
        self.ec2Resource = None
        try:
            self.ec2Resource = boto3.resource('ec2', region_name=self.workloadRegion)
        except Exception as e:
            msg = 'Worker::__init__() Exception obtaining botot3 ec2 resource in region %s -->' % workloadRegion
            logger.error(msg + str(e))


class StartWorker(Worker):
    """Starts an EC2 instance, optionally re-registering it with its ELBs and scaling it."""

    def __init__(self, ddbRegion, workloadRegion, instance, all_elbs, elb, scalingInstanceDelay, dryRunFlag, ec2_client, snsInit):
        super(StartWorker, self).__init__(workloadRegion, instance, ec2_client, dryRunFlag, snsInit)
        self.ddbRegion = ddbRegion
        self.all_elbs = all_elbs
        self.elb = elb
        self.scalingInstanceDelay = scalingInstanceDelay

    @retriable(attempts=5, sleeptime=0, jitter=0)
    def addressELBRegistration(self):
        """Deregister and re-register this instance on every ELB that lists it.

        Any exception is logged and re-raised so that the ``@retriable``
        decorator can retry the whole method.
        """
        for elbDescription in self.all_elbs['LoadBalancerDescriptions']:
            for member in elbDescription['Instances']:
                if member['InstanceId'] != self.instance.id:
                    continue
                elb_name = elbDescription['LoadBalancerName']
                logger.info("Instance %s is attached to ELB %s, and will be deregistered and re-registered" % (self.instance.id, elb_name))
                # Retrying is handled by @retriable; each call either succeeds or raises.
                try:
                    self.elb.deregister_instances_from_load_balancer(LoadBalancerName=elb_name, Instances=[{'InstanceId': self.instance.id}])
                    logger.debug("Succesfully deregistered instance %s from load balancer %s" % (self.instance.id, elb_name))
                except Exception as e:
                    logger.warning('Worker::addressELBRegistration()::deregister_instances_from_load_balancer() encountered an exception of -->' + str(e))
                    raise
                try:
                    self.elb.register_instances_with_load_balancer(LoadBalancerName=elb_name, Instances=[{'InstanceId': self.instance.id}])
                    logger.debug('Succesfully registered instance %s to load balancer %s' % (self.instance.id, elb_name))
                except Exception as e:
                    logger.warning('Worker::addressELBRegistration()::register_instances_with_load_balancer() encountered an exception of -->' + str(e))
                    raise

    def startInstance(self):
        """Start the instance unless the dry-run flag is set.

        Failures are logged and reported through ``snsInit``; they are not
        propagated to the caller.
        """
        result = 'Instance not started'
        if self.dryRunFlag:
            logger.warning('DryRun Flag is set - instance will not be started')
        else:
            if self.all_elbs != "0":
                logger.debug('addressELBRegistration() for %s' % self.instance.id)
                try:
                    self.addressELBRegistration()
                except Exception as e:
                    self.snsInit.sendSns("Worker::addressELBRegistration() has encountered an exception", str(e))
            logger.debug('Starting EC2 instance: %s' % self.instance.id)
            try:
                result = retry(self.instance.start, attempts=5, sleeptime=0, jitter=0)
                logger.info('startInstance() for ' + self.instance.id + ' result is %s' % result)
            except Exception as e:
                msg = 'Worker::instance.start() Exception encountered during instance start ---> %s' % e
                logger.error(msg)
                self.snsInit.sendSns("Exception - EC2 instance start::Worker::instance.start() Exception encountered during instance start", str(e))

    def scaleInstance(self, modifiedInstanceType):
        """Change the instance type (and EbsOptimized attribute) of a stopped instance.

        ``modifiedInstanceType`` is a dotted string such as ``family.size`` with
        an optional third component that is passed on to ``t2Unlimited``.
        Nothing is changed when the instance is not stopped or when the
        dry-run flag is set.
        """
        instanceState = self.instance.state
        if instanceState['Name'] == 'stopped':
            result = 'no result'
            if self.dryRunFlag:
                logger.warning('DryRun Flag is set - instance will not be scaled')
            else:
                logger.info(
                    'Instance [%s] will be scaled to Instance Type [%s]' % (self.instance.id, modifiedInstanceType))
                # Extract whole instance type from DDB into a list
                modifiedInstanceTypeList = modifiedInstanceType.split('.')
                targetInstanceFamily = modifiedInstanceTypeList[0]
                # EC2.Instance.modify_attribute()
                # Check and exclude non-optimized instance families. Should be enhanced to be a map. Currently added as hotfix.
                preventEbsOptimizedList = ['t2']
                if targetInstanceFamily in preventEbsOptimizedList:
                    ebsOptimizedAttr = False
                    # Check T2 Unlimited
                    self.t2Unlimited(modifiedInstanceTypeList)
                else:
                    ebsOptimizedAttr = self.instance.ebs_optimized  # May have been set to True or False previously
                modifiedInstanceTypeValue = modifiedInstanceTypeList[0] + '.' + modifiedInstanceTypeList[1]
                InstanceTypeKwargs = {"InstanceType": {'Value': modifiedInstanceTypeValue}}
                try:
                    result = retry(self.instance.modify_attribute, attempts=5, jitter=0, sleeptime=0, kwargs=InstanceTypeKwargs)
                except Exception as e:
                    msg = 'Worker::instance.modify_attribute().modifiedInstanceTypeValue Exception for EC2 instance %s , error --> %s ' % (self.instance, str(e))
                    logger.warning(msg)
                    self.snsInit.sendSns("Worker::instance.modify_attribute().modifiedInstanceTypeValue has encountered an exception", str(e))
                # Poll until the instance reports the new type (8 attempts, 15 seconds apart).
                try:
                    result = retry(self.compareInstanceTypeValues, attempts=8, sleeptime=15, jitter=0, args=(modifiedInstanceType,))
                except Exception as e:
                    msg = 'Worker::compareInstanceTypeValues() Exception for instance %s, error --> currentInstanceTypeValue does not match modifiedInstanceTypeValue' % (self.instance.id)
                    logger.warning(msg)
                    tagsYaml = self.retrieveTags()
                    snsSubject = 'Worker::compareInstanceTypeValues() has encountered an exception for instance %s' % (self.instance.id)
                    snsMessage = '%s\n%s' % (str(e), str(tagsYaml))
                    self.snsInit.sendSns(snsSubject, snsMessage)

                EbsOptimizeKwargs = {"EbsOptimized": {'Value': ebsOptimizedAttr}}
                try:
                    result = retry(self.instance.modify_attribute, attempts=5, sleeptime=0, jitter=0, kwargs=EbsOptimizeKwargs)
                except Exception as e:
                    msg = 'Worker::instance.modify_attribute().ebsOptimizedAttr Exception for EC2 instance %s, error --> %s' % (self.instance, str(e))
                    logger.warning(msg)
                    # Subject names the attribute that actually failed (previously a copy-paste of the instance-type subject).
                    self.snsInit.sendSns("Worker::instance.modify_attribute().ebsOptimizedAttr has encountered an exception", str(e))
                # It appears the start instance reads 'modify_attribute' changes as eventually consistent in AWS (assume DynamoDB),
                # this can cause an issue on instance type change, whereby the LaunchPlan generates an exception.
                # To mitigate against this, we will introduce a one second sleep delay after modifying an attribute
                time.sleep(self.scalingInstanceDelay)
                logger.info('scaleInstance() for ' + self.instance.id + ' result is %s' % result)
        else:
            logMsg = 'scaleInstance() requested to change instance type for non-stopped instance ' + self.instance.id + ' no action taken'
            logger.warning(logMsg)

    def compareInstanceTypeValues(self, modifiedInstanceTypeValue):
        """Raise ``ValueError`` unless the instance's current type appears in ``modifiedInstanceTypeValue``."""
        currentInstanceTypeValue = list(self.ec2_client.describe_instance_attribute(InstanceId=self.instance.id, Attribute='instanceType')['InstanceType'].values())
        logger.info('compareInstanceTypeValues() - Comparing Instance Type Values of %s' % (self.instance.id))
        logger.info('compareInstanceTypeValues() - modifiedInstanceTypeValue : %s ' % (modifiedInstanceTypeValue))
        logger.info('compareInstanceTypeValues() - currentInstanceTypeValue : %s ' % (currentInstanceTypeValue))
        # Substring test: the requested value may carry an extra suffix component.
        if currentInstanceTypeValue[0] not in modifiedInstanceTypeValue:
            raise ValueError("Current instance type value does not match modified instance type value.")

    def retrieveTags(self):
        """Return the EC2 tags of this instance as a YAML document."""
        instanceDescription = self.ec2_client.describe_instances(InstanceIds=[self.instance.id])['Reservations'][0]['Instances'][0]
        # NOTE(review): assumes 'Tags' can be absent for an untagged instance - confirm;
        # .get() keeps this error-reporting path from raising KeyError in that case.
        tags = instanceDescription.get('Tags', [])
        tagsYaml = yaml.safe_dump(tags, explicit_start=True, default_flow_style=False)
        logger.info('retrieveTags(): Retrieve and return instance EC2 tags in YAML format.')
        logger.debug('retrieveTags(): EC2 tag details for %s' % self.instance.id)
        logger.debug('retrieveTags(): %s' % tagsYaml)
        return tagsYaml

    def t2Unlimited(self, modifiedInstanceList):
        """Align the instance's CPU credit specification with the requested type.

        With a third component of ``u``/``U`` a ``standard`` instance is
        switched to ``unlimited``; with no third component an ``unlimited``
        instance is switched back to ``standard``. If the current value cannot
        be read, the failure is reported via SNS and no change is attempted.
        """
        # NOTE(review): the source listing lost its indentation; the 'else' below is
        # paired with the length check (no flag given => revert to standard). Confirm
        # against the upstream file.
        if len(modifiedInstanceList) == 3:
            self.modifiedInstanceFlag = modifiedInstanceList[2]
            logger.info('t2Unlimited(): Checking if T2 Unlimited flag is specified in DynamoDB')
            logger.debug('t2Unlimited(): t2_unlimited_flag: %s' % self.modifiedInstanceFlag)
            if self.modifiedInstanceFlag in ("u", "U"):
                logger.debug('t2Unlimited(): Found T2 Flag in DynamoDB: %s' % self.modifiedInstanceFlag)
                # Use the returned value so a failed check cannot leave us reading an
                # undefined (or stale) self.current_t2_value.
                currentValue = None
                try:
                    currentValue = self.checkT2Unlimited()
                except Exception as e:
                    self.snsInit.sendSns("Worker::describe_instance_credit_specifications() has encountered an exception", str(e))
                if currentValue == "standard":
                    try:
                        logger.info('t2Unlimited(): Trying to modify EC2 instance credit specification')
                        retry(self.ec2_client.modify_instance_credit_specification, attempts=5, sleeptime=0, jitter=0,
                              kwargs={"InstanceCreditSpecifications": [{'InstanceId': self.instance.id, 'CpuCredits': 'unlimited'}]})
                        logger.info('t2Unlimited(): EC2 instance credit specification modified')
                    except Exception as e:
                        msg = 'Worker::instance.modify_attribute().t2Unlimited Exception for EC2 instance %s, error --> %s' % (self.instance, str(e))
                        logger.warning(msg)
                        self.snsInit.sendSns("Worker::checkT2Unlimited::instance.modify_attribute().t2Unlimited has encountered an exception", str(e))
        else:
            currentValue = None
            try:
                currentValue = self.checkT2Unlimited()
            except Exception as e:
                self.snsInit.sendSns("Worker::checkT2Unlimited::describe_instance_credit_specifications() Exception has encountered an exception", str(e))
            if currentValue == "unlimited":
                logger.debug('t2Unlimited(): Current T2 value: %s' % currentValue)
                try:
                    logger.info('t2Unlimited(): Trying to modify EC2 instance credit specification')
                    retry(self.ec2_client.modify_instance_credit_specification, attempts=5, sleeptime=0, jitter=0,
                          kwargs={"InstanceCreditSpecifications": [{'InstanceId': self.instance.id, 'CpuCredits': 'standard'}]})
                    logger.info('t2Unlimited(): EC2 instance credit specification modified')
                except Exception as e:
                    msg = 'Worker::instance.modify_attribute().t2standard Exception in progress for EC2 instance %s, error --> %s' % (self.instance, str(e))
                    logger.warning(msg)
                    self.snsInit.sendSns("Worker::instance.modify_attribute().t2standard Exception has encountered an exception", str(e))

    @retriable(attempts=5, sleeptime=0, jitter=0)
    def checkT2Unlimited(self):
        """Fetch, store and return the instance's current ``CpuCredits`` value.

        Exceptions are logged and re-raised so ``@retriable`` can retry.
        """
        try:
            logger.info('check_t2_unlimited(): Checking current T2 value')
            self.current_t2_value = self.ec2_client.describe_instance_credit_specifications(InstanceIds=[self.instance.id, ])['InstanceCreditSpecifications'][0]['CpuCredits']
        except Exception as e:
            msg = 'Worker::describe_instance_credit_specifications() exception for EC2 instance %s, error --> %s' % (self.instance, str(e))
            logger.warning(msg)
            raise
        return self.current_t2_value

    def start(self):
        """Entry point: start the instance."""
        self.startInstance()


class StopWorker(Worker):
    """Stops an EC2 instance, honouring an optional in-guest override file checked via SSM."""

    def __init__(self, ddbRegion, workloadRegion, instance, dryRunFlag, ec2_client, snsInit):
        super(StopWorker, self).__init__(workloadRegion, instance, ec2_client, dryRunFlag, snsInit)
        self.ddbRegion = ddbRegion
        # MUST convert string False to boolean False
        self.waitFlag = strtobool('False')
        self.overrideFlag = strtobool('False')
        self.snsInit = snsInit

    def stopInstance(self):
        """Stop the instance and, when the wait flag is set, block until it is stopped.

        Nothing is stopped or waited for when the dry-run flag is set.
        Failures are logged (and, for the stop call, reported via SNS) rather
        than raised.
        """
        logger.debug('Worker::stopInstance() called')
        result = 'Instance not Stopped'
        if self.dryRunFlag:
            logger.warning('DryRun Flag is set - instance will not be stopped')
        else:
            try:
                result = retry(self.instance.stop, attempts=5, sleeptime=0, jitter=0)
                logger.debug("Succesfully stopped EC2 instance %s" % (self.instance.id))
                logger.info('stopInstance() for ' + self.instance.id + ' result is %s' % result)
            except Exception as e:
                msg = 'Worker::instance.stop() Exception occured for EC2 instance %s, error --> %s' % (self.instance, str(e))
                logger.warning(msg)
                self.snsInit.sendSns("Exception instance.stopInstance() Exception encountered during instance stop", str(e))
        # If configured, wait for the stop to complete prior to returning
        logger.debug('The bool value of self.waitFlag %s, is %s' % (self.waitFlag, bool(self.waitFlag)))
        # self.waitFlag has been converted from str to boolean via set method
        if self.waitFlag:
            logger.info(self.instance.id + ' :Waiting for Stop to complete...')
            if self.dryRunFlag:
                logger.warning('DryRun Flag is set - waiter() will not be employed')
            else:
                try:
                    # Need the Client to get the Waiter
                    ec2Client = self.ec2Resource.meta.client
                    waiter = ec2Client.get_waiter('instance_stopped')
                    # Waits for 40 15 second increments (e.g. up to 10 minutes).
                    # Scope the waiter to this instance; without InstanceIds it would
                    # poll every instance visible to the client.
                    waiter.wait(InstanceIds=[self.instance.id])
                except Exception as e:
                    logger.warning('Worker:: waiter block encountered an exception of -->' + str(e))
        else:
            logger.info(self.instance.id + ' Wait for Stop to complete was not requested')

    def setWaitFlag(self, flag):
        # MUST convert string False to boolean False
        self.waitFlag = strtobool(flag)

    def getWaitFlag(self):
        return self.waitFlag

    def isOverrideFlagSet(self, S3BucketName, S3KeyPrefixName, overrideFileName, osType):
        ''' Use SSM to check for existence of the override file in the guest OS.  If exists, don't Stop instance but log.
        Returning 'True' means the instance will not be stopped.
            Either because the override file exists, or the instance couldn't be reached
        Returning 'False' means the instance will be stopped (e.g. not overridden)
        '''
        # If there is no overrideFilename specified, we need to return False.  This is required because the
        # remote evaluation script may evaluate to "Bypass" with a null string for the override file.  Keep in
        # mind, DynamodDB isn't going to enforce an override filename be set in the directive.
        noSSM = ""
        if (S3BucketName == noSSM) or (S3KeyPrefixName == noSSM) or (not overrideFileName):
            logger.info(
                self.instance.id + ' Override Flag not set in specification.  Therefore this instance will be actioned. ')
            return False
        if not osType:
            logger.info(
                self.instance.id + ' Override Flag set BUT no Operating System attribute in specification. Therefore this instance will be actioned.')
            return False

        # Exactly one of these is filled in whenever the instance is bypassed.
        warningMsg = ''
        msg = ''

        # Create the delegate
        ssmDelegate = SSMDelegate(self.instance.id, S3BucketName, S3KeyPrefixName, overrideFileName, osType,
                                  self.ddbRegion, logger, self.workloadRegion)
        # Very first thing to check is whether SSM is going to write the output to the S3 bucket.
        #   If the bucket is not in the same region as where the instances are running, then SSM doesn't write
        #   the result to the bucket and thus the rest of this method is completely meaningless to run.
        if ssmDelegate.isS3BucketInWorkloadRegion() == SSMDelegate.S3_BUCKET_IN_CORRECT_REGION:
            # Send request via SSM, and check if send was successful
            ssmSendResult = ssmDelegate.sendSSMCommand()
            if ssmSendResult:
                # Have delegate advise if override file was set on instance.  If so, the instance is not to be stopped.
                overrideRes = ssmDelegate.retrieveSSMResults(ssmSendResult)
                logger.debug('SSMDelegate retrieveSSMResults() results :%s' % overrideRes)
                if overrideRes == SSMDelegate.S3_BUCKET_IN_WRONG_REGION:
                    # Per SSM, the bucket must be in the same region as the target instance, otherwise the results will not be writte to S3 and cannot be obtained.
                    self.overrideFlag = True
                    warningMsg = Worker.SNS_SUBJECT_PREFIX_WARNING + ' ' + self.instance.id + ' Instance will be not be stopped because the S3 bucket is not in the same region as the workload'
                    logger.warning(warningMsg)
                elif overrideRes == SSMDelegate.DECISION_STOP_INSTANCE:
                    # There is a result and it specifies it is ok to Stop
                    self.overrideFlag = False
                    logger.info(self.instance.id + ' Instance will be stopped')
                elif overrideRes == SSMDelegate.DECISION_NO_ACTION_UNEXPECTED_RESULT:
                    # Unexpected SSM Result, see log file.  Will default to overrideFlag==true out of abundance for caution
                    self.overrideFlag = True
                    warningMsg = Worker.SNS_SUBJECT_PREFIX_WARNING + ' ' + self.instance.id + ' Instance will be not be stopped as there was an unexpected SSM result.'
                    logger.warning(warningMsg)
                elif overrideRes == SSMDelegate.DECISION_RETRIES_EXCEEDED:
                    self.overrideFlag = True
                    warningMsg = Worker.SNS_SUBJECT_PREFIX_WARNING + ' ' + self.instance.id + ' Instance will be not be stopped # retries to collect SSM result from S3 was exceeded'
                    logger.warning(warningMsg)
                else:
                    # Every other result means the instance will be bypassed (e.g. not stopped)
                    self.overrideFlag = True
                    msg = Worker.SNS_SUBJECT_PREFIX_INFORMATIONAL + ' ' + self.instance.id + ' Instance will be not be stopped because override file was set'
                    logger.info(msg)
            else:
                self.overrideFlag = True
                warningMsg = Worker.SNS_SUBJECT_PREFIX_WARNING + ' ' + self.instance.id + ' Instance will be not be stopped because SSM could not query it'
                logger.warning(warningMsg)
        else:
            self.overrideFlag = True
            warningMsg = Worker.SNS_SUBJECT_PREFIX_WARNING + ' SSM will not be executed as S3 bucket is not in the same region as the workload. [' + self.instance.id + '] Instance will be not be stopped'
            logger.warning(warningMsg)

        if self.overrideFlag:
            if warningMsg:
                self.snsInit.sendSns(Worker.SNS_SUBJECT_PREFIX_WARNING, warningMsg)
            else:
                # Informational path: send the informational text (previously the
                # empty warningMsg was sent here).
                self.snsInit.sendSns(Worker.SNS_SUBJECT_PREFIX_INFORMATIONAL, msg)
        return self.overrideFlag

    def setOverrideFlagSet(self, overrideFlag):
        self.overrideFlag = strtobool(overrideFlag)

    def execute(self, S3BucketName, S3KeyPrefixName, overrideFileName, osType):
        # The listing is truncated here; the rest of this method is not visible.
        if (self.isOverrideFlagSet(S3BucketName, S3KeyPrefixName, overrideFileName, osType) == False):
            ...
ec2_janitor.py
Source:ec2_janitor.py
...146 self.slack.send_message('\r')147 def disable_cpu_bursting(self):148 for reservation in self.ec2_instances["Reservations"]:149 for instance in reservation["Instances"]:150 self.ec2.modify_instance_credit_specification(InstanceCreditSpecifications=[{'InstanceId': instance['InstanceId'], 'CpuCredits': 'standard'}])151if __name__ == "__main__":152 parser = argparse.ArgumentParser(description='ec2_janitor.py --region us-west-2 --tag-owner Training --apply --action stop')153 parser.add_argument("--region", required=True,154 dest='region', default='ca-central-1')155 parser.add_argument("--aws_account", required=False,156 dest='aws_account', default='avitraining')157 parser.add_argument("--tag-owner", required=False,158 dest='tag_owner', help='Owner Tag Search mask: *tag_owner')159 parser.add_argument("--tag-key", required=False,160 dest='tag_key', help='Custom Tag Search')161 parser.add_argument("--tag-value", required=False,162 dest='tag_value', help='Custom Tag Search')163 parser.add_argument("--lab-timezone", required=False,164 dest='lab_timezone', help='Supported lab timezones: EST, PST, GMT, SGT')...
lambda_function.py
Source:lambda_function.py
...30 Statistics=['Average']31 )32 met = response['Datapoints'][0]['Average']33 if met < THRESHOLD:34 modify = client.modify_instance_credit_specification(35 DryRun=False,36 InstanceCreditSpecifications=[37 {38 'InstanceId': INSTANCE_ID,39 'CpuCredits': 'unlimited'40 },41 ]42 )43 print "changed to unlimited"44 if credit_type == 'unlimited' and met > THRESHOLD:45 modify = client.modify_instance_credit_specification(46 DryRun=False,47 InstanceCreditSpecifications=[48 {49 'InstanceId': INSTANCE_ID,50 'CpuCredits': 'standard'51 },52 ]53 )54 print "changed to standard"...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!