Best Python code snippet using autotest_python
tail_stack_events.py
Source:tail_stack_events.py
1#!/usr/bin/env python2"""Usage:3 tail_stack_events.py [--follow] [--number=<n>] [--depth=<d>] [--max-column-length=<x>] [--profile=<profile>] <stack>4 tail_stack_events.py [--postmortem] [--find-last-failure] [--show-all-failures] [--max-column-length=<x>] [--profile=<profile>] <stack>5Options:6 -f --follow Follow the stack events and output new ones as they are received.7 -p <p> --profile=<profile> The aws profile to use.8 -n <n> --number=<n> The number of lines to display. [default: 10]9 -d <d> --depth=<d> The maximum depth to get events for. Use -1 for unlimited depth. [default: 2]10 -m --postmortem Find the failures in the last stack update.11 --find-last-failure Search for the last rollback and show the failures from that update.12 By default --postmortem only looks at the latest stack update.13 -x <x> --max-column-length=<x> The maximum column length for tabular output.14 Defaults to 200 for postmortem, 40 otherwise.15 --show-all-failures Show all failures for the stack update, not just the one that caused the rollback.16 <stack> The top-level stack to get events for.17"""18import collections19import logging20import math21import sys22import time23import traceback24import eventlet25eventlet.monkey_patch()26import ansiwrap27import botocore.exceptions28import boto329import colorama30import docopt31import eventlet.greenpool32import tenacity33STACK_TYPE = "AWS::CloudFormation::Stack"34ELLIPSIS = u"\u2026"35LOG = logging.getLogger(__name__)36def retry(func):37 tretry = tenacity.retry(38 wait=(tenacity.wait_random_exponential(multiplier=1, min=0.1, max=10)),39 after=tenacity.after_log(LOG, logging.WARNING),40 )41 def log_exc(*a, **k):42 try:43 return func(*a, **k)44 except Exception:45 LOG.exception("Exception calling %r" % (func,))46 raise47 return tretry(log_exc)48class Column(object):49 def __init__(self, mvl, ml):50 self.max_value_length = mvl51 self.max_length = ml52@retry53def get_stack(stack_name_or_arn):54 cf = boto3.resource("cloudformation")55 stack = cf.Stack(stack_name_or_arn)56 # Switch to the ARN if a stack name was passed in57 if stack.stack_id != stack_name_or_arn:58 stack = cf.Stack(stack.stack_id)59 return stack60# TODO: make the retries per-api-call rather than on this function to reduce repeated API calls61@retry62def get_nested_stacks(stack_name_or_arn, depth=None, status_check=None):63 stack = get_stack(stack_name_or_arn)64 stacks = {stack.stack_id: stack}65 if depth == 0:66 return stacks67 for sub in stack.resource_summaries.all():68 if sub.resource_type != STACK_TYPE:69 continue70 if status_check is not None and status_check(sub.resource_status):71 stacks.update(72 get_nested_stacks(73 sub.physical_resource_id,74 depth - 1 if depth is not None else None,75 status_check,76 )77 )78 return stacks79def get_stack_events(stack, limit=5, _mem={}):80 # TODO: preload _mem with the timestamp of the first event in the parent stack that caused us to add the stack81 pages = stack.events.pages()82 events = next_page(pages)83 if not events:84 if stack.stack_id in _mem:85 del _mem[stack.stack_id]86 return []87 events.sort(key=lambda e: e.timestamp)88 if stack.stack_id in _mem:89 while events[0].timestamp > _mem[stack.stack_id]:90 more = list(next(pages))91 if not more:92 break93 more.sort(key=lambda e: e.timestamp)94 events = more + events95 _mem[stack.stack_id] = events[-1].timestamp96 if stack.stack_id not in _mem:97 return events[-limit:]98 return events99def get_events(stacks, limit=5):100 all_events = []101 pool = eventlet.greenpool.GreenPool(5)102 remove_stacks = set()103 for stack_id, events in zip(104 list(stacks.keys()),105 pool.starmap(get_stack_events, ((s, limit) for s in list(stacks.values()))),106 ):107 if not events:108 remove_stacks.add(stack_id)109 continue110 all_events.extend(events)111 for stack_id in remove_stacks:112 del stacks[stack_id]113 all_events.sort(key=lambda e: e.timestamp)114 return all_events115def update_columns(columns, events):116 for event in events:117 for column in columns.keys():118 columns[column].max_value_length = max(119 columns[column].max_value_length, len(str(getattr(event, column)))120 )121def format_column(column_name, column, value):122 text = str(value)123 if column_name == "resource_status":124 uvalue = value.upper()125 if "FAIL" in uvalue:126 color = colorama.Fore.RED127 elif "ROLLBACK" in uvalue:128 color = colorama.Fore.YELLOW129 elif "IN_PROGRESS" in uvalue:130 color = colorama.Fore.BLUE131 elif uvalue == "DELETE_COMPLETE":132 color = colorama.Fore.LIGHTBLACK_EX133 elif "COMPLETE" in uvalue:134 color = colorama.Fore.GREEN135 else:136 color = colorama.Fore.WHITE137 text = "%s%s%s" % (color, value, colorama.Style.RESET_ALL)138 if ansiwrap.ansilen(text) > column.max_length:139 half_length = (column.max_length - 1) / 2.0140 output_text = "%s%s%s" % (141 text[: int(math.floor(half_length))],142 ELLIPSIS,143 text[-int(math.ceil(half_length)) :],144 )145 else:146 output_text = text147 padding = " " * max(148 0,149 min(column.max_value_length, column.max_length) - ansiwrap.ansilen(output_text),150 )151 return "%s%s" % (padding, output_text)152def output_events(columns, events):153 for e in events:154 fmt = " ".join("%s" for _ in columns.values())155 print(156 fmt157 % tuple([format_column(n, c, getattr(e, n)) for n, c in columns.items()])158 )159def update_stacks_from_events(stacks, events, main_stack, max_depth=None):160 cf = boto3.resource("cloudformation")161 to_remove = set()162 to_add = set()163 # NOTE: Assuming that events are in proper order here164 for event in events:165 if event.resource_type == STACK_TYPE:166 if event.resource_status.endswith("COMPLETE"):167 # print('Stack %s COMPLETE' % (event.physical_resource_id,))168 if event.physical_resource_id in to_add:169 to_add.remove(event.physical_resource_id)170 to_remove.add(event.physical_resource_id)171 # if event.physical_resource_id not in stacks:172 # print('Stack %s COMPLETE but not in stack list' % (event.physical_resource_id,))173 else:174 if event.physical_resource_id in to_remove:175 to_remove.remove(event.physical_resource_id)176 if not event.physical_resource_id:177 LOG.debug(178 "stack event has an empty physical_resource_id: %r", event179 )180 continue181 to_add.add(event.physical_resource_id)182 # if event.physical_resource_id not in stacks:183 # print('Found new substack %s' % (event.physical_resource_id,))184 for stack_id in to_remove:185 if stack_id != main_stack.stack_id and stack_id in stacks:186 if len(stacks) != 1:187 del stacks[stack_id]188 # else:189 # print('Final stack COMPLETE')190 for stack_id in to_add:191 if stack_id not in stacks and (192 max_depth is None or len(stack_id.split("-")) - 1 <= max_depth193 ):194 stack = cf.Stack(stack_id)195 # Force loading with a retry as it can incur a potentially-failing API call196 retry(lambda: stack.stack_id)()197 # try:198 # get_stack_events(stack, 1)199 # except botocore.exceptions.ClientError:200 # continue201 stacks[stack_id] = stack202def do_tail_stack_events(main_stack, num, columns, headers, max_depth, follow):203 stacks = get_nested_stacks(204 main_stack.stack_id, status_check=lambda status: "IN_PROGRESS" in status205 )206 print("Getting events...")207 events = get_events(stacks, limit=num)208 outputted = set(e.id for e in events)209 update_columns(columns, events[-num:])210 update_stacks_from_events(stacks, events, main_stack, max_depth=max_depth)211 output_events(columns, [headers])212 output_events(columns, events[-num:])213 last_event_timestamp = events[-1].timestamp214 if not follow:215 return216 # TODO: Keep track of last updated timestamp for each stack and don't ask for more events217 # if it hasn't changed?218 # This would require updating the stack every time, though, which means adding another API call.219 while True:220 try:221 time.sleep(5)222 events = get_events(stacks)223 new_events = []224 for event in events:225 # Don't re-ouput events and don't output events older than the latest event shown (not doing this means226 # we can get events from the previous stack updates, which we don't want).227 if event.id in outputted or event.timestamp < last_event_timestamp:228 continue229 new_events.append(event)230 if not new_events:231 continue232 last_event_timestamp = new_events[-1].timestamp233 update_stacks_from_events(stacks, events, main_stack, max_depth=max_depth)234 # TODO: If an event for a stack comes in that isn't in stacks, add it to stacks.235 # TODO: Remove a stack from stacks if there is an "end" event? DELETE_COMPLETE or UPDATE_COMPLETE perhaps?236 # If adding a stack is implemented this should be fine.237 update_columns(columns, new_events)238 # TODO: Only output headers if the column width has changed or we've output more than X rows since the last239 # header.240 output_events(columns, [headers])241 output_events(columns, new_events)242 for event in new_events:243 # TODO: outputted grows constantly over time, it needs to be culled at some point.244 # Potential fix: outputted is replaced each time we loop with only the ids from the loop.245 outputted.add(event.id)246 except Exception:247 traceback.print_exc()248@retry249def next_page(pages):250 try:251 return list(next(pages))252 except botocore.exceptions.ClientError:253 return None254 # traceback.print_exc()255 except StopIteration:256 return None257def get_stack_failure_events(stack, columns, headers, start_func=None):258 pages = stack.events.pages()259 events = []260 end = False261 ready = start_func is None262 first = True263 while not end:264 page = next_page(pages)265 # update_columns(columns, page)266 # output_events(columns, [headers])267 # output_events(columns, page)268 # print(page)269 # print(stack.stack_id)270 if not page:271 break272 for event in page:273 events.append(event)274 if not ready:275 if start_func(event):276 ready = True277 continue278 elif (279 event.resource_type == STACK_TYPE280 and event.resource_status.upper().endswith("COMPLETE")281 and event.physical_resource_id == stack.stack_id282 and (283 not first284 or event.resource_status.upper() != "UPDATE_ROLLBACK_COMPLETE"285 )286 ):287 end = True288 break289 first = False290 # update_columns(columns, events)291 # output_events(columns, [headers])292 # output_events(columns, events)293 events = sorted(294 (event for event in events if "FAIL" in event.resource_status.upper()),295 key=lambda e: e.timestamp,296 )297 # update_columns(columns, events)298 # output_events(columns, [headers])299 # output_events(columns, events)300 return events301def do_postmortem(302 stack, columns, headers, search_for_failure=False, show_all_failures=False303):304 print("Getting events...")305 start_func = (306 (307 lambda event: (308 event.resource_type == STACK_TYPE309 and event.resource_status.upper() == "UPDATE_ROLLBACK_COMPLETE"310 and event.physical_resource_id == stack.stack_id311 )312 )313 if search_for_failure314 else None315 )316 top_level = True317 events = []318 while True:319 new_events = get_stack_failure_events(320 stack, columns, headers, start_func=start_func321 )322 if not new_events:323 if top_level:324 print(325 "The last stack update succeeded or there is an ongoing update which has no failures yet."326 )327 sys.exit(1)328 else:329 print("No failure events found in nested stack %r." % (stack,))330 break331 if not show_all_failures:332 new_events = [new_events[0]]333 events.extend(new_events)334 fail_event = new_events[0]335 if (336 fail_event.resource_type != STACK_TYPE337 or "failed to" not in fail_event.resource_status_reason.lower()338 ):339 break340 start_func = lambda event: event.timestamp <= fail_event.timestamp341 stack = get_stack(fail_event.physical_resource_id)342 events.sort(key=lambda e: e.timestamp, reverse=show_all_failures)343 update_columns(columns, events)344 output_events(columns, [headers])345 output_events(columns, events)346def main():347 args = docopt.docopt(__doc__)348 if args["--profile"]:349 boto3.setup_default_session(profile_name=args["--profile"])350 postmortem = args["--postmortem"]351 max_column_length = args["--max-column-length"]352 if max_column_length is None:353 max_column_length = 200 if postmortem else 40354 max_column_length = int(max_column_length)355 columns = collections.OrderedDict(356 [357 ("timestamp", Column(0, max_column_length)),358 ("stack_name", Column(0, max_column_length)),359 # ('stack_id', Column(0, max_column_length)),360 ("resource_type", Column(0, max_column_length)),361 ("logical_resource_id", Column(0, max_column_length)),362 # ('physical_resource_id', Column(0, max_column_length)),363 ("resource_status", Column(0, max_column_length)),364 ("resource_status_reason", Column(0, max_column_length)),365 ]366 )367 headers = collections.namedtuple("Headers", columns.keys())(368 *[369 colorama.Style.BRIGHT + t + colorama.Style.RESET_ALL370 for t in [371 "Timestamp",372 "Stack Name",373 # 'Stack ID',374 "Resource Type",375 "Logical Resource ID",376 # 'Physical Resource ID',377 "Status",378 "Reason",379 ]380 ]381 )382 update_columns(columns, [headers])383 print("Getting stack...")384 main_stack = get_stack(args["<stack>"])385 if postmortem:386 do_postmortem(387 main_stack,388 columns,389 headers,390 search_for_failure=args["--find-last-failure"],391 show_all_failures=args["--show-all-failures"],392 )393 else:394 num = int(args["--number"])395 max_depth = int(args["--depth"])396 if max_depth == -1:397 max_depth = None398 do_tail_stack_events(399 main_stack, num, columns, headers, max_depth, args["--follow"]400 )401if __name__ == "__main__":402 try:403 main()404 except KeyboardInterrupt:...
atest.py
Source:atest.py
...76 action_obj.output(results)77 except Exception:78 traceback.print_exc()79 finally:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!