Best Python code snippet using fMBT_python
server.py
Source:server.py
...48 datetime.datetime(1970, 1, 1)).total_seconds(),)49 else:50 rv = datetime.datetime.now().strftime("%s.%f")51 return rv52def daemon_log(msg):53 if opt_debug_limit >= 0:54 if len(msg) > opt_debug_limit:55 msg = (msg[:opt_debug_limit/2] +56 ("...[%s B, log CRC %s]..." % (len(msg), messages.crc(msg))) +57 msg[-opt_debug_limit/2:])58 formatted_msg = "%s %s\n" % (timestamp(), msg)59 if opt_log_fd != None:60 os.write(opt_log_fd, formatted_msg)61 if has_os_fdatasync:62 os.fdatasync(opt_log_fd)63 if opt_debug and opt_debug_limit != 0:64 sys.stdout.write(formatted_msg)65 sys.stdout.flush()66def code2string(code):67 return "\n".join(68 ["%-4s %s" % (li+1, l) for li, l in enumerate(code.splitlines())])69def exception2string(exc_info):70 return ''.join(traceback.format_exception(*exc_info))71def _store_return_value(func, queue):72 while True:73 queue.put(func())74def _read_lines_from_stdin(queue):75 while True:76 line = sys.stdin.readline()77 if not line:78 break79 queue.put(line)80 daemon_log("stdin closed")81class Pythonshare_ns(object):82 """Pythonshare services inside a namespace83 """84 def __init__(self, ns):85 self.ns = ns86 self._on_disconnect = []87 self._on_drop = []88 def ns_type(self, ns):89 """Query the type of a namespace.90 Returns "local" or "remote" if namespace exists, otherwise None.91 """92 if ns in _g_local_namespaces:93 return "local"94 elif ns in _g_remote_namespaces:95 return "remote"96 else:97 return None98 def local_nss(self):99 """List local namespaces100 """101 return _g_local_namespaces.keys()102 def remote_nss(self, ls_opts={}):103 """List remote namespaces104 """105 if "ip" in ls_opts and ls_opts["ip"] == True:106 key_peername = {}107 for k in _g_remote_namespaces.keys():108 try:109 key_peername[k] = _g_remote_namespaces[k].conn.getpeername()110 except Exception:111 key_peername[k] = ("?", "?")112 return key_peername113 return _g_remote_namespaces.keys()114 def on_disconnect(self):115 """Return codes that will be executed when a client has disconnected.116 """117 return self._on_disconnect118 def on_drop(self):119 """Return codes that will be executed when the namespace is dropped.120 """121 return self._on_drop122 def exec_on_disconnect(self, code, any_connection=False):123 """Add code that will be executed when client has disconnected.124 """125 if not any_connection:126 conn_id = _g_executing_pythonshare_conn_id127 else:128 conn_id = None129 self._on_disconnect.append((conn_id, code))130 def exec_on_drop(self, code):131 """Add code that will be executed when namespace is dropped.132 """133 self._on_drop.append(code)134 def set_on_disconnect(self, list_of_code):135 """Replace all "on disconnect" codes with new list of codes.136 """137 self._on_disconnect = list_of_code138 def set_on_drop(self, list_of_code):139 """Replace all "on drop" codes with new list of codes."""140 self._on_drop = list_of_code141 def call_on_disconnect(self, conn_id):142 for setter_conn_id, code in self._on_disconnect:143 if not setter_conn_id or setter_conn_id == conn_id:144 exec_msg = messages.Exec(self.ns, code, None)145 if opt_debug:146 daemon_log("on disconnect %s: %s" % (conn_id, exec_msg,))147 rv = _local_execute(exec_msg)148 if opt_debug:149 daemon_log("on disconnect rv: %s" % (rv,))150 if setter_conn_id == conn_id:151 self._on_disconnect.remove((conn_id, code))152 def call_on_drop(self):153 for code in self._on_drop:154 exec_msg = messages.Exec(self.ns, code, None)155 if opt_debug:156 daemon_log("on drop: %s" % (exec_msg,))157 rv = _local_execute(exec_msg)158 if opt_debug:159 daemon_log("on drop rv: %s" % (rv,))160 def read_rv(self, async_rv):161 """Return and remove asynchronous return value.162 """163 if self.ns != async_rv.ns:164 raise ValueError("Namespace mismatch")165 if (async_rv.ns in _g_async_rvs and166 async_rv.rvid in _g_async_rvs[async_rv.ns]):167 rv = _g_async_rvs[async_rv.ns][async_rv.rvid]168 if not isinstance(rv, pythonshare.InProgress):169 del _g_async_rvs[async_rv.ns][async_rv.rvid]170 return rv171 else:172 raise ValueError('Invalid return value id: "%s"'173 % (async_rv.rvid,))174 def poll_rvs(self):175 """Returns list of Async_rv instances that are ready for reading.176 """177 rv = []178 for rvid, value in _g_async_rvs[self.ns].iteritems():179 if not isinstance(value, pythonshare.InProgress):180 rv.append(messages.Async_rv(self.ns, rvid))181 return rv182class Pythonshare_rns(object):183 """Remote namespace"""184 def __init__(self, conn, to_remote, from_remote):185 self.conn = conn186 self.to_remote = to_remote187 self.from_remote = from_remote188 def __del__(self):189 pythonshare._close(self.conn, self.to_remote, self.from_remote)190_g_local_namespaces = {}191# client-id -> set of namespaces192_g_namespace_users = {}193_g_executing_pythonshare_conn_id = None194# _g_remote_namespaces: namespace -> Connection to origin195_g_remote_namespaces = {}196# _g_namespace_exports: namespace -> list of Connections to which the197# namespace (remote or local) has been exported. If the namespace is198# deleted (or connection to origin is lost), these Connection objects199# are to be notified.200_g_namespace_exports = {}201_g_local_namespace_locks = {}202_g_async_rvs = {}203_g_async_rv_counter = 0204_g_server_shutdown = False205def _init_local_namespace(ns, init_code=None, force=False):206 if not ns in _g_local_namespaces:207 if opt_allow_new_namespaces or force:208 daemon_log('added local namespace "%s"' % (ns,))209 _g_local_namespaces[ns] = {210 "pythonshare_ns": Pythonshare_ns(ns),211 "Async_rv": pythonshare.messages.Async_rv212 }213 _g_local_namespace_locks[ns] = thread.allocate_lock()214 _g_async_rvs[ns] = {}215 else:216 raise ValueError('Unknown namespace "%s"' % (ns,))217 if init_code != None:218 if isinstance(init_code, basestring):219 try:220 exec init_code in _g_local_namespaces[ns]221 except Exception, e:222 daemon_log('namespace "%s" init error in <string>:\n%s\n\n%s' % (223 ns, code2string(init_code), exception2string(sys.exc_info())))224 elif isinstance(init_code, dict):225 # Directly use the dictionary (locals() or globals(), for226 # instance) as a Pythonshare namespace.227 clean_ns = _g_local_namespaces[ns]228 _g_local_namespaces[ns] = init_code229 _g_local_namespaces[ns].update(clean_ns) # copy pythonshare defaults230 else:231 raise TypeError("unsupported init_code type")232def _drop_local_namespace(ns):233 daemon_log('drop local namespace "%s"' % (ns,))234 _g_local_namespaces[ns]["pythonshare_ns"].call_on_drop()235 del _g_local_namespaces[ns]236 del _g_local_namespace_locks[ns]237 del _g_async_rvs[ns]238 # send notification to all connections in _g_namespace_exports[ns]?239def _drop_remote_namespace(ns):240 daemon_log('drop remote namespace "%s"' % (ns,))241 try:242 rns = _g_remote_namespaces[ns]243 del _g_remote_namespaces[ns]244 rns.__del__()245 except KeyError:246 pass # already dropped247 # send notification to all connections in _g_namespace_exports[ns]?248def _init_remote_namespace(ns, conn, to_remote, from_remote):249 if ns in _g_remote_namespaces:250 raise ValueError('Remote namespace "%s" already registered' % (251 ns,))252 daemon_log('added remote namespace "%s", origin "%s"' % (253 ns, conn.getpeername()))254 _g_remote_namespaces[ns] = Pythonshare_rns(conn, to_remote, from_remote)255def _register_exported_namespace(ns, conn):256 if not ns in _g_namespace_exports:257 _g_namespace_exports[ns] = []258 _g_namespace_exports[ns].append(conn)259def _local_execute(exec_msg, conn_id=None):260 global _g_executing_pythonshare_conn_id261 ns = exec_msg.namespace262 if not ns in _g_local_namespaces:263 code_exc = expr_exc = "no local namespace %s" % (ns,)264 return messages.Exec_rv(code_exc, expr_exc, None)265 if conn_id:266 if not conn_id in _g_namespace_users:267 _g_namespace_users[conn_id] = set([ns])268 else:269 _g_namespace_users[conn_id].add(ns)270 code_exc, expr_exc, expr_rv = None, None, None271 if not exec_msg.lock or _g_local_namespace_locks[ns].acquire():272 _g_executing_pythonshare_conn_id = conn_id273 try:274 if exec_msg.code not in [None, ""]:275 try:276 exec exec_msg.code in _g_local_namespaces[ns]277 except Exception, e:278 code_exc = exception2string(sys.exc_info())279 if exec_msg.expr not in [None, ""]:280 try:281 expr_rv = eval(exec_msg.expr, _g_local_namespaces[ns])282 except Exception, e:283 expr_exc = exception2string(sys.exc_info())284 finally:285 _g_executing_pythonshare_conn_id = None286 if exec_msg.lock:287 try:288 _g_local_namespace_locks[ns].release()289 except thread.error:290 pass # already unlocked namespace291 else:292 code_exc = expr_exc = 'locking namespace "%s" failed' % (ns,)293 if isinstance(expr_rv, pythonshare.messages.Exec_rv):294 return expr_rv295 else:296 return messages.Exec_rv(code_exc, expr_exc, expr_rv)297def _local_async_execute(async_rv, exec_msg):298 exec_rv = _local_execute(exec_msg)299 _g_async_rvs[exec_msg.namespace][async_rv.rvid] = exec_rv300def _remote_execute(ns, exec_msg):301 rns = _g_remote_namespaces[ns]302 pythonshare._send(exec_msg, rns.to_remote)303 # _recv raises EOFError() if disconnected,304 # let it raise through.305 return pythonshare._recv(rns.from_remote)306def _remote_execute_and_forward(ns, exec_msg, to_client, peername=None):307 """returns (forward_status, info)308 forward_status values:309 True: everything successfully forwarded,310 info contains pair (forwarded byte count, full length).311 False: not everything forwarded,312 info contains pair (forwarded byte count, full length).313 to_client file/socket is not functional.314 None: no forwarding,315 info contains Exec_rv that should be sent normally.316 Raises EOFError if connection to remote namespace is not functional.317 The peername parameter is used for logging only.318 """319 client_supports_rv_info = exec_msg.recv_cap_data_info()320 exec_msg.set_recv_cap_data_info(True)321 rns = _g_remote_namespaces[ns]322 from_remote = rns.from_remote323 # Must keep simultaneously two locks:324 # - send lock on to_client325 # - recv lock on from_remote326 pythonshare._acquire_recv_lock(from_remote)327 try:328 pythonshare._send(exec_msg, rns.to_remote)329 response = pythonshare._recv(from_remote, acquire_recv_lock=False)330 if not isinstance(response, messages.Data_info):331 # Got direct response without forward mode332 return (None, response)333 pythonshare._acquire_send_lock(to_client)334 if client_supports_rv_info:335 # send data_info to client336 pythonshare._send(response, to_client, acquire_send_lock=False)337 try:338 if opt_debug and peername:339 daemon_log("%s:%s <= Exec_rv([forwarding %s B])" % (peername + (response.data_length,)))340 forwarded_bytes = pythonshare._forward(341 from_remote, to_client, response.data_length,342 acquire_recv_lock=False,343 acquire_send_lock=False)344 if forwarded_bytes == response.data_length:345 return (True, (forwarded_bytes, response.data_length))346 else:347 return (False, (forwarded_bytes, response.data_length))348 finally:349 pythonshare._release_send_lock(to_client)350 finally:351 exec_msg.set_recv_cap_data_info(client_supports_rv_info)352 pythonshare._release_recv_lock(from_remote)353def _connection_lost(conn_id, *closables):354 if closables:355 pythonshare._close(*closables)356 try:357 for ns in _g_namespace_users[conn_id]:358 try:359 _g_local_namespaces[ns]["pythonshare_ns"].call_on_disconnect(conn_id)360 except KeyError:361 pass362 except KeyError:363 pass364def _serve_connection(conn, conn_opts):365 global _g_async_rv_counter366 global _g_server_shutdown367 if isinstance(conn, client.Connection):368 to_client = conn._to_server369 from_client = conn._from_server370 else: # conn is a connected socket371 to_client = conn.makefile("w")372 from_client = conn.makefile("r")373 try:374 peername = conn.getpeername()375 except socket.error:376 peername = ("unknown", "?")377 if opt_debug:378 daemon_log("connected %s:%s" % peername)379 conn_id = "%s-%s" % (timestamp(), id(conn))380 auth_ok = False381 passwords = [k for k in conn_opts.keys() if k.startswith("password.")]382 kill_server_on_close = conn_opts.get("kill-server-on-close", False)383 if passwords:384 # password authentication is required for this connection385 try:386 received_password = pythonshare._recv(from_client)387 except Exception, e:388 daemon_log('error receiving password: %r' % (e,))389 received_password = None390 for password_type in passwords:391 algorithm = password_type.split(".")[1]392 if type(received_password) == str:393 if (algorithm == "plaintext" and394 received_password == conn_opts[password_type]):395 auth_ok = True396 elif (hasattr(hashlib, algorithm) and397 getattr(hashlib, algorithm)(received_password).hexdigest() ==398 conn_opts[password_type]):399 auth_ok = True400 try:401 if auth_ok:402 pythonshare._send(messages.Auth_rv(True), to_client)403 if opt_debug:404 daemon_log("%s:%s authentication ok" % peername)405 elif not received_password is None:406 pythonshare._send(messages.Auth_rv(False), to_client)407 if opt_debug:408 daemon_log("%s:%s authentication failed" % peername)409 except socket.error:410 daemon_log("authentication failed due to socket error")411 auth_ok = False412 else:413 auth_ok = True # no password required414 whitelist_local = conn_opts.get("whitelist_local", None)415 while auth_ok:416 try:417 obj = pythonshare._recv(from_client)418 if opt_debug:419 daemon_log("%s:%s => %s" % (peername + (obj,)))420 except (EOFError, pythonshare.socket.error):421 break422 if isinstance(obj, messages.Register_ns):423 try:424 _init_remote_namespace(obj.ns, conn, to_client, from_client)425 pythonshare._send(messages.Ns_rv(True), to_client)426 # from this point on, this connection is reserved for427 # sending remote namespace traffic. The connection will be428 # used by other threads, this thread stops here.429 return430 except Exception, e:431 pythonshare._send(messages.Ns_rv(False, exception2string(sys.exc_info())), to_client)432 elif isinstance(obj, messages.Drop_ns):433 try:434 if obj.ns in _g_local_namespaces:435 _drop_local_namespace(obj.ns)436 elif obj.ns in _g_remote_namespaces:437 _drop_remote_namespace(obj.ns)438 else:439 raise ValueError('Unknown namespace "%s"' % (obj.ns,))440 pythonshare._send(messages.Ns_rv(True), to_client)441 except Exception, e:442 if opt_debug:443 daemon_log("namespace drop error: %s" % (e,))444 pythonshare._send(messages.Ns_rv(False, exception2string(sys.exc_info())), to_client)445 elif isinstance(obj, messages.Request_ns):446 ns = obj.ns447 if (ns in _g_remote_namespaces or448 ns in _g_local_namespaces):449 _register_exported_namespace(ns, conn)450 pythonshare._send(messages.Ns_rv(True), to_client)451 # from this point on, this connection is reserved for452 # receiving executions on requested namespace. This453 # thread starts serving the connection.454 elif isinstance(obj, messages.Exec):455 ns = obj.namespace456 if ns in _g_remote_namespaces: # execute in remote namespace457 try:458 _fwd_status, _fwd_info = _remote_execute_and_forward(459 ns, obj, to_client, peername)460 if _fwd_status == True:461 # successfully forwarded462 if opt_debug:463 daemon_log("%s:%s forwarded %s B" % (peername + (_fwd_info[0],)))464 exec_rv = None # return value fully forwarded465 elif _fwd_status == False:466 # connection to client is broken467 if opt_debug:468 daemon_log("%s:%s error after forwarding %s/%s B" % (peername + _fwd_info))469 break470 elif _fwd_status is None:471 # nothing forwarded, send return value by normal means472 exec_rv = _fwd_info473 except (EOFError, socket.error):474 daemon_log('connection lost to "%s"' % (ns,))475 _drop_remote_namespace(ns)476 break477 else: # execute in local namespace478 if whitelist_local == None or ns in whitelist_local:479 _init_local_namespace(ns)480 if obj.async:481 # asynchronous execution, return handle (Async_rv)482 _g_async_rv_counter += 1483 rvid = timestamp() + str(_g_async_rv_counter)484 exec_rv = messages.Async_rv(ns, rvid)485 _g_async_rvs[ns][rvid] = pythonshare.InProgress()486 thread.start_new_thread(_local_async_execute, (exec_rv, obj))487 else:488 # synchronous execution, return true return value489 exec_rv = _local_execute(obj, conn_id)490 if not exec_rv is None:491 if opt_debug:492 daemon_log("%s:%s <= %s" % (peername + (exec_rv,)))493 try:494 try:495 if obj.recv_cap_data_info():496 info = pythonshare._send_opt(exec_rv, to_client, obj.recv_caps)497 if info:498 sent_info = " %s B, format:%s" % (499 info.data_length, info.data_format)500 else:501 sent_info = ""502 else:503 pythonshare._send(exec_rv, to_client)504 sent_info = ""505 if opt_debug:506 daemon_log("%s:%s sent%s" % (peername + (sent_info,)))507 except (EOFError, socket.error):508 break509 except (TypeError, ValueError, cPickle.PicklingError): # pickling rv fails510 exec_rv.expr_rv = messages.Unpicklable(exec_rv.expr_rv)511 try:512 pythonshare._send(exec_rv, to_client)513 except (EOFError, socket.error):514 break515 elif isinstance(obj, messages.Server_ctl):516 if obj.command == "die":517 ns = obj.args[0]518 if ns in _g_remote_namespaces:519 try:520 rv = _remote_execute(ns, obj)521 if opt_debug:522 daemon_log("%s:%s <= %s" % (peername + (rv,)))523 pythonshare._send(rv, to_client)524 except (EOFError, socket.error): # connection lost525 daemon_log('connection lost to "%s"' % (ns,))526 _drop_remote_namespace(ns)527 break528 else:529 _g_server_shutdown = True530 server_ctl_rv = messages.Server_ctl_rv(0, "shutting down")531 pythonshare._send(server_ctl_rv, to_client)532 if _g_wake_server_function:533 _g_wake_server_function()534 break535 elif obj.command == "unlock":536 try:537 ns = obj.args[0]538 if ns in _g_remote_namespaces:539 try:540 rv = _remote_execute(ns, obj)541 except (EOFError, socket.error): # connection lost542 daemon_log('connection lost to "%s"' % (ns,))543 _drop_remote_namespace(ns)544 break545 elif ns in _g_local_namespace_locks:546 try:547 _g_local_namespace_locks[ns].release()548 server_ctl_rv = messages.Server_ctl_rv(549 0, "%s unlocked" % (repr(ns),))550 except thread.error, e:551 server_ctl_rv = messages.Server_ctl_rv(552 1, "%s already unlocked" %553 (repr(ns),))554 elif ns in _g_local_namespaces:555 server_ctl_rv = messages.Server_ctl_rv(556 2, "namespace %s is not locked" % (repr(ns),))557 else:558 server_ctl_rv = messages.Server_ctl_rv(559 -1, "unknown namespace %s" % (repr(ns),))560 if opt_debug:561 daemon_log("%s:%s <= %s" % (peername + (server_ctl_rv,)))562 pythonshare._send(server_ctl_rv, to_client)563 except Exception, e:564 if opt_debug:565 daemon_log("Exception in handling %s: %s" % (obj, e))566 else:567 daemon_log("unknown message type: %s in %s" % (type(obj), obj))568 pythonshare._send(messages.Auth_rv(False), to_client)569 auth_ok = False570 if opt_debug:571 daemon_log("disconnected %s:%s" % peername)572 _connection_lost(conn_id, to_client, from_client, conn)573 if kill_server_on_close:574 _g_server_shutdown = True575 if _g_wake_server_function:576 _g_wake_server_function()577def start_server(host, port,578 ns_init_import_export=[],579 conn_opts={},580 listen_stdin=True):581 global _g_wake_server_function582 global _g_waker_lock583 daemon_log("pid: %s" % (os.getpid(),))584 # Initialise, import and export namespaces585 for task, ns, arg in ns_init_import_export:586 if task == "init":587 # If arg is a string, it will be executed in ns.588 # If arg is a dict, it will be used as ns.589 _init_local_namespace(ns, arg, force=True)590 elif task == "export":591 # Make sure ns exists before exporting.592 _init_local_namespace(ns, None, force=True)593 daemon_log('exporting "%s" to %s' % (ns, arg))594 try:595 c = pythonshare.connection(arg)596 except Exception, e:597 daemon_log('connecting to %s failed: %s' % (arg, e))598 return599 if c.export_ns(ns):600 _register_exported_namespace(ns, c)601 thread.start_new_thread(602 _serve_connection, (c, {"kill-server-on-close": True}))603 else:604 raise ValueError('Export namespace "%s" to "%s" failed'605 % (ns, arg))606 elif task == "import":607 if (ns in _g_local_namespaces or608 ns in _g_remote_namespaces):609 raise ValueError('Import failed, namespace "%s" already exists'610 % (ns,))611 c = pythonshare.connection(arg)612 if c.import_ns(ns):613 _init_remote_namespace(ns, c, c._to_server, c._from_server)614 try:615 addrinfos = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)616 for addrinfo in addrinfos:617 daemon_log("listen: %s:%s" % (addrinfo[4][0], addrinfo[4][1]))618 except socket.error:619 daemon_log("listen: %s:%s" % (host, port))620 if isinstance(port, int):621 def wake_server_function():622 _g_waker_lock.release() # wake up server623 _g_wake_server_function = wake_server_function624 _g_waker_lock = thread.allocate_lock()625 _g_waker_lock.acquire() # unlocked626 # Start listening to the port627 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)628 try:629 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)630 except:631 pass632 s.bind((host, port))633 s.listen(4)634 event_queue = Queue.Queue()635 thread.start_new_thread(_store_return_value, (s.accept, event_queue))636 thread.start_new_thread(_store_return_value, (_g_waker_lock.acquire, event_queue))637 if not sys.stdin.closed and listen_stdin:638 daemon_log("listening to stdin")639 thread.start_new_thread(_read_lines_from_stdin, (event_queue,))640 else:641 daemon_log("not listening stdin")642 while 1:643 event = event_queue.get()644 if isinstance(event, tuple):645 # returned from s.accept646 conn, _ = event647 thread.start_new_thread(_serve_connection, (conn, conn_opts))648 elif event == True:649 # returned from _g_waker_lock.acquire650 daemon_log("shutting down.")651 break652 else:653 # returned from sys.stdin.readline654 pass655 elif port == "stdin":656 opt_debug_limit = 0657 if os.name == "nt":658 import msvcrt659 msvcrt.setmode(sys.stdin.fileno(), os.O_BINARY)660 msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)661 conn = client.Connection(sys.stdin, sys.stdout)662 _serve_connection(conn, conn_opts)663 for ns in sorted(_g_remote_namespaces.keys()):664 _drop_remote_namespace(ns)...
ninja_scons_daemon.py
Source:ninja_scons_daemon.py
...68 filemode="a",69 format="%(asctime)s %(message)s",70 level=logging.DEBUG,71)72def daemon_log(message):73 logging.debug(message)74def custom_readlines(handle, line_separator="\n", chunk_size=1):75 buf = ""76 while not handle.closed:77 data = handle.read(chunk_size)78 if not data:79 break80 buf += data.decode("utf-8")81 if line_separator in buf:82 chunks = buf.split(line_separator)83 buf = chunks.pop()84 for chunk in chunks:85 yield chunk + line_separator86 if buf.endswith("scons>>>"):87 yield buf88 buf = ""89def custom_readerr(handle, line_separator="\n", chunk_size=1):90 buf = ""91 while not handle.closed:92 data = handle.read(chunk_size)93 if not data:94 break95 buf += data.decode("utf-8")96 if line_separator in buf:97 chunks = buf.split(line_separator)98 buf = chunks.pop()99 for chunk in chunks:100 yield chunk + line_separator101def enqueue_output(out, queue):102 for line in iter(custom_readlines(out)):103 queue.put(line)104 out.close()105def enqueue_error(err, queue):106 for line in iter(custom_readerr(err)):107 queue.put(line)108 err.close()109input_q = queue.Queue()110output_q = queue.Queue()111error_q = queue.Queue()112building_cv = Condition()113error_cv = Condition()114class StateInfo:115 def __init__(self) -> None:116 self.thread_error = False117 self.finished_building = []118 self.error_nodes = []119 self.startup_failed = False120 self.startup_output = ''121 self.daemon_needs_to_shutdown = False122 self.httpd = None123shared_state = StateInfo()124def sigint_func(signum, frame):125 global shared_state126 shared_state.daemon_needs_to_shutdown = True127signal.signal(signal.SIGINT, sigint_func)128def daemon_thread_func():129 global shared_state130 try:131 args_list = args + ["--interactive"]132 daemon_log(f"Starting daemon with args: {' '.join(args_list)}")133 daemon_log(f"cwd: {os.getcwd()}")134 p = Popen(args_list, stdout=PIPE, stderr=PIPE, stdin=PIPE)135 t = threading.Thread(target=enqueue_output, args=(p.stdout, output_q))136 t.daemon = True137 t.start()138 te = threading.Thread(target=enqueue_error, args=(p.stderr, error_q))139 te.daemon = True140 te.start()141 daemon_ready = False142 143 building_node = None144 startup_complete = False145 # While scons interactive process is stil running...146 while p.poll() is None:147 # while there is scons output to process148 while True:149 try:150 line = output_q.get(block=False, timeout=0.01)151 except queue.Empty:152 # breaks out of the output processing loop153 break154 else:155 daemon_log("output: " + line.strip())156 if not startup_complete:157 shared_state.startup_output += line158 if "scons: building terminated because of errors." in line:159 error_output = ""160 while True:161 try:162 error_output += error_q.get(block=False, timeout=0.01)163 except queue.Empty:164 break165 shared_state.error_nodes += [{"node": building_node, "error": error_output}]166 daemon_ready = True167 building_node = None168 with building_cv:169 building_cv.notify()170 elif line == "scons>>>":171 shared_state.startup_output = ''172 startup_complete = True173 with error_q.mutex:174 error_q.queue.clear()175 daemon_ready = True176 with building_cv:177 building_cv.notify()178 building_node = None179 # while there is input to process...180 while daemon_ready and not input_q.empty():181 try:182 building_node = input_q.get(block=False, timeout=0.01)183 except queue.Empty:184 break185 if "exit" in building_node:186 daemon_log("input: " + "exit")187 p.stdin.write("exit\n".encode("utf-8"))188 p.stdin.flush()189 with building_cv:190 shared_state.finished_building += [building_node]191 daemon_ready = False192 shared_state.daemon_needs_to_shutdown = True193 break194 else:195 input_command = "build " + building_node + "\n"196 daemon_log("input: " + input_command.strip())197 p.stdin.write(input_command.encode("utf-8"))198 p.stdin.flush()199 with building_cv:200 shared_state.finished_building += [building_node]201 daemon_ready = False202 if shared_state.daemon_needs_to_shutdown:203 break204 time.sleep(0.01)205 # our scons process is done, make sure we are shutting down in this case206 if not shared_state.daemon_needs_to_shutdown:207 if not startup_complete:208 shared_state.startup_failed = True209 shared_state.daemon_needs_to_shutdown = True210 except Exception:211 shared_state.thread_error = True212 daemon_log("SERVER ERROR: " + traceback.format_exc())213 raise214daemon_thread = threading.Thread(target=daemon_thread_func)215daemon_thread.daemon = True216daemon_thread.start()217logging.debug(218 f"Starting request server on port {port}, keep alive: {daemon_keep_alive}"219)220keep_alive_timer = timer()221def server_thread_func():222 global shared_state223 class S(http.server.BaseHTTPRequestHandler):224 def do_GET(self):225 global shared_state226 global keep_alive_timer227 try:228 gets = parse_qs(urlparse(self.path).query)229 230 # process a request from ninja for a node for scons to build. 231 # Currently this is a serial process because scons interactive is serial232 # is it was originally meant for a real human user to be providing input233 # parallel input was never implemented.234 build = gets.get("build")235 if build:236 keep_alive_timer = timer()237 daemon_log(f"Got request: {build[0]}")238 input_q.put(build[0])239 def pred():240 return build[0] in shared_state.finished_building241 with building_cv:242 building_cv.wait_for(pred)243 for error_node in shared_state.error_nodes:244 if error_node["node"] == build[0]:245 self.send_response(500)246 self.send_header("Content-type", "text/html")247 self.end_headers()248 self.wfile.write(error_node["error"].encode())249 return250 self.send_response(200)251 self.send_header("Content-type", "text/html")252 self.end_headers()253 return254 # this message is used in server startup, to make sure the server launched255 # successfully. If SCons interactive got to a input prompt (scons>>>), then256 # the server is ready to start processing commands. Otherwise the server will257 # send an error response back to ninja and shut itself down.258 ready = gets.get("ready")259 if ready:260 if shared_state.startup_failed:261 self.send_response(500)262 self.send_header("Content-type", "text/html")263 self.end_headers()264 self.wfile.write(shared_state.startup_output.encode())265 return266 exitbuild = gets.get("exit")267 if exitbuild:268 input_q.put("exit")269 self.send_response(200)270 self.send_header("Content-type", "text/html")271 self.end_headers()272 except Exception:273 shared_state.thread_error = True274 daemon_log("SERVER ERROR: " + traceback.format_exc())275 raise276 def log_message(self, format, *args):277 return278 socketserver.TCPServer.allow_reuse_address = True279 shared_state.httpd = socketserver.TCPServer(("127.0.0.1", port), S)280 shared_state.httpd.serve_forever()281server_thread = threading.Thread(target=server_thread_func)282server_thread.daemon = True283server_thread.start()284while (timer() - keep_alive_timer < daemon_keep_alive 285 and not shared_state.thread_error 286 and not shared_state.daemon_needs_to_shutdown):287 time.sleep(1)288if shared_state.thread_error:289 daemon_log(f"Shutting server on port {port} down because thread error.")290elif shared_state.daemon_needs_to_shutdown:291 daemon_log("Server shutting down upon request.")292else:293 daemon_log(294 f"Shutting server on port {port} down because timed out: {daemon_keep_alive}"295 )296shared_state.httpd.shutdown()297if os.path.exists(ninja_builddir / "scons_daemon_dirty"):298 os.unlink(ninja_builddir / "scons_daemon_dirty")299if os.path.exists(daemon_dir / "pidfile"):300 os.unlink(daemon_dir / "pidfile")301# Local Variables:302# tab-width:4303# indent-tabs-mode:nil304# End:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!