Best Python code snippet using fMBT_python
server.py
Source:server.py
...499 _g_server_shutdown = True500 server_ctl_rv = messages.Server_ctl_rv(0, "shutting down")501 pythonshare._send(server_ctl_rv, to_client)502 if _g_wake_server_function:503 _g_wake_server_function()504 break505 elif obj.command == "unlock":506 try:507 ns = obj.args[0]508 if ns in _g_remote_namespaces:509 try:510 rv = _remote_execute(ns, obj)511 except (EOFError, socket.error): # connection lost512 daemon_log('connection lost to "%s"' % (ns,))513 _drop_remote_namespace(ns)514 break515 elif ns in _g_local_namespace_locks:516 try:517 _g_local_namespace_locks[ns].release()518 server_ctl_rv = messages.Server_ctl_rv(519 0, "%s unlocked" % (repr(ns),))520 except thread.error, e:521 server_ctl_rv = messages.Server_ctl_rv(522 1, "%s already unlocked" %523 (repr(ns),))524 elif ns in _g_local_namespaces:525 server_ctl_rv = messages.Server_ctl_rv(526 2, "namespace %s is not locked" % (repr(ns),))527 else:528 server_ctl_rv = messages.Server_ctl_rv(529 -1, "unknown namespace %s" % (repr(ns),))530 if opt_debug:531 daemon_log("%s:%s <= %s" % (peername + (server_ctl_rv,)))532 pythonshare._send(server_ctl_rv, to_client)533 except Exception, e:534 if opt_debug:535 daemon_log("Exception in handling %s: %s" % (obj, e))536 else:537 daemon_log("unknown message type: %s in %s" % (type(obj), obj))538 pythonshare._send(messages.Auth_rv(False), to_client)539 auth_ok = False540 if opt_debug:541 daemon_log("disconnected %s:%s" % peername)542 _connection_lost(conn_id, to_client, from_client, conn)543 if kill_server_on_close:544 _g_server_shutdown = True545 if _g_wake_server_function:546 _g_wake_server_function()547def start_server(host, port,548 ns_init_import_export=[],549 conn_opts={},550 listen_stdin=True):551 global _g_wake_server_function552 global _g_waker_lock553 daemon_log("pid: %s" % (os.getpid(),))554 # Initialise, import and export namespaces555 for task, ns, arg in ns_init_import_export:556 if task == "init":557 _init_local_namespace(ns, arg, force=True)558 elif task == "export":559 _init_local_namespace(ns, None, force=True)560 daemon_log('exporting "%s" to %s' % (ns, arg))561 try:562 c = pythonshare.connection(arg)563 except Exception, e:564 daemon_log('connecting to %s failed: %s' % (arg, e))565 return566 if c.export_ns(ns):567 _register_exported_namespace(ns, c)568 thread.start_new_thread(569 _serve_connection, (c, {"kill-server-on-close": True}))570 else:571 raise ValueError('Export namespace "%s" to "%s" failed'572 % (ns, arg))573 elif task == "import":574 if (ns in _g_local_namespaces or575 ns in _g_remote_namespaces):576 raise ValueError('Import failed, namespace "%s" already exists'577 % (ns,))578 c = pythonshare.connection(arg)579 if c.import_ns(ns):580 _init_remote_namespace(ns, c, c._to_server, c._from_server)581 try:582 addrinfos = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)583 for addrinfo in addrinfos:584 daemon_log("listen: %s:%s" % (addrinfo[4][0], addrinfo[4][1]))585 except socket.error:586 daemon_log("listen: %s:%s" % (host, port))587 if isinstance(port, int):588 def wake_server_function():589 _g_waker_lock.release() # wake up server590 _g_wake_server_function = wake_server_function591 _g_waker_lock = thread.allocate_lock()592 _g_waker_lock.acquire() # unlocked593 # Start listening to the port594 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)595 try:596 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)597 except:598 pass599 s.bind((host, port))600 s.listen(4)601 event_queue = Queue.Queue()602 thread.start_new_thread(_store_return_value, (s.accept, event_queue))...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!