Best Python code snippet using autotest_python
wax_server.py
Source:wax_server.py
1#!/usr/bin/python2# Copyright 2013 Google Inc. All Rights Reserved.3"""Implements the Wax Service testing API as a simple web service.4This is a basic implementation of the Wax Service in a form that lends5itself to testing in an open source release since the Wax Service is not6yet publicly accessible. It is implemented as a plain web server rather7than Google Cloud Service, but the HTTP interface is the same. Therefore,8this is suitable for testing basic HTTP interactions and end-to-end testing9for many client features.10Usage:11 wax_server [-g] [--port=<port>] [--signal_pid=<pid>]12 -g allows the server to accept connections from any IP.13 The default is only from localhost.14 --port specifies the port to listen on.15 The default is 5000.16 --signal_pid specifies a PID that should be notified with SIGUSR1 when17 the server is read.18"""19from BaseHTTPServer import BaseHTTPRequestHandler20from BaseHTTPServer import HTTPServer21import copy22import getopt23import json24import os25import re26import signal27import string28import sys29import threading30import time31_RE_SESSION_COMMAND = re.compile('^/sessions/([^/]+)/items')32_RE_ITEM_COMMAND = re.compile('^/sessions/([^/]+)/items/([^/]+)')33_JSON_CONTENT_TYPE = 'application/json'34_SERVER_SETTINGS = {'run': True}35class SessionData(object):36 """Session data maintains a list of objects for a given session id.37 These objects are thread-safe and live over the lifetime of the server.38 They are destroyed by explicitly removing the session, which is not at39 all reliable since clients are not required to do so, and may crash, etc.40 However, this is a testing server that is not expected to be run for long41 periods of time.42 A session contains a list of items where an item is a dictionary.43 Items have a 'kind' attribute to make them compatible with the Wax Service.44 The session is initialized with two items, A and B, to be compatible with45 the Wax Service.46 """47 def __init__(self):48 """Constructs a new session instance."""49 self._created = time.clock()50 self._mutex = threading.Lock()51 self._items = []52 # Add items A and B by default to mimick the service's behavior.53 self._items.append(self.AddWaxKind({'id': 'A', 'name': 'Item A'}))54 self._items.append(self.AddWaxKind({'id': 'B', 'name': 'Item B'}))55 def _FindItem(self, key):56 """Finds item with the given id, if present in the session.57 This method is not thread-safe. It relies on the caller to lock.58 Args:59 key: (string) Identifier to search for.60 Returns:61 A reference to the item or None if one is not present.62 """63 for session_item in self._items:64 if session_item['id'] == key:65 return session_item66 return None67 def AddNewItem(self, key, item):68 """Add an item to the session list if the specified identifier is unique.69 Args:70 key: (string) The item's identifier71 item: (dict) The item to add must have an 'id'' and will be given72 a 'kind' attribute.73 Returns:74 The actual values for the added item (including added values).75 None indicates a failure (because the identifier already exists)76 """77 wax = None78 self._mutex.acquire()79 if not self._FindItem(key):80 wax = self.AddWaxKind(item)81 self._items.append(copy.deepcopy(wax))82 self._mutex.release()83 return wax84 def ReplaceItem(self, key, item):85 """Inserts the item into the session, or replaces the item for identifer.86 Args:87 key: (string) The items identifier.88 item: (dict) The item to insert or replace the existing item with.89 A 'kind' attribute will be added if needed.90 Returns:91 A copy of the inserted item.92 """93 wax = self.AddWaxKind(item)94 self._mutex.acquire()95 session_item = self._FindItem(key)96 if session_item:97 self._items.remove(session_item)98 self._items.append(copy.deepcopy(wax))99 self._mutex.release()100 return wax101 def PatchItem(self, key, item):102 """Updates an existing item with the given id with the elements of item.103 This overwrites the existing data with the new data. Any old values that104 were not contained in item are preserved.105 Args:106 key: (string) The identifier to update107 item: (dict) The particular values to update.108 Returns:109 A copy of the patched item or None on falure.110 """111 wax = None112 self._mutex.acquire()113 session_item = self._FindItem(key)114 if session_item:115 self._items.remove(session_item)116 wax = dict(session_item.items() + item.items())117 self._items.append(copy.deepcopy(wax))118 self._mutex.release()119 return wax120 def DeleteItem(self, key):121 """Deletes an item from the session.122 Args:123 key: (string) The identifier of the item to delete.124 Returns:125 The deleted item or None.126 """127 self._mutex.acquire()128 session_item = self._FindItem(key)129 if session_item:130 self._items.remove(session_item)131 self._mutex.release()132 return session_item133 def GetItemCopy(self, key):134 """Get a copy of the item with the given identifier.135 This returns a copy for simple thread-safety.136 Args:137 key: (string) The identifier of the item to return138 Returns:139 None if the identifier isnt present. Otherwise a copy of the item.140 """141 result = None142 self._mutex.acquire()143 session_item = self._FindItem(key)144 if session_item:145 result = copy.deepcopy(session_item)146 self._mutex.release()147 return result148 def GetAllItemsCopy(self):149 """Returns a copy of the list of all items.150 Returns:151 A copy is returned for simple thread-safety.152 """153 self._mutex.acquire()154 result = copy.deepcopy(self._items)155 self._mutex.release()156 return result157 def AddWaxKind(self, item):158 """Helper function that adds the 'kind' attribute to an item.159 Args:160 item: (dict) The item to modify.161 Returns:162 A reference to the item with a 'kind' attribute added.163 """164 item['kind'] = 'wax#waxDataItem'165 return item166class Repository(object):167 """A repository of sessions keyed by sessionId.168 The repository is thread-safe.169 """170 def __init__(self):171 """Initializes an empty repository."""172 self._sessions = {}173 self._nodeid = str(time.clock())174 self._sequence_num = 0175 self._mutex = threading.Lock()176 def GetSessionData(self, key):177 """Returns a reference to the session with the given identifier.178 Args:179 key: (string) The session identifier.180 Returns:181 A reference to the SessionData or None.182 """183 self._mutex.acquire()184 session_data = self._sessions.get(key)185 self._mutex.release()186 return session_data187 def RemoveIdentifier(self, key):188 """Removes a session identifier and its data.189 Args:190 key: (string) The session identifier.191 Returns:192 The session data removed or None193 """194 self._mutex.acquire()195 session_data = self._sessions.pop(key, None)196 self._mutex.release()197 return session_data198 def NewIdentifier(self, basename):199 """Adds a new empty session and gives it a new unique identifier.200 Args:201 basename: (string) The prefix for the generated identifier.202 Returns:203 The generated identifier to refer to the SessionData in the future.204 """205 self._mutex.acquire()206 self._sequence_num += 1207 key = '%s-%s-%s' % (basename, self._nodeid, self._sequence_num)208 self._sessions[key] = SessionData()209 self._mutex.release()210 return key211REPOSITORY_ = Repository()212class WaxHandler(BaseHTTPRequestHandler):213 """Implements a Wax Service interface for the web server."""214 def _SendResponse(self, payload, http_code, content_type='text/plain'):215 """Send HTTP response.216 Args:217 payload: (string) The HTTP response body.218 http_code: (int) The HTTP response status code.219 content_type: (string) The HTTP Content-Type header.220 Returns:221 The http_code.222 """223 self.send_response(http_code)224 self.send_header('Content-Type', content_type)225 self.end_headers()226 self.wfile.write(payload)227 return http_code228 def _SendJsonObjectResponse(self, obj, http_code):229 """Sends object as a JSON-encoded HTTP response.230 Args:231 obj: (dict) The object to return232 http_code: (int) The HTTP response status code.233 Returns:234 The http_code.235 """236 return self._SendResponse(237 json.dumps(obj), http_code, content_type=_JSON_CONTENT_TYPE)238 def _SendJsonErrorResponse(self, msg, http_code):239 """Sends a JSON a JSON-encoded HTTP response containing the message.240 Args:241 msg: (sting) The message to send (explaining error).242 http_code: (int) The HTTP response status code.243 Returns:244 The http_code.245 """246 return self._SendResponse(247 json.dumps({'message': msg, 'code': http_code}),248 http_code,249 content_type=_JSON_CONTENT_TYPE)250 def _ProcessNewSessionCommand(self):251 """Adds a new session and sends a response.252 Responds with JSON containing a newSessionId attribute.253 Returns:254 The final HTTP status code.255 """256 request_json = self._GetJson()257 if not request_json:258 return self._SendJsonErrorResponse('Invalid JSON', 400)259 key = REPOSITORY_.NewIdentifier(request_json['sessionName'])260 return self._SendJsonObjectResponse(261 {'kind': 'wax#waxNewSession', 'newSessionId': key}, 200)262 def _ProcessRemoveSessionCommand(self):263 """Removes the session and sends a response.264 Responds with JSON containing a removeSessionId attribute.265 Returns:266 The final HTTP status code.267 """268 request_json = self._GetJson()269 if not request_json:270 return self._SendJsonErrorResponse('Invalid JSON', 400)271 key = request_json['sessionId']272 old_item = REPOSITORY_.RemoveIdentifier(key)273 if not old_item:274 return self._SendJsonErrorResponse('Unknown sessionId', 404)275 return self._SendJsonObjectResponse(276 {'kind': 'wax#waxRemoveSession', 'removeSessionId': key}, 200)277 def _ProcessGetSessionItemsCommand(self, session_data):278 """Sends a JSON response with the session items.279 Args:280 session_data: (SessionData) The SessionData instance281 Returns:282 The final HTTP status code.283 """284 items_copy = session_data.GetAllItemsCopy()285 return self._SendJsonObjectResponse(286 {'kind': 'wax#waxList', 'items': items_copy}, 200)287 def _ProcessInsertNewItemCommand(self, session_data):288 """Adds the item specified by the JSON payload and send a JSON response.289 Args:290 session_data: (SessionData) The SessionData instance291 Returns:292 The final HTTP status code.293 """294 request_json = self._GetJson()295 # TODO(user): 20130624296 # Distinguish {} from no payload.297 if not request_json:298 return self._SendJsonErrorResponse('Invalid JSON', 400)299 item_id = request_json.get('id', None)300 if not item_id:301 return self._SendJsonErrorResponse('JSON missing id', 400)302 added = session_data.AddNewItem(item_id, request_json)303 if added:304 time.sleep(3.0 / 1000.0) # delay 3 ms so we can test timeouts305 return self._SendJsonObjectResponse(added, 200)306 else:307 return self._SendJsonErrorResponse('Item already exists', 403)308 def _ProcessGetItemCommand(self, session_data, item_id):309 """Returns a JSON response with the specified item.310 Args:311 session_data: (SessionData) The SessionData instance312 item_id: (string) The item id to return313 Returns:314 The final HTTP status code.315 """316 item_copy = session_data.GetItemCopy(item_id)317 if item_copy:318 return self._SendJsonObjectResponse(item_copy, 200)319 return self._SendJsonErrorResponse('Unknown item', 404)320 def _ProcessDeleteItemCommand(self, session_data, item_id):321 """Deletes the specified item and sends a response.322 Args:323 session_data: (SessionData) The SessionData instance.324 item_id: (string) The item id to return.325 Returns:326 The final HTTP status code.327 """328 if session_data.DeleteItem(item_id):329 return self._SendResponse('', 204)330 else:331 return self._SendJsonErrorResponse('Unknown item in session', 404)332 def _ProcessPatchItemCommand(self, session_data, item_id, request_json):333 """Patches the specified item with the JSON payload and responds.334 Args:335 session_data: (SessionData) The SessionData instance.336 item_id: (string) The item id to return.337 request_json: (dict) The value used to patch338 Returns:339 The final HTTP status code.340 """341 patched_item = session_data.PatchItem(item_id, request_json)342 if patched_item:343 return self._SendJsonObjectResponse(patched_item, 200)344 else:345 return self._SendJsonErrorResponse('Unknown item in session', 404)346 def _ProcessUpdateItemCommand(self, session_data, item_id, request_json):347 """Replaces the specified item with the JSON payload and responds.348 Args:349 session_data: (SessionData) The SessionData instance.350 item_id: (string) The item id to return.351 request_json: (dict) The replacement value.352 Returns:353 The final HTTP status code.354 """355 request_json.setdefault('id', item_id)356 if request_json['id'] != item_id:357 return self._SendJsonErrorResponse('Mismatched item ids', 400)358 wax = session_data.ReplaceItem(item_id, request_json)359 return self._SendJsonObjectResponse(wax, 200)360 def _ReadChunkedPayload(self):361 """Reads a payload sent with a chunked Transfer-Encoding.362 Raises:363 ValueError: if the payload was not properly chunk encoded.364 Returns:365 The decoded payload string.366 """367 payload = []368 while True:369 chunk_len = 0370 while True:371 hex_digit = self.rfile.read(1)372 if hex_digit in string.hexdigits:373 chunk_len = 16 * chunk_len + int(hex_digit, 16)374 elif hex_digit != '\r' or self.rfile.read(1) != '\n':375 raise ValueError('Expected \\r\\n termination')376 else:377 break378 if chunk_len == 0:379 break380 payload.append(self.rfile.read(chunk_len))381 return ''.join(payload)382 def _GetJson(self):383 """Reads JSON object from payload.384 Returns:385 JSON decoded object386 """387 encoding = self.headers.getheader('Transfer-Encoding')388 if encoding == 'chunked':389 payload = self._ReadChunkedPayload()390 else:391 length = int(self.headers.getheader('Content-Length'))392 payload = self.rfile.read(length)393 if (not payload394 or self.headers.getheader('Content-Type') != _JSON_CONTENT_TYPE):395 return None396 json_item = json.loads(payload)397 if not json_item:398 return None399 return json_item400 def _DispatchMethod(self, method):401 """Executes WAX method and sends response.402 Args:403 method: (string) The HTTP method type received.404 Returns:405 The response HTTP status code sent.406 """407 if self.path == '/quit':408 _SERVER_SETTINGS['run'] = False409 return self._SendResponse('BYE', 200)410 if self.path == '/newsession' and method == 'POST':411 return self._ProcessNewSessionCommand()412 if self.path == '/removesession' and method == 'POST':413 return self._ProcessRemoveSessionCommand()414 match = _RE_ITEM_COMMAND.match(self.path)415 if match:416 self._HandleItemMethod(method, match.group(1), match.group(2))417 return418 match = _RE_SESSION_COMMAND.match(self.path)419 if match:420 self._HandleSessionMethod(method, match.group(1))421 return422 self._SendJsonErrorResponse('URL Not Available', 404)423 def _HandleSessionMethod(self, method, session_id):424 """Executes method on wax sessions resource and sends response.425 Args:426 method: (string) HTTP method type.427 session_id: (string) Wax session identifier.428 Returns:429 HTTP status code.430 """431 session_data = REPOSITORY_.GetSessionData(session_id)432 if not session_data:433 self._SendJsonErrorResponse('Unknown sessionId', 404)434 return435 if method == 'GET':436 return self._ProcessGetSessionItemsCommand(session_data)437 if method == 'POST':438 return self._ProcessInsertNewItemCommand(session_data)439 return self._SendJsonErrorResponse('Unhandled method', 405)440 def _HandleItemMethod(self, method, session_id, item_id):441 """Executes method on wax items resource and sends response.442 Args:443 method: (string) HTTP method type.444 session_id: (string) Wax session identifier.445 item_id: (string) Wax item identifier.446 Returns:447 HTTP status code.448 """449 session_data = REPOSITORY_.GetSessionData(session_id)450 if not session_data:451 return self._SendJsonErrorResponse('Unknown sessionId', 404)452 if method == 'GET':453 return self._ProcessGetItemCommand(session_data, item_id)454 if method == 'DELETE':455 return self._ProcessDeleteItemCommand(session_data, item_id)456 # TODO(user): 20130624457 # Distinguish {} from no payload.458 request_json = self._GetJson()459 if not request_json:460 return self._SendJsonErrorResponse('Invalid JSON', 400)461 if method == 'PATCH':462 return self._ProcessPatchItemCommand(session_data, item_id, request_json)463 if method == 'PUT':464 return self._ProcessUpdateItemCommand(session_data, item_id, request_json)465 return self._SendJsonErrorResponse('Unhandled method', 405)466 def do_GET(self): # pylint: disable=g-bad-name467 self._DispatchMethod('GET')468 def do_DELETE(self): # pylint: disable=g-bad-name469 self._DispatchMethod('DELETE')470 def do_PATCH(self): # pylint: disable=g-bad-name471 self._DispatchMethod('PATCH')472 def do_POST(self): # pylint: disable=g-bad-name473 self._DispatchMethod('POST')474 def do_PUT(self): # pylint: disable=g-bad-name475 self._DispatchMethod('PUT')476def main(argv):477 """Runs the program."""478 try:479 opts = getopt.getopt(sys.argv[1:], '-g', ['port=', 'signal_pid='])[0]480 except getopt.GetoptError:481 print '%s: [-g] [--port=<port>] [--signal_pid=<pid>]' % argv[0]482 sys.exit(1)483 signal_pid = 0484 host = '127.0.0.0'485 port = 5000486 for key, value in opts:487 if key == '--port':488 port = int(value)489 print 'Running on port: %d' % port490 elif key == '-g':491 host = '0.0.0.0'492 elif key == '--signal_pid':493 signal_pid = int(value)494 if host != '0.0.0.0':495 print 'Only available on localhost (-g not provided)'496 server = HTTPServer((host, port), WaxHandler)497 print 'Started WaxServer on %s:%s' % (host, port)498 if signal_pid != 0:499 print 'Server ready -- signaling %d' % signal_pid500 os.kill(signal_pid, signal.SIGUSR1)501 while _SERVER_SETTINGS['run']:502 server.handle_request()503if __name__ == '__main__':...
syn
Source:syn
...79 time.sleep(1)80 list_processes()81 # Very important to exit quickly, otherwise a ^C sent to us will also kill our child82 exit(0)83def signal_pid(pid, sig, english):84 account = account_from_pid(pid)85 if account is None:86 print("No such PID")87 else:88 os.kill(pid, sig)89 print(english,account)90 open(HISTORY_FILE,"at").write(time.ctime() + " " + english + " " + account + "\n")91 time.sleep(1)92 list_processes()93# We don't use RX, for now94g_connected_account = None95def command_response(packet):96 # Is it for us?97 if g_connected_account is None:98 return99 if not "headers" in packet:100 return101 if not "Instancename" in packet["headers"]:102 return103 if packet["headers"]["Instancename"] != g_connected_account:104 return105 if not "action" in packet:106 return107 if packet["action"] != "response":108 return109 print(packet["response"] + "\n> ",end="",flush=True)110def connect(pid):111 global g_connected_account112 zeromq_tx.init(emit_logging=False) # Don't clutter-up screen with logging113 # zeromq_rx.init(command_response, emit_logging=False)114 g_connected_account = account_from_pid(pid)115 if g_connected_account is None:116 print("No such PID")117 return118 print("Connected to",g_connected_account, "- type 'exit'<return> to disconnect")119 zeromq_tx.socket_send( { "action" : "none" } ) # First packet seems to be ignored120 while True:121 inp = input("> ")122 if inp in ["exit","quit","disconnect","stop","leave"]:123 break124 inp = inp.strip()125 if len(inp) > 0:126 inp = inp.split(" ")127 packet = { "headers" : { "Instancename" : g_connected_account }, "action" : "command", "argv" : inp }128 zeromq_tx.socket_send(packet)129 print("Disconnected")130if __name__ == "__main__":131 if len(sys.argv) < 2:132 list_processes()133 print("\nFor available commands type '"+sys.argv[0]+" help'")134 else:135 if sys.argv[1] == "list":136 list_processes()137 elif sys.argv[1] == "watch":138 watch(int(sys.argv[2]))139 elif sys.argv[1] == "run":140 run(sys.argv[2], sys.argv[3])141 elif sys.argv[1] == "kill":142 signal_pid(int(sys.argv[2]), signal.SIGKILL, "Killed")143 elif sys.argv[1] == "pause":144 signal_pid(int(sys.argv[2]), signal.SIGUSR1, "Paused")145 elif sys.argv[1] == "unpause":146 signal_pid(int(sys.argv[2]), signal.SIGUSR2, "Unpaused")147 elif sys.argv[1] == "connect":148 connect(int(sys.argv[2]))149 elif sys.argv[1] == "history":150 print(open(HISTORY_FILE,"rt").read())151 elif sys.argv[1] == "help":152 print("Commands:")153 print(" list List all synth jobs, with their PID")154 print(" run <account> <scenario> Run a synth job. <account> is one of ../synth_accounts and <scenario> is one of ./scenarios e.g. 'run OnFStest 10secs'")155 print(" watch <pid> Monitor the output from a synth job")156 print(" kill <pid> Kill a synth job")157 print(" pause <pid> Pause a synth job")158 print(" unpause <pid> Unpause a synth job")159 print(" connect <pid> Connect to a synth job and send commands")160 print(" history List all recent activities")...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!