Best Python code snippet using autotest_python
nxagent-helper
Source:nxagent-helper
#!/usr/bin/python2.4
#
# Copyright 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#
# Author: diamond@google.com (Stephen Shirley)

"""nxagent-helper: spawn nxagent for a session and track its state.

Reads one command ("start <sessionid>" or "resume <sessionid>") from stdin,
launches nxagent for that session, follows the agent's session log and
reports state changes on stdout as "NX> 1009 Session status: <state>" lines.
"""

import os
import re
import signal
import subprocess
import sys
import time
import traceback

sys.path.append('/usr/freenx/lib')

import nxlog
import nxloadconfig

prog_name = "nxagent-helper"

_COMMANDS = ("start", "resume")

# Regexes recognising the "Session: ..." lines nxagent writes to its log,
# keyed by the session state each one announces.
state_lines = {
    'starting': re.compile(r'^Session: Starting session at '),
    'running': re.compile(r'^Session: Session (started|resumed) at '),
    'suspending': re.compile(r'^Session: Suspending session at '),
    'suspended': re.compile(r'^Session: Session suspended at '),
    'terminating': re.compile(r'^Session: (Terminat|Abort)ing session at '),
    'terminated': re.compile(r'^Session: Session (terminat|abort)ed at '),
}

# Regexes recognising "Info: ..." / "Error: ..." lines in the agent log,
# keyed by the handler id used in handle_info().
info_lines = {
    'watchdog_pid':
        re.compile(r"^Info: Watchdog running with pid '(?P<pid>\d+)'."),
    'kill_watchdog':
        re.compile(r"^Info: Waiting the watchdog process to complete."),
    'agent_pid':
        re.compile(r"^Info: Agent running with pid '(?P<pid>\d+)'."),
    'general_error':
        re.compile(r"^Error: (?P<error>.*)$"),
}

# Session-wide state shared by every function in this module: the parsed
# nxagent options plus runtime entries ('dir', 'p', 'agent_pid',
# 'watchdog_pid', 'app_pid') added as the session progresses.
options = {}


def main():
    """Do setup, then read and handle a single command."""
    basic_setup()
    cmd, sessionid = read_command()
    handle_command(cmd, sessionid)


def basic_setup():
    """Set up logging and read the configuration."""
    nxlog.setup(prog_name)
    nxlog.log(nxlog.LOG_DEBUG, "started with pid %d\n" % os.getpid())
    nxloadconfig.setup(prog_name)
    level = nxloadconfig.conf.get('LOG_LEVEL', '5')  # Default to LOG_NOTICE
    try:
        nxlog.set_log_level(level)
    except ValueError:
        nxlog.log(nxlog.LOG_ERR, "Invalid log level: %s\n" % level)
    nxlog.log(nxlog.LOG_INFO, "config parsed\n")


def read_command():
    """Read a single command from stdin, check that it's valid, return it.

    Return:
      A (command, sessionid) tuple; command is lower-cased and is one of
      _COMMANDS. Exits with status 1 if the line is malformed or the command
      is unknown.
    """
    raw_cmd = raw_input()
    cmd = raw_cmd.split()
    if len(cmd) < 2:
        nxlog.log(nxlog.LOG_CRIT, "Command has invalid format: %s\n" %
                  repr(raw_cmd))
        sys.exit(1)
    if cmd[0].lower() not in _COMMANDS:
        nxlog.log(nxlog.LOG_CRIT, "Unknown command given: %s\n" %
                  repr(raw_cmd))
        sys.exit(1)
    return cmd[0].lower(), cmd[1]


def handle_command(cmd, sessionid):
    """Handle the commands received.

    Args:
      cmd: 'start' or 'resume'
      sessionid: id of the session; selects the /tmp/nx/S-<sessionid> dir
    Return:
      None. Exits with status 1 if the session files can't be read or if
      cmd is 'resume' (not yet supported).
    """
    nxlog.log(nxlog.LOG_DEBUG, "command: %s %s\n" % (cmd, sessionid))
    if cmd == 'start':
        nxlog.log(nxlog.LOG_NOTICE, "Starting session %s\n" % sessionid)
        #FIXME: should be using a better, non-hardcoded location
        session_dir = '/tmp/nx/S-%s' % sessionid
        session_log_path = os.path.join(session_dir, 'session.log')
        try:
            args_file = open(os.path.join(session_dir, 'args'))
            try:
                # One argument per line. Strip only the newline, so a final
                # line without one doesn't lose its last character.
                args = [line.rstrip('\n') for line in args_file]
            finally:
                args_file.close()
            nxlog.log(nxlog.LOG_NOTICE, "Session args: %s\n" % repr(args))
            # Open the session log to pass to nxagent
            nxagent_log = open(session_log_path, 'w')
            options_path = os.path.join(session_dir, 'options')
            opt_read(options_path)
        except IOError:
            err = sys.exc_info()[1]
            nxlog.log(nxlog.LOG_ERR, "Session file error: %s\n" % err)
            sys.exit(1)
        options['dir'] = session_dir
        # No pids are known yet, so this only removes stale X lock files.
        cleanup()
        os.environ['LD_LIBRARY_PATH'] = '/usr/NX/lib'
        os.environ['DISPLAY'] = 'nx/nx,options=%s:%s' % \
            (options_path, options['display_num'])
        # NOTE(review): xauth conventionally reads XAUTHORITY (no
        # underscore); confirm that X_AUTHORITY is intended here.
        os.environ['X_AUTHORITY'] = os.path.join(session_dir, 'authority')
        setup_xauth()
        options['p'] = subprocess.Popen(
            ['/usr/freenx/bin/nxagent'] + args,
            close_fds=True, stdin=None, stdout=nxagent_log,
            stderr=subprocess.STDOUT)
        # The child holds its own copy of the descriptor.
        nxagent_log.close()
        options['agent_pid'] = options['p'].pid
        write_agent_pid()
        nxlog.log(nxlog.LOG_NOTICE, "Spawned nxagent pid %d\n" %
                  options['p'].pid)
        # Don't want to propagate this to other children.
        del os.environ['LD_LIBRARY_PATH']
        os.environ['DISPLAY'] = ':%s' % options['display_num']
        agent_log = open(session_log_path, 'r')
        try:
            follow_log(agent_log)
        finally:
            agent_log.close()
    elif cmd == 'resume':
        nxlog.log(nxlog.LOG_NOTICE, "Resuming session %s\n" % sessionid)
        nxlog.log(nxlog.LOG_CRIT, "Session resumption not yet supported\n")
        sys.exit(1)


def follow_log(log):
    """Follow and parse the log of nxagent.

    Reads lines as nxagent appends them, dispatching each to match_state()
    or match_info(), until the agent process (options['p']) has exited and
    the log has been read to its end.

    FIXME: Keeps track of the state of nxagent, performs actions on certain
    state changes.

    Args:
      log: file object open for reading on the agent's session log
    """
    try:
        nxlog.log(nxlog.LOG_DEBUG, "Following agent log\n")
        agent_exited = False
        while True:
            line = log.readline()
            if not line:
                if agent_exited:
                    break
                if options['p'].poll() is not None:
                    ret = options['p'].returncode
                    if ret < 0:
                        nxlog.log(nxlog.LOG_NOTICE, "Nxagent has exited, "
                                  "killed by signal %d\n" % -ret)
                    else:
                        nxlog.log(nxlog.LOG_NOTICE,
                                  "Nxagent has exited: %s\n" % ret)
                    # The agent may have written its final lines between our
                    # last read and the poll; make one more pass over the log
                    # so they aren't lost.
                    agent_exited = True
                    continue
                time.sleep(0.5)
                continue
            line = line.rstrip()  # Remove trailing newline
            if match_state(line):
                nxlog.log(nxlog.LOG_DEBUG, "Matched state: %s\n" % line)
            elif match_info(line):
                nxlog.log(nxlog.LOG_DEBUG, "Matched info: %s\n" % line)
    # If any problems occur, we want to cleanup first. When we're done
    # cleaning up, re-raise the exception. The bare except is deliberate:
    # cleanup must also happen on SystemExit/KeyboardInterrupt.
    except:
        nxlog.log(nxlog.LOG_ERR, "Got exception, cleaning up\n")
        sys.stdout.write("NX> 1009 Session status: terminated\n")
        sys.stdout.flush()
        cleanup()
        raise
    nxlog.log(nxlog.LOG_DEBUG, "Finished following agent log\n")


def match_state(line):
    """Try to match the given line against a session state.

    Return:
      True: the line matched
      False: the line didn't match
    """
    # Faking a static variable: the last seen state lives on the function.
    old_state = match_state.cur_state = getattr(match_state, 'cur_state',
                                                None)
    if not line.startswith('Session: '):
        return False
    for state, rx in state_lines.items():
        if rx.search(line):
            match_state.cur_state = state
            handle_state_change(match_state.cur_state, old_state)
            return True
    return False


def match_info(line):
    """Try to match the given line against an info line regex.

    Return:
      True: the line matched
      False: the line didn't match
    """
    for info, rx in info_lines.items():
        m = rx.search(line)
        if m:
            handle_info(info, m)
            return True
    return False


def handle_state_change(cur_state, old_state):
    """Compare the current state to the previous state, and handle as
    appropriate.

    Args:
      cur_state: current state name
      old_state: previous state name
    Return:
      None
    """
    if cur_state == old_state:
        return
    nxlog.log(nxlog.LOG_NOTICE, "Nxagent state was: %s Now: %s\n" %
              (old_state, cur_state))
    sys.stdout.write("NX> 1009 Session status: %s\n" % cur_state)
    sys.stdout.flush()
    if cur_state == 'starting' and old_state is None:
        start_app()
    elif cur_state == 'terminated' and old_state != 'terminated':
        nxlog.log(nxlog.LOG_NOTICE, "Nxagent finished, cleaning up")
        cleanup()


def handle_info(info, m):
    """Execute the required response to a given info line match.

    Args:
      info: The id of the info line
      m: The match object from the info line's regex
    Return:
      None
    """
    if info == 'watchdog_pid':
        options['watchdog_pid'] = int(m.group('pid'))
        nxlog.log(nxlog.LOG_NOTICE, "matched info watchdog, pid %s\n" %
                  options['watchdog_pid'])
    elif info == 'agent_pid':
        real_agent_pid = int(m.group('pid'))
        nxlog.log(nxlog.LOG_NOTICE, "matched info agent_pid, pid %s\n" %
                  real_agent_pid)
        if options['agent_pid'] != real_agent_pid:
            # Probably caused by nxagent being a shell script
            nxlog.log(nxlog.LOG_WARNING, "Agent pid (%s) doesn't match "
                      "spawned pid (%s)\n" %
                      (real_agent_pid, options['agent_pid']))
            options['agent_pid'] = real_agent_pid
            write_agent_pid()
    elif info == 'kill_watchdog':
        if 'watchdog_pid' not in options:
            nxlog.log(nxlog.LOG_ERR, "matched info kill_watchdog, "
                      "but no known watchdog pid\n")
        else:
            try:
                os.kill(options['watchdog_pid'], signal.SIGTERM)
            except OSError:
                err = sys.exc_info()[1]
                nxlog.log(nxlog.LOG_WARNING, "matched info kill_watchdog, "
                          "got error from kill[%d]: %s\n" %
                          (err.errno, err.strerror))
            else:
                nxlog.log(nxlog.LOG_NOTICE,
                          "matched info kill_watchdog, sent TERM.\n")
    elif info == 'general_error':
        nxlog.log(nxlog.LOG_ERR, "Agent error: %s" % m.group('error'))
    else:
        # If none of the above handlers match...
        nxlog.log(nxlog.LOG_ERR, "matched info %s, but failed to "
                  "find handler for it\n" % info)


def opt_read(opt_path):
    """Parse the nxagent options file into the module-level options dict.

    The file is read as "<name>=<val>,<name>=<val>,...:<display_num>"; the
    literal element 'nx/nx' is ignored. Any previous contents of options
    are discarded.

    Args:
      opt_path: path of the options file
    Raises:
      IOError: the file can't be opened or read
      ValueError: the content has no ':<display_num>' suffix
    """
    options.clear()
    #FIXME: this needs to be much more robust
    opt_file = open(opt_path)
    try:
        opt_str = opt_file.read().rstrip()
    finally:
        opt_file.close()
    opt_str, options['display_num'] = opt_str.rsplit(':', 1)
    for pair in opt_str.split(','):
        if pair in ('', 'nx/nx'):
            continue
        if '=' not in pair:
            nxlog.log(nxlog.LOG_WARNING,
                      "Ignoring malformed option: %s\n" % repr(pair))
            continue
        # Split once only, so values may themselves contain '='.
        name, val = pair.split('=', 1)
        options[name] = val


def setup_xauth():
    """Set up the Xauthority entries using the session cookie."""
    # os.system runs /bin/sh, so use the portable '> file 2>&1' form rather
    # than the bash-only '&>'.
    # NOTE(review): display_num and cookie are interpolated into a shell
    # command unescaped; confirm the options file is trusted.
    os.system('xauth add localhost:%(display_num)s MIT-MAGIC-COOKIE-1 '
              '%(cookie)s > $X_AUTHORITY.log 2>&1' % options)
    os.system('xauth add :%(display_num)s MIT-MAGIC-COOKIE-1 %(cookie)s' %
              options)


def start_app():
    """Start the session application (startkde/gnome-session/etc).

    Raises:
      ValueError: no application command could be determined for the
        session type in options['type']
    """
    session_type = options['type']
    app = None
    if session_type in ('gnome', 'kde'):
        app = nxloadconfig.conf['COMMAND_START_%s' % session_type.upper()]
    elif session_type == 'application':
        app = options['application']
    if not app:
        #FIXME(diamond): handle this better
        nxlog.log(nxlog.LOG_ERR, "No application to start for session "
                  "type %s\n" % repr(session_type))
        raise ValueError("no application for session type %r" %
                         (session_type,))
    # NOTE(review): the command runs through a shell and is backgrounded
    # with '&', so the pid recorded below is presumably the short-lived
    # shell's, not the application's; app is also interpolated unescaped.
    cmd = 'nxstart %d %s &' % (options['agent_pid'], app)
    nxlog.log(nxlog.LOG_INFO, "Nxagent ready, launching app: %s" % cmd)
    log = open(os.path.join(options['dir'], 'app.log'), 'w')
    try:
        s = subprocess.Popen(cmd, shell=True, close_fds=True,
                             stdin=None, stdout=log,
                             stderr=subprocess.STDOUT)
    finally:
        # The child holds its own copy of the descriptor.
        log.close()
    options['app_pid'] = s.pid
    #FIXME(diamond): check for errors here


def cleanup():
    """Cleanup after various things (X11 locks etc).

    Sends SIGTERM to each of the watchdog, agent and app processes whose pid
    is recorded in options, then removes the X lock file and socket for
    options['display_num'] if they exist.
    """
    for proc in 'watchdog', 'agent', 'app':
        pid_name = "%s_pid" % proc
        if pid_name in options:
            try:
                os.kill(options[pid_name], signal.SIGTERM)
            except OSError:
                err = sys.exc_info()[1]
                nxlog.log(nxlog.LOG_WARNING, "Cleanup handler got error %d "
                          "when killing %s: %s\n" %
                          (err.errno, proc, err.strerror))
            else:
                nxlog.log(nxlog.LOG_WARNING,
                          "Cleanup handler sent TERM to %s\n" % proc)
    for f in ['/tmp/.X%(display_num)s-lock' % options,
              '/tmp/.X11-unix/X%(display_num)s' % options]:
        if os.path.exists(f):
            os.remove(f)


def write_agent_pid():
    """Report the agent pid on stdout (for nxserver-inner, per the original
    comment)."""
    sys.stdout.write('NX> 8888 agentpid: %s\n' % options['agent_pid'])
    sys.stdout.flush()


if __name__ == '__main__':
    try:
        main()
    except SystemExit:
        # Let deliberate exits through untouched; on old Pythons SystemExit
        # would otherwise be caught by the Exception clause below.
        raise
    except Exception:
        trace = traceback.format_exc()
        nxlog.log(nxlog.LOG_ERR, 'Going down because exception caught '
                  'at the top level.')
        for line in trace.split('\n'):
            nxlog.log(nxlog.LOG_ERR, '%s' % line)
        # NOTE(review): the source listing is truncated at this point; any
        # statements that followed in the original are not reproduced.
watchdog.py
Source:watchdog.py
...26 time.sleep(self.watchdog_timeout_sec)27 else:28 logger.log('DEBUG', 'Watchdog disabled')29 break30 def kill_watchdog(self):31 """ Permanently halts operation of the watchdog task """32 self.watchdog_timeout_sec = 033 def watchdog_task(self):34 """ Called once each watchdog period """35 logger.log('DEBUG', f'Watchdog({self.watchdog_name}) Timeout(sec)={self.watchdog_timeout_sec}')36if __name__ == '__main__':...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!