Best Python code snippet using autotest_python
result.py
Source:result.py
1from __future__ import with_statement2from blessings import Terminal3from nose.plugins.skip import SkipTest4from nose.result import TextTestResult5from nose.util import isclass6from noseprogressive.bar import ProgressBar, NullProgressBar7from noseprogressive.tracebacks import format_traceback, extract_relevant_tb8from noseprogressive.utils import nose_selector, index_of_test_frame9class ProgressiveResult(TextTestResult):10 """Test result which updates a progress bar instead of printing dots11 Nose's ResultProxy will wrap it, and other plugins can still print12 stuff---but without smashing into my progress bar, care of my Plugin's13 stderr/out wrapping.14 """15 def __init__(self, cwd, total_tests, stream, config=None):16 super(ProgressiveResult, self).__init__(stream, None, 0, config=config)17 self._cwd = cwd18 self._options = config.options19 self._term = Terminal(stream=stream,20 force_styling=config.options.with_styling)21 if self._term.is_a_tty or self._options.with_bar:22 # 1 in case test counting failed and returned 023 self.bar = ProgressBar(total_tests or 1,24 self._term,25 config.options.bar_filled_color,26 config.options.bar_empty_color)27 else:28 self.bar = NullProgressBar()29 # Declare errorclass-savviness so ErrorClassPlugins don't monkeypatch30 # half my methods away:31 self.errorClasses = {}32 def startTest(self, test):33 """Update the progress bar."""34 super(ProgressiveResult, self).startTest(test)35 self.bar.update(nose_selector(test), self.testsRun)36 def _printTraceback(self, test, err):37 """Print a nicely formatted traceback.38 :arg err: exc_info()-style traceback triple39 :arg test: the test that precipitated this call40 """41 # Don't bind third item to a local var; that can create42 # circular refs which are expensive to collect. See the43 # sys.exc_info() docs.44 exception_type, exception_value = err[:2]45 # TODO: In Python 3, the traceback is attached to the exception46 # instance through the __traceback__ attribute. If the instance47 # is saved in a local variable that persists outside the except48 # block, the traceback will create a reference cycle with the49 # current frame and its dictionary of local variables. This will50 # delay reclaiming dead resources until the next cyclic garbage51 # collection pass.52 extracted_tb = extract_relevant_tb(53 err[2],54 exception_type,55 exception_type is test.failureException)56 test_frame_index = index_of_test_frame(57 extracted_tb,58 exception_type,59 exception_value,60 test)61 if test_frame_index:62 # We have a good guess at which frame is the test, so63 # trim everything until that. We don't care to see test64 # framework frames.65 extracted_tb = extracted_tb[test_frame_index:]66 with self.bar.dodging():67 self.stream.write(''.join(68 format_traceback(69 extracted_tb,70 exception_type,71 exception_value,72 self._cwd,73 self._term,74 self._options.function_color,75 self._options.dim_color,76 self._options.editor,77 self._options.editor_shortcut_template)))78 def _printHeadline(self, kind, test, is_failure=True):79 """Output a 1-line error summary to the stream if appropriate.80 The line contains the kind of error and the pathname of the test.81 :arg kind: The (string) type of incident the precipitated this call82 :arg test: The test that precipitated this call83 """84 if is_failure or self._options.show_advisories:85 with self.bar.dodging():86 self.stream.writeln(87 '\n' +88 (self._term.bold if is_failure else '') +89 '%s: %s' % (kind, nose_selector(test)) +90 (self._term.normal if is_failure else '')) # end bold91 def _recordAndPrintHeadline(self, test, error_class, artifact):92 """Record that an error-like thing occurred, and print a summary.93 Store ``artifact`` with the record.94 Return whether the test result is any sort of failure.95 """96 # We duplicate the errorclass handling from super rather than calling97 # it and monkeying around with showAll flags to keep it from printing98 # anything.99 is_error_class = False100 for cls, (storage, label, is_failure) in self.errorClasses.items():101 if isclass(error_class) and issubclass(error_class, cls):102 if is_failure:103 test.passed = False104 storage.append((test, artifact))105 is_error_class = True106 if not is_error_class:107 self.errors.append((test, artifact))108 test.passed = False109 is_any_failure = not is_error_class or is_failure110 self._printHeadline(label if is_error_class else 'ERROR',111 test,112 is_failure=is_any_failure)113 return is_any_failure114 def addSkip(self, test, reason):115 """Catch skipped tests in Python 2.7 and above.116 Though ``addSkip()`` is deprecated in the nose plugin API, it is very117 much not deprecated as a Python 2.7 ``TestResult`` method. In Python118 2.7, this will get called instead of ``addError()`` for skips.119 :arg reason: Text describing why the test was skipped120 """121 self._recordAndPrintHeadline(test, SkipTest, reason)122 # Python 2.7 users get a little bonus: the reason the test was skipped.123 if isinstance(reason, Exception):124 reason = getattr(reason, 'message', None) or getattr(125 reason, 'args')[0]126 if reason and self._options.show_advisories:127 with self.bar.dodging():128 self.stream.writeln(reason)129 def addError(self, test, err):130 # We don't read this, but some other plugin might conceivably expect it131 # to be there:132 excInfo = self._exc_info_to_string(err, test)133 is_failure = self._recordAndPrintHeadline(test, err[0], excInfo)134 if is_failure:135 self._printTraceback(test, err)136 def addFailure(self, test, err):137 super(ProgressiveResult, self).addFailure(test, err)138 self._printHeadline('FAIL', test)139 self._printTraceback(test, err)140 def printSummary(self, start, stop):141 """As a final summary, print number of tests, broken down by result."""142 def renderResultType(type, number, is_failure):143 """Return a rendering like '2 failures'.144 :arg type: A singular label, like "failure"145 :arg number: The number of tests with a result of that type146 :arg is_failure: Whether that type counts as a failure147 """148 # I'd rather hope for the best with plurals than totally punt on149 # being Englishlike:150 ret = '%s %s%s' % (number, type, 's' if number != 1 else '')151 if is_failure and number:152 ret = self._term.bold(ret)153 return ret154 # Summarize the special cases:155 counts = [('test', self.testsRun, False),156 ('failure', len(self.failures), True),157 ('error', len(self.errors), True)]158 # Support custom errorclasses as well as normal failures and errors.159 # Lowercase any all-caps labels, but leave the rest alone in case there160 # are hard-to-read camelCaseWordBreaks.161 counts.extend([(label.lower() if label.isupper() else label,162 len(storage),163 is_failure)164 for (storage, label, is_failure) in165 self.errorClasses.values() if len(storage)])166 summary = (', '.join(renderResultType(*a) for a in counts) +167 ' in %.1fs' % (stop - start))168 # Erase progress bar. Bash doesn't clear the whole line when printing169 # the prompt, leaving a piece of the bar. Also, the prompt may not be170 # at the bottom of the terminal.171 self.bar.erase()172 self.stream.writeln()173 if self.wasSuccessful():174 self.stream.write(self._term.bold_green('OK! '))...
ssh_parse.py
Source:ssh_parse.py
1import ipaddress2import time3from datetime import datetime4import re5class Parse_SSH:6 dict = {}7 user_account = ['osamac', 'kamran', 'student', 'root']8 number_of_failure = 09 def isValid(self,name):10 if name in self.user_account:11 return "1"12 else:13 return "0"14 def GetFailure(self):15 return self.number_of_failure16 def ParseUsr(self,line):17 usr = None18 flag = 019 if "Accepted password" in line:20 usr = re.search(r'(\bfor\s)(\w+)', line)21 elif "sudo:" in line:22 usr = re.search(r'(sudo:\s+)(\w+)', line)23 elif "authentication failure" in line:24 usr = re.search(r'USER=\w+', line)25 elif "for invalid user" in line:26 usr = re.search(r'(\buser\s)(\w+)', line)27 elif "Invalid user" in line:28 flag = 129 str_ = line30 loc_start = str_.find("Invalid user ") + len("Invalid user ")31 loc_end = str_.find(" from")32 usr = str_[loc_start:loc_end]33 elif "Failed password for" in line:34 flag = 135 str_ = line36 loc_start = str_.find("Failed password for ") + len("Failed password for ")37 loc_end = str_.find(" from")38 usr = str_[loc_start:loc_end]39 if usr is not None:40 if flag == 1:41 return usr42 return usr.group(2)43 else:44 return "-1"45 # parse an IP from a line46 def ParseIP(self,line):47 ip = re.search(r'(\bfrom\s)(\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b)', line)48 if ip is not None:49 return ip.group(2)50 else:51 ip = "-1"52 return ip53 def isPrivate(self,ip):54 try:55 is_private = int(ipaddress.ip_address(ip).is_private)56 return is_private57 except:58 return 059 # parse a date from the line60 def ParseDate(self,line):61 #date = re.search(r'^[A-Za-z]{3}\s*[0-9]{1,2}\s[0-9]{1,2}:[0-9]{2}:[0-9]{2}', line)62 date = line[0:15]63 date = datetime.strptime(date, '%b %d %H:%M:%S')64 date = date.replace(year=2018)65 if date is not None:66 return date.timestamp()67 else:68 datetime.now().timestamp()69 # parse a command from a line70 def ParseCmd(line):71 # parse command to end of line72 cmd = re.search(r'(\bCOMMAND=)(.+?$)', line)73 if cmd is not None:74 return cmd.group(2)75 def isRoot(self,line):76 if line.find("root") != -1:77 return "1"78 else:79 return "0"80 def SSHProcessed(self,line):81 t = self.ParseDate(str(line))82 # match a login83 if "Invalid user" in line:84 usr = self.ParseUsr(line)85 ip = self.ParseIP(line)86 is_private = self.isPrivate(ip)87 self.number_of_failure= self.number_of_failure+188 is_failure = "1"89 is_root = self.isRoot(line)90 is_valid = self.isValid(usr)91 elif "Accepted password for" in line:92 usr = self.ParseUsr(line)93 ip = self.ParseIP(line)94 is_private = self.isPrivate(ip)95 self.number_of_failure=096 is_failure = "0"97 is_root = self.isRoot(line)98 is_valid = self.isValid(usr)99 # match a failed login100 elif "Failed password for" in line:101 usr = self.ParseUsr(line)102 ip = self.ParseIP(line)103 is_private = self.isPrivate(ip)104 self.number_of_failure = self.number_of_failure + 1105 is_failure = "1"106 is_root = self.isRoot(line)107 is_valid = self.isValid(usr)108 elif "authentication failure;" in line:109 usr = self.ParseUsr(line)110 ip = self.ParseIP(line)111 is_private = self.isPrivate(ip)112 self.number_of_failure = self.number_of_failure + 1113 is_failure = "1"114 is_root = self.isRoot(line)115 is_valid = self.isValid(usr)116 else:117 usr = "-1"118 ip = "-1"119 return {}120 print("IP ADDRESS : " + ip)121 if usr != "-1" or ip != "-1":122 if self.dict.get(ip) == None:123 count = -1124 if is_valid == "1":125 count = 0126 else:127 count = 1128 if is_failure == "0":129 self.dict.update({ip: {"is_private": is_private, "is_failure": is_failure, "is_root": is_root,"is_valid": is_valid,"not_valid_count":count, "user": usr,"ip_failure":0,"ip_success":1, "no_failure": self.number_of_failure, "td": int(0),"first":1,"ts":t}})130 else:131 self.dict.update({ip: {"is_private": is_private, "is_failure": is_failure, "is_root": is_root,"is_valid": is_valid,"not_valid_count":count, "user": usr,"ip_failure":1,"ip_success":0, "no_failure": self.number_of_failure, "td": int(0),"first":1,"ts":t}})132 else:133 count = -1134 if is_valid == "1":135 count = 0136 else:137 count = int(self.dict[ip]["not_valid_count"])+1138 if is_failure == "0":139 c = int(self.dict[ip]["ip_success"]) +1140 f = 0141 if(f <= 0):142 f = 0143 td = t - int(self.dict[ip]["ts"])144 self.dict.update({ip: {"is_private": is_private, "is_failure": is_failure, "is_root": is_root,"is_valid": is_valid,"not_valid_count":count, "user": usr,"ip_failure":f,"ip_success":c, "no_failure": self.number_of_failure, "td": int(td),"first":0,"ts":int(t)}})145 else:146 c = int(self.dict[ip]["ip_success"])147 f = int(self.dict[ip]["ip_failure"]) + 1148 td = t-int(self.dict[ip]["ts"])149 self.dict.update({ip: {"is_private": is_private, "is_failure": is_failure, "is_root": is_root,"is_valid": is_valid,"not_valid_count":count, "user": usr,"ip_failure":f,"ip_success":c, "no_failure": self.number_of_failure, "td": int(td),"first":0,"ts":int(t)}})150 if self.dict.get(ip) == None:151 return {}152 return self.dict[ip]153 # match commands154 #elif "sudo:" in line:155 # # parse user156 # usr = self.ParseUsr(line)157 # cmd = self.ParseCmd(line)158 # # append the command if it isn't there already159 # if cmd is not None:160 # if not cmd in logs[usr].commands:161 # logs[usr].commands.append(cmd)...
helper.py
Source:helper.py
1#!/usr/bin/env python32#3# Helper script written in python. Provide some ultilities for CI checks and workflow4# Usage: ./scripts/helper.py --help5#6import argparse7import os8import subprocess9from typing import List10def print_usage():11 print('Run with --help to get more info')12def get_working_directories(file_list: str) -> List[str]:13 files = file_list.split(' ')14 # Get list of working directories: file_path can be a file or a directory15 directories = []16 for fpath in files:17 if os.path.isfile(fpath) and fpath.endswith('.tf'):18 dir_path = fpath.rsplit('/', 1)[0]19 directories.append(dir_path)20 21 # Include also /tests sub-direcotory if we are changing the TF files in root module22 tests_dir = [x[0] for x in os.walk(dir_path) if '/tests/' in x[0]]23 directories.extend(tests_dir)24 elif os.path.isdir(fpath):25 directories.append(fpath)26 # Exclude ".terraform" folder27 directories = list(set([d for d in directories if '.terraform' not in d]))28 directories.sort()29 return directories30def run_terraform_fmt(file_list: str, is_check: bool):31 directories = get_working_directories(file_list)32 is_failure = False33 for directory in directories:34 terraform_fmt_cmd = 'terraform fmt -check -no-color %s' % directory if is_check else 'terraform fmt -no-color %s' % directory35 result = subprocess.run([terraform_fmt_cmd], stdout=subprocess.PIPE, stderr=subprocess.PIPE,36 timeout=30, shell=True, universal_newlines=True)37 print('Running %s' % terraform_fmt_cmd)38 if result.returncode != 0:39 print('Terraform Format error : terraform fmt %s %s' % (result.stdout, result.stderr))40 is_failure = True41 if is_failure:42 exit(1)43def run_terraform_validate(file_list: str):44 directories = get_working_directories(file_list)45 # Filter only directories that contains '/tests' in its path46 directories = [d for d in directories if "/tests" in d]47 is_failure = False48 for directory in directories:49 terraform_validate_cmd = 'terraform init && terraform validate -no-color'50 result = subprocess.run([terraform_validate_cmd], stdout=subprocess.PIPE, stderr=subprocess.PIPE,51 timeout=30, shell=True, universal_newlines=True, cwd=directory)52 print('Running %s %s' % (terraform_validate_cmd, directory))53 if result.returncode != 0:54 print('Terraform Validate error on directory : %s \n%s %s' % (directory, result.stdout, result.stderr))55 is_failure = True56 if is_failure:57 exit(1)58def run_terraform_plan(file_list: str):59 directories = get_working_directories(file_list)60 # Filter only directories that contains '/tests' in its path61 directories = [d for d in directories if "/tests" in d]62 is_failure = False63 for directory in directories:64 terraform_plan_cmd = 'terraform init && terraform plan -no-color'65 result = subprocess.run([terraform_plan_cmd], stdout=subprocess.PIPE, stderr=subprocess.PIPE,66 timeout=30, shell=True, universal_newlines=True, cwd=directory)67 print('Running %s %s' % (terraform_plan_cmd, directory))68 if result.returncode != 0:69 print('Terraform Plan error on directory : %s \n%s %s' % (directory, result.stdout, result.stderr))70 is_failure = True71 if is_failure:72 exit(1)73def parse_args():74 parser = argparse.ArgumentParser(description='Helper script')75 subparsers = parser.add_subparsers(dest='func')76 run_terraform_fmt_parser = subparsers.add_parser(77 'run-terraform-fmt', help='Run terraform fmt on list of working directories')78 run_terraform_fmt_parser.add_argument(79 '-f', '--file_list', help='List of changed files separated by space')80 run_terraform_fmt_parser.add_argument(81 '-c', '--check', help='Whether to run terraform format check instead', action="store_true")82 run_terraform_validate_parser = subparsers.add_parser(83 'run-terraform-validate', help='Run terraform validate on list of working directories')84 run_terraform_validate_parser.add_argument(85 '-f', '--file_list', help='List of changed files separated by space')86 run_terraform_plan_parser = subparsers.add_parser(87 'run-terraform-plan', help='Run terraform plan on list of working directories')88 run_terraform_plan_parser.add_argument(89 '-f', '--file_list', help='List of changed files separated by space')90 return parser.parse_args()91if __name__ == '__main__':92 args = parse_args()93 if args.func == 'run-terraform-fmt':94 run_terraform_fmt(args.file_list, args.check)95 elif args.func == 'run-terraform-validate':96 run_terraform_validate(args.file_list)97 elif args.func == 'run-terraform-plan':98 run_terraform_plan(args.file_list)99 else:...
json_responses.py
Source:json_responses.py
1from django.http import HttpResponse2import json3class JsonResponse(HttpResponse):4 """5 A general class for returning JSON documents as HttpResponses6 """7 failure_string = "FAILURE"8 success_string = "SUCCESS"9 10 def __init__(self, is_failure, data_or_message):11 """Return a JsonResponse with a success/failure indicator, a failure message if required, and a data12 field for returning information to the client13 14 Keyword Aruments:15 is_failure - A Boolean value indicating whether the request was successful or not.16 data_or_message - A failure message or a string with data to return depending on the value of is_failure17 """18 19 response_object = { 20 "status" : self.failure_string if is_failure else self.success_string,21 "error_message" : data_or_message if is_failure else None,22 "data" : None if is_failure else data_or_message23 }24 25 super(JsonResponse, self).__init__(json.dumps(response_object), content_type="application/json")26class FailureResponse(JsonResponse):27 """28 The FailureResponse is a json response object that contains the single json string 'SUCCESS'29 """30 31 32 def __init__(self, failure_message):33 """Create a default HttpResponse subclass for communicating a Tangra failure."""34 super(FailureResponse, self).__init__(True, failure_message)35class SuccessResponse(JsonResponse):36 """37 The SuccessResponse is a json response object that contains the single json string 'SUCCESS'38 """ 39 40 def __init__(self, data):41 """Create a default HttpResponse subclass for communicating a Tangra success."""42 super(SuccessResponse, self).__init__(False, data)43# TODO: list response? ...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!