Best Python code snippet using keyboard
TServer.py
Source:TServer.py
...132 def __init__(self, *args):133 TServer.__init__(self, *args)134 self.children = []135 def serve(self):136 def try_close(file):137 try:138 file.close()139 except IOError as e:140 logger.warning(e, exc_info=True)141 self.serverTransport.listen()142 while True:143 client = self.serverTransport.accept()144 if not client:145 continue146 try:147 pid = os.fork()148 if pid:149 self.children.append(pid)150 self.collect_children()151 itrans = self.inputTransportFactory.getTransport(client)152 otrans = self.outputTransportFactory.getTransport(client)153 try_close(itrans)154 try_close(otrans)155 else:156 itrans = self.inputTransportFactory.getTransport(client)157 otrans = self.outputTransportFactory.getTransport(client)158 iprot = self.inputProtocolFactory.getProtocol(itrans)159 oprot = self.outputProtocolFactory.getProtocol(otrans)160 ecode = 0161 try:162 try:163 while True:164 self.processor.process(iprot, oprot)165 except TTransport.TTransportException:166 pass167 except Exception as e:168 logger.exception(e)169 ecode = 1170 finally:171 try_close(itrans)172 try_close(otrans)173 os._exit(ecode)174 except TTransport.TTransportException:175 pass176 except Exception as x:177 logger.exception(x)178 def collect_children(self):179 while self.children:180 try:181 pid, status = os.waitpid(0, os.WNOHANG)182 except os.error:183 pid = None184 if pid:185 self.children.remove(pid)186 else:...
wrap_squid_base.py
Source:wrap_squid_base.py
...15 #-----------16 def __enter__(self):17 return self18 def __del__(self):19 self.try_close()20 def __exit__(self, type, value, tb):21 self.try_close()22 #-----------23 def init(self, part) :24 self.part = part25 found = 026 try:27 path = os.path.dirname(os.path.realpath(__file__))28 source = open(path + '/wrap_squid.txt', 'r')29 for row in source :30 row = row.strip()31 if(len(row)) :32 t = row.split('=')33 if('ip' == t[0]) :34 self.serverHost = t[1]35 found += 136 elif('port' == t[0]) :37 self.serverPort = int(t[1])38 found += 139 if(2 == found) :40 break 41 except:42 print ('ERR no open source')43 return False44 return True45 #-----------46 def try_close(self) :47 if(None == self.s):48 return49 try:50 self.s.shutdown(SHUT_RDWR)51 except: 52 pass53 self.s.close()54 self.s = None55 #-----------56 def connect(self) : 57 result = False58 self.try_close()59 if(self.serverPort and not '' == self.serverHost) :60 try:61 self.s = socket(AF_INET, SOCK_STREAM)62 self.s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)63 self.s.connect((self.serverHost, self.serverPort))64 self.s.send(bytes(self.part + '\n', 'utf-8'))65 responce = self.s.recv(1024)66 responce = responce.decode("utf-8")67 if(not responce == 'ok-' + self.part + '\n') :68 self.try_close()69 else :70 result = True71 except Exception as e: 72 print("ERR " + str(e))73 self.try_close()74 return result75 #-----------76 def send(self, data) :77 if(None == self.s) :78 if(not self.connect()) :79 print("Err no Conn\n")80 return False81 try:82 sent = self.s.sendall(data)83 if(None == sent) :84 return True85 else:86 self.try_close()87 return False88 except:89 self.try_close()90 return False91 #-----------92 def recv(self) :93 try:94 data = self.s.recv(1024)95 if(not data) :96 self.try_close()97 return None 98 else : 99 return data100 except:101 self.try_close()102 return None 103#----------------------------------------104def conv_input(data) :105 data = data.encode("utf-8")106 #data = data.encode("iso8859-1")107 data = 
base64.b64encode(data) + b'\n';108 return data109#----------------------------------------110def conv_output(data) :111 return (data.decode("utf-8")).strip()112#----------------------------------------113def try_send(conn, line) :114 if(conn.send(line)) :115 data = conn.recv()116 if(not None == data) :117 data = conv_output(data)118 print(data)119 return True120 conn.try_close()121 return False122#----------------------------------------123def perform_wrap_squid(conn) :124 #conn.connect()125 for line in sys.stdin:126 if(not line) :127 break128 line = line.strip()129 if(len(line) > 0) :130 line = conv_input(line)131 try:132 if(not try_send(conn, line)) :133 time.sleep(0.2)134 if(not try_send(conn, line)) :135 print("ERR no send\n")136 conn.try_close()137 #return138 except:139 pass140#----------------------------------------141def wrap_squid_base(helper_text) :142 with connection() as conn :143 if(conn.init(helper_text)):144 perform_wrap_squid(conn)145 conn.try_close()...
__init__.py
Source:__init__.py
2from . import exceptions3PROTO = 'CAP/0.1.0'4BUFSIZ = 10245local_urls = []6def try_close(func):7 """Decorator to wrap function in a try/except for OSError.""" 8 def tc(*args, **kwarg):9 try:10 return func(*args, **kwarg)11 except (OSError, EOFError):12 raise exceptions.SocketClosed()13 return tc14def local(func):15 """Decorator cancel network if local_urls != []""" 16 def nop():17 pass18 def ll(*args, **kwarg):19 if local_urls == []:20 return func(*args, **kwarg)...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!