Best Python code snippet using robotframework
aptest.py
Source: aptest.py
1import subprocess2import time3from runner.utils.sonicauto import SonicAuto4from runner.settings import python_logger5class UploadAptestException(Exception):6 def __init__(self, info):7 super().__init__(f"Upload result to ApTest is skipped: {info}")8class ApTest:9 def __init__(self, host='10.208.1.8', token='qatoken', user_name='automation', retry=5):10 self.host = host11 self.token = token12 self.user_name = user_name13 self.retry = retry14 self.uuid_map = {}15 def upload_aptest(self, results, platform, scmlabel):16 num_of_case_uploaded = 017 try:18 # get session_group_id19 sa = SonicAuto()20 product_id = str(sa.get_product_id_from_platform(platform))21 if product_id == 0:22 raise UploadAptestException("product id is not found")23 session_group_id = sa.get_session_group_name(scmlabel, product_id)24 if session_group_id == '':25 raise UploadAptestException("session group id is not found")26 # get ApTest suite name from product_name27 application_platform_aptest_mapping = {28 'GMS': 'GMSVP',29 'LICENSEMANAGER': 'License_Manager',30 'MYSONICWALL': 'Latte',31 'SANDBOX': 'Latte',32 'SNOWSHOE': 'Latte',33 'CFC': 'Desktop_Clients'34 }35 productid_aptest_mapping = {36 '2': 'SMA_1000_AO_Phase-6',37 '3': 'EmailSecurity',38 '5': 'SMA_100_AO_Phase6',39 '6': 'firmware_sonicos',40 '7': 'wxa',41 '8': 'Web_Application_Firewall',42 '9': 'SMA_1000_AO_Phase-6',43 '10': 'CloudWAF',44 '11': 'Capture_Client',45 '12': 'Latte'46 }47 aptest_suite_name = ''48 if product_id == '1':49 aptest_suite_name = application_platform_aptest_mapping[platform]50 else:51 aptest_suite_name = productid_aptest_mapping[product_id]52 retry = self.retry53 while (retry):54 if self.get_session_numbers_from_group_id(aptest_suite_name, session_group_id):55 retry -= 156 time.sleep(20)57 else:58 break59 for tc in results:60 if not 'uuid' in tc:61 python_logger.write("INFO: Testcase uuid not defined. Results not uploaded to ApTest" + '\n')62 continue63 uuid = tc['uuid']64 if uuid in self.uuid_map:65 session_number = self.uuid_map[uuid]66 if self.update_results(scmlabel, platform, aptest_suite_name, session_number, **tc) == '0':67 num_of_case_uploaded += 168 except UploadAptestException as e:69 python_logger.write(e.args[0] + '\n')70 except Exception as e:71 python_logger.write(e.args[0] + '\n')72 return num_of_case_uploaded73 def get_session_numbers_from_group_id(self, aptest_suite_name, session_group_id):74 rpc_query_string = '?suite=' + aptest_suite_name + '&sessiongroupid=' + session_group_id75 rpc_url = 'https://' + self.host + '/sw-getSessionsWithVar.pl' + rpc_query_string76 python_logger.write("DEBUG: RPC URL is: " + rpc_url + '\n')77 params = ['--connect-timeout', '20', '--silent', '-k']78 params.append(rpc_url)79 try:80 rpc_result_string = subprocess.Popen(['curl'] + params, stdout=subprocess.PIPE).communicate()[0]81 return_string = rpc_result_string.decode('ASCII')82 rpc_result_list = return_string.split('\n')83 rpc_return_value = rpc_result_list.pop(0)84 python_logger.write("DEBUG: RPC result value is: " + rpc_return_value + '\n')85 for session_setname in rpc_result_list:86 if session_setname == '':87 continue88 session_number, set_name = session_setname.split(',')89 rpc_query_string = '?suite=' + aptest_suite_name + '&set=' + set_name + '&session=' + session_number90 rpc_url = 'https://' + self.host + '/sw-autotclist.pl' + rpc_query_string91 python_logger.write("DEBUG: RPC URL is: " + rpc_url + '\n')92 params = ['--connect-timeout', '20', '--silent', '-k']93 params.append(rpc_url)94 rpc_result_string = subprocess.Popen(['curl'] + params, stdout=subprocess.PIPE).communicate()[0]95 return_string = rpc_result_string.decode('ASCII')96 rpc_result_list_uuid = return_string.split('\n')97 rpc_return_code = rpc_result_list_uuid.pop(0)98 python_logger.write("DEBUG: TC list result code is: " + rpc_return_code + '\n')99 if rpc_return_code == 1:100 python_logger.write("Failed return status from getUUIDsFromSessionNumber" + '\n')101 elif len(rpc_result_list_uuid) == 1:102 python_logger.write("No corresponding testcases for the given session number" + '\n')103 else:104 for testcase_data in rpc_result_list_uuid:105 uuid = testcase_data.split(',')106 self.uuid_map[uuid[0]] = session_number107 except:108 python_logger.error("Unable to run rpc query")109 return 1110 def update_results(self, scmlabel, platform, aptest_suite_name, session_number, **tc):111 uuid = tc['uuid']112 result = ''113 if 'result' in tc:114 result = tc['result']115 note = ''116 if 'note' in tc:117 note = tc['note']118 custom_key = ''119 if 'custom_key' in tc:120 custom_key = tc['custom_key']121 custom_val = ''122 if 'custom_val' in tc:123 custom_val = tc['custom_val']124 note_default = ''125 if scmlabel != '1.1.1.1' and platform != 'TestProd':126 note_default = 'BuildVersion:' + scmlabel + '__Product' + platform127 if note:128 note_default = note_default + '__Note:' + note129 transform_result = {'PASSED': 'pass', 'FAILED': 'fail', 'SKIPPED': 'untested'}130 if result in transform_result:131 result = transform_result[result]132 execdata = 'EXECDATA' + '_' + custom_key133 rpc_query_string = '?rpctoken=' + self.token + '&username=' + self.user_name + '&suite=' + aptest_suite_name + '&command=result&sess=' + session_number + '&uuid=' + uuid + '&result=' + result + '&' + execdata + '=' + custom_val + '¬e=' + note_default134 rpc_query_string = rpc_query_string.replace('[', '\[')135 rpc_query_string = rpc_query_string.replace(']', '\]')136 rpc_url = 'https://' + self.host + '/run/rpc.mpl' + rpc_query_string137 python_logger.write("DEBUG: RPC URL is: " + rpc_url + '\n')138 params = ['--connect-timeout', '20', '--silent', '-k']139 params.append(rpc_url)140 try:141 rpc_result_string = subprocess.Popen(['curl'] + params, stdout=subprocess.PIPE).communicate()[0]142 return_string = rpc_result_string.decode('ASCII')143 rpc_result_list = return_string.split('\n')144 rpc_return_value = rpc_result_list[0]145 rpc_return_message = rpc_result_list[1]146 python_logger.write("DEBUG: RPC result value is: " + rpc_return_value + '\n')147 python_logger.write("DEBUG: RPC result message is: " + rpc_return_message + '\n')148 return rpc_return_value149 except:150 python_logger.error("ERROR: Unable to run rpc query")151 return152# if __name__ == '__main__':153# ap = ApTest()154# cts_result = [{'uuid': '8A271988-0464-11DE-860E-445A00F93527', 'result': 'PASSED'},155# {'uuid': '8A39711E-0464-11DE-860E-445A00F93527', 'result': 'PASSED', 'note': '[trialrun]'}]156# scmlabel = '6.5.2.2-44n'157# platform = '5600'...
Logger.py
Source: Logger.py
1#!/usr/bin/env python2"""This is a simple class for wrapping functions in the3native python logging module."""45import logging, sys, os, shutil6import logging.handlers7from mini_utility import get_spkg_path, make_path8from static_data import OK, FAIL, LOG_MAX_SIZE, LOGS_TO_KEEP, LOG_FILE910FORMAT_STRING = '%(asctime)s|%(levelname)s|%(message)s|'11LOGGER_NAME = "bombardier"1213class LoggerClass:1415 """This class implements a simple interface that other logging16 type classes also implement. This is the most complete17 implementation. """1819 def __init__(self):20 spkg_path = get_spkg_path()21 self.python_logger = None22 self.formatter = None23 self.std_err_handler = None24 self.file_handler = None25 self.log_path = None26 if spkg_path:27 self.log_path = make_path(get_spkg_path(), LOG_FILE)28 if self.check_log_size() == FAIL:29 self.cycle_log()30 self.make_logger()3132 def check_log_size(self):33 """ We were dealing with Windows, and couldn't use Python's34 fancy rolling logger effectively. We therefore hd to see35 if the log is too big and deal with it periodically. -36 pbanka37 """38 if not os.path.isfile(self.log_path):39 return40 size = os.stat(self.log_path)[6]41 if size > LOG_MAX_SIZE:42 return FAIL43 return OK4445 def cycle_log(self):46 "Manually cycle logs"47 oldest_log_file = "%s.%s" % (self.log_path, LOGS_TO_KEEP)48 if os.path.isfile(oldest_log_file):49 try:50 os.unlink(oldest_log_file)51 except OSError:52 return53 for i in range(LOGS_TO_KEEP-1, 0, -1):54 source_path = "%s.%s" % (self.log_path, i)55 if os.path.isfile(source_path):56 dest_path = "%s.%s" % (self.log_path, (i+1))57 shutil.copyfile(source_path, dest_path)58 os.unlink(source_path)59 shutil.copyfile(self.log_path, "%s.1" % self.log_path)60 try:61 os.unlink(self.log_path)62 except OSError:63 return6465 def make_logger(self):66 "use the logging subsystem to obtain a logger"67 self.python_logger = logging.getLogger(LOGGER_NAME)68 try:69 self.file_handler = logging.FileHandler(self.log_path)70 except IOError:71 try:72 self.file_handler = logging.FileHandler(self.log_path)73 except IOError:74 print "Unable to log to %s" % self.log_path75 self.file_handler = logging.StreamHandler(sys.stderr)76 self.formatter = logging.Formatter(FORMAT_STRING)77 self.file_handler.setFormatter(self.formatter)78 self.python_logger.addHandler(self.file_handler)79 self.python_logger.setLevel(logging.DEBUG)8081 def info(self, msg):82 "wrap python logging facility"83 self.python_logger.info(msg)8485 def debug(self, msg):86 "wrap python logging facility"87 self.python_logger.debug(msg)8889 def warning(self, msg):90 "wrap python logging facility"91 self.python_logger.warning(msg)9293 def error(self, msg):94 "wrap python logging facility"95 self.python_logger.error(msg)9697 def critical(self, msg):98 "wrap python logging facility"99 self.python_logger.critical(msg)100101 def add_std_err_logging(self):102 "Get stderr logging in addition to file logging"103 self.std_err_handler = logging.StreamHandler(sys.stderr)104 self.std_err_handler.setFormatter(self.formatter)105 self.python_logger.addHandler(self.std_err_handler)106107 def rm_file_logging(self):108 "Sometimes you don't want to log to a file"109 if self.file_handler:110 msg = "Removing logging to %s" % self.log_path111 self.python_logger.info(msg)112 self.python_logger.removeHandler(self.file_handler)113 else:114 msg = "Being told to remove nonexistent file logging handler."115 self.python_logger.warning(msg)116117 def rm_std_err_logging(self):118 "Sometimes you're tired of seeing stderr logging"119 if self.std_err_handler:120 self.python_logger.info("Removing logging to standard error")121 self.python_logger.removeHandler(self.std_err_handler)122 else:123 msg = "Being told to remove nonexistent stderr handler."124 self.python_logger.warning(msg)125
...
ServerLogger.py
Source: ServerLogger.py
1import StringIO2import syslog3syslog.openlog("dispatcher", syslog.LOG_PID, syslog.LOG_USER)4syslog.LOG_UPTO(syslog.LOG_INFO)5import logging, sys, os, shutil6import logging.handlers7FORMAT_STRING = '%(asctime)s|%(levelname)s|%(message)s|'8class ServerLogger:9 def __init__(self, name, use_syslog = False):10 self.name = name11 self.python_logger = logging.getLogger(self.name)12 self.python_logger.setLevel(logging.DEBUG)13 self.formatter = logging.Formatter(FORMAT_STRING)14 self.file_handle = None15 self.use_syslog = use_syslog16 self.output_pointer = None17 def add_stringio_handle(self):18 self.file_handle = StringIO.StringIO()19 self.output_pointer = 020 file_log_handler = logging.StreamHandler(self.file_handle)21 file_log_handler.setFormatter(self.formatter)22 self.python_logger.addHandler(file_log_handler)23 def add_std_err(self):24 std_err_handler = logging.StreamHandler(sys.stderr)25 std_err_handler.setFormatter(self.formatter)26 self.python_logger.addHandler(std_err_handler)27 def get_new_logs(self):28 self.file_handle.seek(self.output_pointer)29 new_output = self.file_handle.read()30 output_list = new_output.split('\n')31 self.output_pointer += len(new_output) - len(output_list[-1])32 return output_list[:-1]33 def get_final_logs(self):34 self.file_handle.seek(self.output_pointer)35 return self.file_handle.read().split('\n')36 def get_complete_log(self):37 self.file_handle.seek(0)38 return self.file_handle.read()39 def log_message(self, level, message, username=None):40 if not username:41 username = self.name42 level_map = {"DEBUG": self.debug,43 "INFO": self.info,44 "WARNING": self.warning,45 "ERROR": self.error46 }47 if level in level_map:48 level_map[level](message, username)49 50 def debug(self, msg, username=None):51 if not username:52 username = self.name53 self.log(syslog.LOG_DEBUG, msg, username)54 self.python_logger.debug("%s|%s" % (username, msg))55 def info(self, msg, username=None):56 if not username:57 username = self.name58 self.log(syslog.LOG_INFO, msg, username)59 self.python_logger.info("%s|%s" % (username, msg))60 def warning(self, msg, username=None):61 if not username:62 username = self.name63 self.log(syslog.LOG_WARNING, msg, username)64 self.python_logger.warning("%s|%s" % (username, msg))65 def error(self, msg, username=None):66 if not username:67 username = self.name68 self.log(syslog.LOG_ERR, msg, username)69 self.python_logger.error("%s|%s" % (username, msg))70 def log(self, level, msg, username=None):71 if not username:72 username = self.name73 if self.use_syslog:74 syslog.syslog(level, "%-15s|dispatcher: %s" % (username, msg))75def test1():76 a = ServerLogger("test1")77 a.add_std_err()78 a.info("hello", "test_user")79def test2():80 a = ServerLogger("test1")81 a.info("hello2", "test_user")82if __name__ == "__main__":83 test1()...
logging.py
Source: logging.py
1import logging2from logging.handlers import TimedRotatingFileHandler3from .utils import make_random_string4def configure_logging(log_filepath=None, when=None, interval=None, backup_count=None):5 formatter = DispatchingFormatter({6 'aggregator': logging.Formatter('%(asctime)s - %(name)s - %(req_id)s - %(subsystem)s - %(levelname)s - %(message)s'),7 'quart.serving': logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'),8 'quart.app': logging.Formatter('%(asctime)s - %(name)s - %(levelname)s in %(module)s: %(message)s'),9 }, logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))10 if log_filepath:11 handler = TimedRotatingFileHandler(log_filepath, when=when, interval=interval, backupCount=backup_count)12 else:13 handler = logging.StreamHandler()14 handler.setLevel(logging.DEBUG)15 handler.setFormatter(formatter)16 # Our own internal application logger wrapper17 logger = logging.getLogger('aggregator')18 logger.setLevel(logging.DEBUG)19 logger.addHandler(handler)20 application_logger = Logger(logger, subsystem='root')21 return application_logger, handler22def configure_logging_for_tests():23 logger = logging.getLogger('aggregator')24 logger.setLevel(logging.DEBUG)25 return Logger(logger, subsystem='root')26class Logger(object):27 def __init__(self, python_logger, **extra):28 extra.setdefault('subsystem', '')29 extra.setdefault('req_id', '__n/a__')30 self.python_logger = python_logger31 self.extra = extra32 def info(self, msg, **extra):33 e = self.extra.copy()34 e.update(extra)35 self.python_logger.info(msg, extra=e)36 def error(self, msg, exc_info=None, **extra):37 e = self.extra.copy()38 e.update(extra)39 self.python_logger.error(msg, exc_info=exc_info, extra=e)40 def exception(self, msg, exc_info=True, **extra):41 e = self.extra.copy()42 e.update(extra)43 self.python_logger.exception(msg, exc_info=exc_info, extra=e)44 def getLogger(self, **extra):45 new_extra = self.extra.copy()46 new_extra.update(extra)47 return Logger(self.python_logger, **new_extra)48 def getLoggerWithRandomReqId(self, prefix):49 new_extra = self.extra.copy()50 new_extra['req_id'] = f'{prefix}-{make_random_string(7)}'51 return Logger(self.python_logger, **new_extra)52# From https://stackoverflow.com/a/3462668553class DispatchingFormatter:54 def __init__(self, formatters, default_formatter):55 self._formatters = formatters56 self._default_formatter = default_formatter57 def format(self, record):58 # Search from record's logger up to it's parents:59 logger = logging.getLogger(record.name)60 while logger:61 # Check if suitable formatter for current logger exists:62 if logger.name in self._formatters:63 formatter = self._formatters[logger.name]64 break65 else:66 logger = logger.parent67 else:68 # If no formatter found, just use default:69 formatter = self._default_formatter...
Check out the latest blogs from LambdaTest on this topic:
When most firms employed a waterfall development model, it was widely joked about in the industry that Google kept its products in beta forever. Google has been a pioneer in making the case for in-production testing. Traditionally, before a build could go live, a tester was responsible for testing all scenarios, both defined and extempore, in a testing environment. However, this concept is evolving on multiple fronts today. For example, the tester is no longer testing alone. Developers, designers, build engineers, other stakeholders, and end users, both inside and outside the product team, are testing the product and providing feedback.
Agile project management is a great alternative to traditional methods, to address the customer’s needs and the delivery of business value from the beginning of the project. This blog describes the main benefits of Agile for both the customer and the business.
As everyone knows, the mobile industry has taken over the world and is the fastest emerging industry in terms of technology and business. It is possible to do all the tasks using a mobile phone, for which earlier we had to use a computer. According to Statista, in 2021, smartphone vendors sold around 1.43 billion smartphones worldwide. The smartphone penetration rate has been continuously rising, reaching 78.05 percent in 2020. By 2025, it is expected that almost 87 percent of all mobile users in the United States will own a smartphone.
Software testing is fueling the IT sector forward by scaling up the test process and continuous product delivery. Currently, this profession is in huge demand, as it needs certified testers with expertise in automation testing. When it comes to outsourcing software testing jobs, whether it’s an IT company or an individual customer, they all look for accredited professionals. That’s why having an software testing certification has become the need of the hour for the folks interested in the test automation field. A well-known certificate issued by an authorized institute kind vouches that the certificate holder is skilled in a specific technology.
It’s strange to hear someone declare, “This can’t be tested.” In reply, I contend that everything can be tested. However, one must be pleased with the outcome of testing, which might include failure, financial loss, or personal injury. Could anything be tested when a claim is made with this understanding?
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!