Best Python code snippet using robotframework
client_plain.py
Source:client_plain.py
'''
SMIT package implements a basic IoT platform.
Copyright 2016-2018 Distributed Systems Security, Data61, CSIRO
This file is part of SMIT package.
SMIT package is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
any later version.
SMIT package is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SMIT package. If not, see <https://www.gnu.org/licenses/>.
'''
import os
import sys
import socket
import inspect
sys.path.insert(0, '../../')
sys.path.insert(0, '../../../')
from smit import utils
import traceback
import time
from datetime import datetime
import signal
from multiprocessing import Process
import logging
import subprocess
import random


class SinkClientPlain(object):
    """Client that connects to a sink server and sends plaintext UDP messages.

    Experiment parameters are read from the configuration file named by
    ``clientcnf`` ('client_expcnf'), which is resolved relative to the
    directory containing this file.
    """
    seq = 0  # sequence number of the most recently generated message
    utl = utils.Utils()
    config = {'SERVERIP': '', 'SERVERPORT': '', 'TIMEZONE': '', 'SYNCTIME': 0, 'REFLOWPAN': 0,
              'SYSWAIT': 0, 'PAYLOADLEN': 0, 'DATE': '', 'SENDTIME': 0, 'SENDRATE': 0, 'DEVNUM': 0}
    clientcnf = 'client_expcnf'  # the path to configuration file for this client package
    package_path = ''  # the path to this package
    send_logger = None  # a log handler for recording the information of sending messages.
    sock = 0  # replaced by a UDP/IPv6 socket in start()
    addr = None  # sink address, learned from the sink's first reply in start()
    is_first_time = 1  # indicate if it is an initial time synchronization.

    def __init__(self):
        """Change into the package directory and load the configuration file."""
        self.package_path = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
        os.chdir(self.package_path)
        self.config = self.utl.read_config(self.clientcnf, self.config)

    def install_dependencies(self):
        """Install dependencies for experiment on a device.
        """
        for packages in ('ntp ntpdate', 'tcpdump', 'sshpass'):
            self.utl.call('sudo apt-get -y --force-yes install ' + packages, shell=True)

    @staticmethod
    def _ifconfig_output():
        """Return the text printed by ``/sbin/ifconfig`` ('No output!' if there is none)."""
        # universal_newlines=True yields text (not bytes) on Python 3 as well.
        proc = subprocess.Popen('/sbin/ifconfig', stdout=subprocess.PIPE, shell=True,
                                universal_newlines=True)
        out, _ = proc.communicate()
        return 'No output!' if out is None else out

    @staticmethod
    def _lowpan_inet6_address(ifconfig_out, scope_marker):
        """Extract the lowpan0 inet6 address that is followed by ``scope_marker``.

        :param ifconfig_out: text output of ifconfig.
        :param scope_marker: the text that follows the wanted address, e.g.
            ' prefixlen 64 scopeid 0x20<link>'.
        :type ifconfig_out: str.
        :type scope_marker: str.
        :return: str. the address, or '' when lowpan0 or the marker is absent.
        """
        iface_pos = ifconfig_out.find('lowpan0')
        if iface_pos == -1:
            return ''
        marker_pos = ifconfig_out.find(scope_marker, iface_pos)
        if marker_pos == -1:
            return ''
        label = 'inet6 '
        label_pos = ifconfig_out.rfind(label, iface_pos, marker_pos)
        if label_pos == -1:
            return ''
        return ifconfig_out[label_pos + len(label):marker_pos].strip()

    def create_conn_log(self, dst_addr):
        """This function creates a connection log file which contains local network
        interface information and destination address.
        :param dst_addr: destination address (host, port)
        :type dst_addr: tuple
        """
        if os.path.exists('conn.log'):
            self.utl.call('sudo rm -f conn.log', shell=True)
            print('Old connection log has been removed.')
        formatter = logging.Formatter(fmt='%(asctime)s\t\n%(message)s', datefmt='%d/%m/%Y %H:%M:%S')
        conn_logger = self.create_logger('Connection log.', 'conn.log', logging.INFO, formatter)
        out = self._ifconfig_output()
        # get the global-scope local ip for 6LoWPAN
        local_ip = self._lowpan_inet6_address(out, ' prefixlen 64 scopeid 0x0<global>')
        print("local_ip: -> " + local_ip)
        out = out + '\nClient|' + str(local_ip)
        out = out + '\nSink|' + str(dst_addr[0]) + '.' + str(dst_addr[1])
        tail = '\n' + "#" * 40 + '\n'
        conn_logger.info(out + tail)

    def create_logger(self, log_name, logfile, level, formatter=None):
        """Create and return a reference to a logger. This is used to create difference log files in different settings.
        :param log_name: the name of a log.
        :param logfile: the file name of the log.
        :param level: the log level.
        :param formatter: the format of log items.
        :type log_name: str.
        :type logfile: str.
        :type level: int.
        :type formatter: logging.Formatter
        :return: the configured logging.Logger.
        """
        logger = logging.getLogger(log_name)
        logger.setLevel(level)
        handler = logging.FileHandler(logfile)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        return logger

    def signal_handler(self, signum, frame):
        """Handle one SIGALRM tick: send a message with the configured probability.

        :param signum: signal number (unused).
        :param frame: interrupted stack frame (unused).
        :raises RuntimeError: when the socket reports that zero bytes were sent.
        """
        sock = self.sock
        # Per-device rate times the tick period (SENDTIME) is the expected number
        # of packets per tick; scaled to a percentage it is the send probability.
        packet_per_sec = float(self.config['SENDRATE']) / float(self.config['DEVNUM'])
        probability = int(packet_per_sec / (1.0 / float(self.config['SENDTIME'])) * 100)
        # coin is in {0, ..., 99}; send only when it falls in {0, ..., probability - 1}.
        if random.randint(0, 99) >= probability:
            return
        timestamp = datetime.now().strftime('%H%M%S%f')[:-3]  # HHMMSS + milliseconds
        self.seq += 1
        # payload = zero-padded sequence number, '-' filler, timestamp
        seq_str = str(self.seq).zfill(8)
        filler_len = max(0, int(self.config['PAYLOADLEN']) - len(seq_str) - len(timestamp))
        data = seq_str + '-' * filler_len + timestamp
        # The payload is pure ASCII; encoding keeps sendto() valid on Python 3.
        sent = sock.sendto(data.encode('ascii'), self.addr)
        if sent == 0:
            self.send_logger.info('SENDING ERRORS: sent message length is ' + str(sent) + ' bytes.')
            raise RuntimeError('Socket connection broken.')
        self.send_logger.info('Sent message length: ' + str(sent) + ' | Message content: ' + data)

    def refresh(self, timer, cmd, logger):
        """Refresh net information periodically until interrupted.

        Each period the command is retried up to 30 times while its output
        contains 'No response.'.
        :param timer: a timer (in seconds) which specifies a period.
        :param cmd: the scheduled command to execute.
        :param logger: a handler to record log into a file.
        :type timer: long
        :type cmd: str.
        :type logger: Logger object.
        """
        try:
            while True:
                for _ in range(30):
                    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True,
                                            universal_newlines=True)
                    out, err = proc.communicate()
                    if out is None:
                        out = 'No output!'
                    if err is None:  # stderr is not piped, so this is always the case
                        err = 'None.'
                    logger.info(out + '\nError (if any): ' + err + '\n=======================')
                    if 'No response.' not in out:
                        break
                time.sleep(float(timer))
        except KeyboardInterrupt:
            return

    def sync_time(self, timezone, server_addr, timer, logger):
        """Synchronize time with a specified server.
        :param timezone: local timezone.
        :param server_addr: server IP address.
        :param timer: a periodical time to do the time synchronization.
        :param logger: a handler to record log into a file.
        :type timezone: str.
        :type server_addr: str.
        :type timer: float
        :type logger: Logger object.
        """
        try:
            while True:
                print('Time synchronizing...')
                if self.is_first_time == 1:
                    # Seed the clock from the configured DATE before the first NTP sync.
                    self.utl.call('sudo date --set=\'' + self.config['DATE'] + '\'', shell=True)
                    self.is_first_time = 0
                if not os.path.exists('/usr/share/zoneinfo/' + timezone):
                    print('Error: timezone is invalid.')
                    return
                self.utl.call('timedatectl set-timezone ' + timezone, shell=True)
                proc = subprocess.Popen('sudo ntpdate -u ' + server_addr, stdout=subprocess.PIPE,
                                        shell=True, universal_newlines=True)
                out, err = proc.communicate()
                if err is None:
                    err = 'None.'
                logger.info(out + '\nError (if any): ' + err + '\n=======================')
                print('Time synchronization finished.')
                time.sleep(timer)
        except KeyboardInterrupt:
            return

    def run_tcpdump(self, tcpdump_cmd):
        """Run tcpdump by using the given command.
        :param tcpdump_cmd: a command to run tcpdump.
        :type tcpdump_cmd: str.
        :raises subprocess.CalledProcessError: when the command exits non-zero.
        """
        try:
            subprocess.check_call(tcpdump_cmd, shell=True)
        except KeyboardInterrupt:
            return

    def get_scope_id_inet6(self, addr):
        """This function returns the scope id of a given lladdr if the address exists.
        :param addr: IPv6 address
        :return: int. scope id of interface, or -1 when the address is not found.
        :raises ValueError: when ``addr`` cannot be expanded.
        """
        key = self.expand_address_inet6(addr).replace(':', '').lower()
        with open('/proc/net/if_inet6', 'r') as ifaces:
            for line in ifaces:
                fields = line.split()
                if len(fields) > 1 and fields[0].lower() == key:  # found the interface
                    # The interface-index column is hexadecimal.
                    return int(fields[1], 16)
        return -1

    @staticmethod
    def _pad_groups(groups_text):
        """Zero-pad every ':'-separated group to 4 characters; reject longer groups."""
        padded = []
        for group in groups_text.split(':'):
            if len(group) > 4:
                raise ValueError('Passed IPv6 address is invalid.')
            padded.append(group.zfill(4))
        return ':'.join(padded)

    def expand_address_inet6(self, addr):
        """This function expand an IPv6 address to a full description.

        The address must contain exactly one '::' abbreviation.
        :param addr: an IPv6 address
        :return: str. a full IPv6 address (eight 4-character groups).
        :raises ValueError: when the address cannot be expanded.
        """
        try:
            prefix, suffix = addr.strip().split('::')
            num_items = prefix.count(':') + suffix.count(':')
            fill_items = 7 - num_items  # calculate the items missed in given ip address
            if fill_items < 1:
                # More groups than an IPv6 address can hold.
                raise ValueError('Passed IPv6 address is invalid.')
            prefix = self._pad_groups(prefix)
            suffix = self._pad_groups(suffix)
            return prefix + ':' + '0000:' * (fill_items - 1) + suffix
        except Exception:
            raise ValueError('Passed IPv6 address is invalid.')

    def start(self):
        """This function start a client which can interact with a sink server via plaintext.
        This function will start sub-processes to log status, synchronize time and refresh network.
        """
        print('Starting client ...')
        try:
            print('Initializing program ...')
            # create a process to refresh 6LoWPAN connection periodically.
            # if REFLOWPAN <= 0, disable the refresh process.
            if int(self.config['REFLOWPAN']) > 0:
                formatter = logging.Formatter(fmt='%(asctime)s\n\t%(message)s', datefmt='%d/%m/%Y %H:%M:%S')
                rf_logger = self.create_logger('Connection refreshing log', 'rs.log', logging.INFO, formatter)
                rf = Process(target=self.refresh,
                             args=(float(self.config['REFLOWPAN']), 'sudo rdisc6 lowpan0', rf_logger,))
                rf.start()
            # create a process to synchronize time periodically.
            # if SYNCTIME <= 0, disable the time synchronization.
            if int(self.config['SYNCTIME']) > 0:
                formatter = logging.Formatter(fmt='%(asctime)s\n\t%(message)s', datefmt='%d/%m/%Y %H:%M:%S')
                sync_logger = self.create_logger('Time synchronization log.', 'sync.log', logging.INFO, formatter)
                sync = Process(target=self.sync_time, args=(
                    self.config['TIMEZONE'], self.config['SERVERIP'], float(self.config['SYNCTIME']), sync_logger,))
                sync.start()
            # create processes to run tcpdump on both interfaces
            tcpdump_lowpan = Process(target=self.run_tcpdump,
                                     args=('sudo nohup tcpdump -i lowpan0 -ttttnnvvS -w lowpan0.log &',))
            tcpdump_lowpan.start()
            tcpdump_wpan = Process(target=self.run_tcpdump,
                                   args=('sudo nohup tcpdump -i wpan0 -ttttnnvvS -w wpan0.log &',))
            tcpdump_wpan.start()
            time.sleep(15)  # wait for initial time synchronization
            print('Program initialization finished.')
            self.sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
            host = self.config['SERVERIP']
            port = int(self.config['SERVERPORT'])
            print('HOST: ' + host)
            print('PORT: %d' % port)
            # get the link-local ip for 6LoWPAN and the index of its interface
            local_ip = self._lowpan_inet6_address(self._ifconfig_output(),
                                                  ' prefixlen 64 scopeid 0x20<link>')
            print("local_ip: [" + local_ip + "]")
            scope_id = self.get_scope_id_inet6(local_ip)
            print('Scope id: %d ' % scope_id)
            sink = (host, port, 0, scope_id)
            self.sock.sendto(b'start', sink)
            print('Sent')
            # Connect to sink with retransmission enabled, that is requiring acknowledgement.
            self.sock.settimeout(2.0)
            while True:
                try:
                    print('Going to receive...')
                    _, self.addr = self.sock.recvfrom(1024)
                    print('Received!')
                    self.create_conn_log(self.addr)
                    break
                except socket.timeout:
                    self.sock.sendto(b'start', sink)
            # wait a while for other devices to start the experiment.
            print('Wait ' + self.config['SYSWAIT'] + ' seconds to start the experiment.')
            time.sleep(float(self.config['SYSWAIT']))
            print('Start to send packages.')
            # Create a logger for recording sending information.
            formatter = logging.Formatter(fmt='%(asctime)s\t%(message)s', datefmt='%d/%m/%Y %H:%M:%S')
            self.send_logger = self.create_logger('Sending information log.', 'send.log', logging.INFO, formatter)
            signal.signal(signal.SIGALRM, self.signal_handler)
            signal.setitimer(signal.ITIMER_REAL, float(self.config['SENDTIME']), float(self.config['SENDTIME']))
            # sleep main function to let signal handlers execute
            while True:
                time.sleep(1000)
        except KeyboardInterrupt:
            pass
        except Exception as e:
            print(e)
        finally:
            # Stop the periodic SIGALRM first so the handler cannot fire on a
            # closed socket, then release the socket.
            signal.setitimer(signal.ITIMER_REAL, 0)
            if hasattr(self.sock, 'close'):
                self.sock.close()


if __name__ == '__main__':
    client = SinkClientPlain()
    client.install_dependencies()
    # NOTE(review): the source listing is truncated after this call ('...');
    # presumably client.start() follows - confirm against the full file.
main.py
Source:main.py
import logging
import os
from github import Github, enable_console_debug_logging
import util
from bot_issue_finder import find_issues
from repo_finder import find_repos
from repo_cloner import clone_repos
import pre_bot_issue_finder
import repo_analyser_v2

# Pipeline driver: load settings, log in to GitHub, then run the issue finder,
# repository finder, repository cloner and pre-bot issue steps in order. Each
# phase gets its own logger, which is also published through util.g_logger.
if __name__ == "__main__":
    settings = util.load_settings('settings.json')
    util.verify_loglevels(settings.get('loglevels'))
    loglevels = settings.get('loglevels')
    logoutputs = settings.get('logoutputs')

    # General logger
    logger = util.create_logger('bot_issue_finder', loglevels.get('general'), logoutputs.get('general'))
    util.g_logger = logger
    logger.info("======SETTINGS======")
    util.verify_settings(settings)
    if settings.get('log-pygithub-requests'):
        util.load_gh_logger(settings.get('shorten-pygithub-requests'))

    # Load GitHub Login information
    login_settings = util.load_settings('login.json')
    token_or_username = login_settings.get('login_or_token')
    if token_or_username and login_settings.get('password'):
        # Someone logged in with their username/password combination
        logger.info(f"Logged in as {token_or_username}")
    elif not token_or_username:
        # No user was logged in
        logger.info("No login was made; all reqests will be anonymous (NB: Less requests can be made per minute as an anonymous user!)")
    else:
        # Token login
        logger.info("Logged in using an access token")
    base_url = login_settings.get("base_url")
    if base_url is not None and base_url != util.STANDARD_API_ENDPOINT:
        logger.info(f"Using Github Enterprise with custom hostname: {base_url}")
    else:
        logger.info(f"Using the standard API endpoint at {util.STANDARD_API_ENDPOINT}")

    # Initialize PyGithub; every key of login.json is forwarded as a keyword argument.
    github = Github(per_page=100, **login_settings)
    logger.info("====================\n")

    # Issue finder logger
    if_logger = util.create_logger('issue_finder', loglevels.get('issue_finder'), logoutputs.get('issue_finder'))
    util.g_logger = if_logger

    # Find issues
    has_already_found_repos = os.path.isfile(settings.get('results-repos-output-file'))
    if has_already_found_repos:
        # Repositories were already fetched
        if_logger.info("Repositories were already fetched. Skipping the issue fetching phase!")
    elif os.path.isfile(settings.get('results-issues-output-file')):
        # Issues were already fetched
        if_logger.info("Found an existing issues file; using that instead!")
    else:
        # Issues were not yet fetched
        find_issues(github, settings, if_logger)

    # Repo finder logger
    rf_logger = util.create_logger('repo_finder', loglevels.get('repo_finder'), logoutputs.get('repo_finder'))
    util.g_logger = rf_logger

    # Find repositories
    if has_already_found_repos:
        rf_logger.info("Found an existing repo file; using that instead!")
    else:
        was_error = find_repos(github, settings, rf_logger)
        if was_error:
            msg = "An error occurred while fetching repositories!"
            # Report through the repo-finder logger: this failure belongs to
            # the repository phase, not to the issue finder.
            rf_logger.error(msg)
            raise ValueError(msg)

    # Repo cloner logger
    rc_logger = util.create_logger('repo_cloner', loglevels.get('repo_cloner'), logoutputs.get('repo_cloner'))
    util.g_logger = rc_logger
    if not settings.get('skip-cloning'):
        # Clone repositories
        clone_repos(settings, rc_logger)

    # Pre-bot issue logger
    pef_logger = util.create_logger('pre_issue_finder', loglevels.get('pre_issue_finder'), logoutputs.get('pre_issue_finder'))
    util.g_logger = pef_logger
    # The `if True:` / `if False:` guards below are manual on/off switches for
    # individual steps.
    if True:
        pre_bot_issue_finder.find_pre_bot_issues(settings, pef_logger)
        pre_bot_issue_finder.remove_pre_duplicates(settings, logger)
    if True:
        pre_bot_issue_finder.obtain_pre_post_data(settings, logger)
        pre_bot_issue_finder.obtain_cloned_repos(settings, logger)

    # Use the standard logger for all other tasks
    util.g_logger = logger

    # The following figure generation steps are not in a particular order.
    if False:
        repo_analyser_v2.find_usage_numbers(settings, logger)
    if False:
        repo_analyser_v2.plot_stars_forks_watchers_hist(settings, logger)
    if False:
        repo_analyser_v2.plot_stars_forks_watchers_scatter(settings, logger)
    if False:
        repo_analyser_v2.plot_commits(settings, logger)
    if False:
        repo_analyser_v2.plot_issues(settings, logger)
    if False:
        repo_analyser_v2.plot_issues_by_date(settings, logger)
    if False:
        repo_analyser_v2.plot_repo_creation_updated(settings, logger)
    if False:
        repo_analyser_v2.plot_pre_todo(settings, logger)
    if False:
        repo_analyser_v2.plot_pre_post_todo(settings, logger)
    if False:
        repo_analyser_v2.plot_pre_post_conclusion(settings, logger)
    # NOTE(review): the source listing is truncated here, at a further
    # `if False:` guard whose body is not visible.
rf_keyword.py
Source:rf_keyword.py
1#coding: utf-82import re3import json4import copy5import traceback6from lib.robot_ext.rf_keyword.rf_logger import logger7from lib.robot_ext.rf_keyword.rf_exception import OK, FAIL, RF_KW_PARAMS_FAIL8__all__ = ["keyword", "OK", "FAIL", "logger"]9"""10def keword(name, tags=(), types=()):11 def decorator(func):12 func.robot_name = name13 func.robot_tags = tags14 func.robot_types = types15 return func16"""17def keword(name=None):18 #logger.trace("enter deco_para")19 def deco_func(func):20 #logger.info(f"enter deco_func {func}")21 def deal_params(params, params_schema):22 """23 åæ°ä¸æ->è±ææ å°ï¼æ ¡éªï¼é»è®¤å¼å¤ç24 return expect, new_params25 """26 #ä¸æåæ°è½¬æ¢ä¸ºè±æ27 new_params = {}28 expect = "æå"29 for key, value in params.items():30 if key == "ææç»æ":31 expect = value32 #å个å°è½¬æ¢ï¼æ°å
³é®åéç¨any ä»£æ¿ None33 if expect.upper() == "NONE":34 expcet = "any"35 continue36 if "_USECASE_PATH__" in key or "__USECASE_NAME__" in key or "_timeout" in key:37 continue38 param_schema_list = list(filter(lambda x: x["name"] == key, params_schema))39 if not param_schema_list:40 raise RF_KW_PARAMS_FAIL(f'åæ° name->id æ å°åºéï¼å¯è½ä¼ å
¥ä¸å¨å®ä¹èå´å
çåæ°, è¯·æ ¸å¯¹: {key}={value}')41 param_schema = param_schema_list[0]42 en_name = param_schema["id"]43 new_params[en_name] = value44 return (expect, new_new_params)45 46 def run(*arg, **params):47 pass48 49 logger.info("enter wrapper")50 logger.info("-----wrapper :before func---")51 logger.info(f"{func.__doc__}")52 logger.info(json.dumps("123", ensure_ascii=False, indent=4))53 kw_schema={"name":"æ°å¢åºå","tags":"æªå®æ"}54 wrapper_func = run55 wrapper_func.__name__ = func.__name__56 wrapper_func.__doc__ = func.__doc__57 wrapper_func.robot_name = kw_schema["name"]58 wrapper_func.robot_tags = kw_schema["tags"]59 return wrapper_func...
rf_result.py
Source:rf_result.py
1import traceback2from toolbox.robot_ext.rf_keyword.rf_logger import logger3from toolbox.robot_ext.rf_keyword.rf_exception import OK, FAIL, RF_KW_FAIL4class Result():5 # factory method to initialize the appropriate result class6 # according to the type of +result+7 @classmethod8 def build(self, result):9 if isinstance(result, OK) or isinstance(result, FAIL):10 #logger.info('å
³é®åæ§è¡å®æ¯ï¼ç»æ为ææç±»å')11 return ResultWithExpectation(result)12 elif isinstance(result, Exception):13 #logger.info('å
³é®åæ§è¡å®æ¯ï¼ç»æ为å¼å¸¸ç±»å')14 return ResultWithException(result)15 else:16 logger.info('å
³é®åæ§è¡å®æ¯ï¼ç»æ为è¿åå¼ç±»å')17 return ResultWithReturn(result)18 19class ResultWithReturn(result):20 def __init__(self, result):21 self.result = result22 def expect(self, exp="any"):23 if exp in ("any", "æå"):24 return True, self.result25 26class ResultWithExpectation(result):27 def __init__(self, result):28 self.result = result29 self.more_info = ""30 for item in result.args:31 self.more_info += f'{item}'32 self.more_info = self.more_info.strip()33 34 def expect(self, exp="any"):35 actual = str(self.result)36 if exp == "any":37 return True, f'ææ对æ¯æåï¼ææç»æ => {exp} å®é
ç»æ => {actual}; {self.more_info}'38 elif actual == exp:39 return True, f'ææ对æ¯æåï¼ææç»æ => {exp} å®é
ç»æ => {actual}; {self.more_info}'40 else:41 return False, f'ææ对æ¯å¤±è´¥ï¼ææç»æ => {exp} å®é
ç»æ => {actual}; {self.more_info}'42class ResultWithException():43 """å
³é®åæåºå¼å¸¸,ä¸ä¸æ¯RFå®ä¹çæåä¸å¤±è´¥å¼å¸¸"""44 def __init__(self, result):45 self.result = result46 def expect(self, exp="any"):47 actual = str(self.result)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!