Best Python code snippet using fMBT_python
proxy pool.py
Source:proxy pool.py
...95 if sock is self.src_socks:96 data = sock.recv(BUFLEN)97 if len(data) > 0:98 print '{}:{}->{}:{} len:{}'.format(99 self.src_socks.getpeername()[0], self.src_socks.getpeername()[1],100 self.dst_socks.getpeername()[0], self.dst_socks.getpeername()[1],101 len(data))102 self.dst_socks.send(data)103 running = True104 else:105 print 'got nothing'106 print 'kill connection {}:{} to {}:{}'.format(107 self.src_socks.getpeername()[0],self.src_socks.getpeername()[1],108 self.dst_socks.getpeername()[0],self.dst_socks.getpeername()[1])109 running = False110 elif sock is self.dst_socks:111 data = sock.recv(BUFLEN)112 if len(data) > 0:113 print '{}:{}->{}:{} len:{}'.format(114 self.dst_socks.getpeername()[0], self.dst_socks.getpeername()[1],115 self.src_socks.getpeername()[0], self.src_socks.getpeername()[1],116 len(data))117 self.src_socks.send(data)118 running = True119 else:120 print 'got nothing'121 print 'kill connection {}:{} to {}:{}'.format(122 self.src_socks.getpeername()[0], self.src_socks.getpeername()[1],123 self.dst_socks.getpeername()[0], self.dst_socks.getpeername()[1])124 running = False125126 def __del__(self):127 self.src_socks.close()128 self.dst_socks.close()129130131class Server(object):132 def __init__(self, host, port, timeout=20, handler=Proxy):133 signal_init()134 socket.setdefaulttimeout(timeout)135 self.host = host136 self.port = port137 self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
...
server.py
Source:server.py
...27 pass28 def msg_to_all(self, msg, cliente):29 for c in self.clients:30 try:31 if c != cliente and self.wwclients[c.getpeername()][0] == 2:32 c.send(msg)33 except:34 del self.wwclients[c.getpeername()]35 self.clients.remove(c)36 def acceptingConns(self):37 print("Ðжидание подклÑÑений")38 while True:39 try:40 conn, addr = self.sock.accept()41 conn.setblocking(False)42 self.clients.append(conn)43 self.wwclients[conn.getpeername()] = [0,'']44 except:45 pass46 def recievingMsgs(self):47 print("Ðжидание ÑообÑений")48 while True:49 if len(self.clients) > 0:50 for c in self.clients:51 try:52 data = c.recv(1024)53 sdata = repr(data)54 print(sdata)55 print(self.wwclients)56 if data:57 if self.wwclients[c.getpeername()][0] == 2:58 self.msg_to_all(data,c)59 elif self.wwclients[c.getpeername()][0] == 0:60 if serveroperations.check_name_exists(sdata):61 found = 062 for b in self.wwclient:63 if b[1] == sdata:64 c.send('Такой полÑзоваÑÐµÐ»Ñ Ñже залогинен')65 print('ÐолÑзоваÑÐµÐ»Ñ {} попÑÑалÑÑ Ð²Ð¾Ð¹Ñи под '+\66 'Ñже залогиненÑм именем'.format(c.getpeername()))67 found = 168 if not found:69 self.wwclient[c.getpeername()] = [1, sdata]70 c.send('ÐÑ Ð²Ð²ÐµÐ»Ð¸ логин')71 print('ÐолÑзоваÑÐµÐ»Ñ {} ввел логин'.format(c.getpeername()))72 else:73 serveroperations.create_user(sdata)74 self.wwclients[c.getpeername()] = [1,sdata]75 c.send('ÐÑ Ñоздали ÑÑеÑнÑÑ Ð·Ð°Ð¿Ð¸ÑÑ')76 print('ÐолÑзоваÑÐµÐ»Ñ {} Ñоздал ÑÑеÑнÑÑ Ð·Ð°Ð¿Ð¸ÑÑ'.format(c.getpeername()))77 elif self.wwclients[c.getpeername()][0] == 1:78 if serveroperations.get_user_password() == '':79 serveroperations.set_user_password(self.wwclients[c.getpeername()][1],sdata)80 self.wwclients[c.getpeername()][0] = 281 c.send('ÐÑ Ð·Ð°Ð»Ð¾Ð³Ð¸Ð½Ð¸Ð»Ð¸ÑÑ')82 print('ÐолÑзоваÑÐµÐ»Ñ {} залогинилÑÑ'.format(c.getpeername()))83 elif serveroperations.get_user_password(self.wwclients[c.getpeername()][1]) == sdata:84 self.wwclients[c.getpeername()][0] = 285 c.send('ÐÑ Ð·Ð°Ð»Ð¾Ð³Ð¸Ð½Ð¸Ð»Ð¸ÑÑ')86 print('ÐолÑзоваÑÐµÐ»Ñ {} 
залогинилÑÑ'.format(c.getpeername()))87 else:88 self.wwclients[c.getpeername()] = [0,'']89 c.send('ÐÑ Ð²Ð²ÐµÐ»Ð¸ невеÑнÑй паÑолÑ')90 print('ÐолÑзоваÑÐµÐ»Ñ {} ÑазлогинилÑÑ'.format(c.getpeername()))91 except:92 pass...
chat.py
Source:chat.py
# ... (excerpt: the first 21 lines of chat.py are not shown; `select`, `s`,
# `l`, `clients`, `nick`, `cmd` and `brodcast` are used below but defined
# outside this excerpt)


def Disconnect(client):
    """Forget *client*, report the disconnect on stdout and close the socket.

    The original body ignored its parameter and relied on the module-level
    loop variable ``i``; it now acts on the socket it is given and also
    removes the nickname entry, as QUIT and KILL do.
    """
    l.remove(client)
    clients.remove(client)
    try:
        port = client.getpeername()[1]
    except OSError:
        # The peer address may no longer be available; the nickname lookup is
        # best effort only.
        port = None
    name = nick.pop(port, None)
    print("client disconnected " + "\"%s\"" % (str(name)))
    client.close()


def Error_Command(msg, connection=None):
    """Check the argument count of a NICK, QUIT, MSG or KILL command.

    Returns 1 when the command is well formed and -1 when it is not; in the
    latter case "invalid command" is sent to *connection* if one is given.
    Any other command yields None.

    ``connection`` is a new optional parameter: the original referenced an
    undefined name here, so the error reply was never sent.
    """
    parts = msg.split(' ')
    command = parts[0]
    if command == 'NICK':
        well_formed = len(parts) == 2
    elif command in ('QUIT', 'MSG', 'KILL'):
        well_formed = len(parts) >= 2
    else:
        return None
    if well_formed:
        return 1
    if connection is not None:
        connection.sendall("invalid command\n".encode())
    return -1


def MSG(msg, connection):
    """Broadcast the text after the command word, prefixed with the sender's nick."""
    sender = nick[connection.getpeername()[1]]
    mm = "[%s] %s" % (str(sender), str(' '.join(msg.split(' ')[1:])))
    print(mm)
    brodcast(mm, connection)


def KILL(msg, connection):
    """Send a parting message to the client named in *msg* and disconnect it.

    Expected form: ``KILL <nick> <message...>``. An unknown nick, or one with
    no connected socket, is ignored (the original raised an exception that the
    main loop swallowed).
    """
    parts = msg.split(' ')
    mm = "[%s] %s\n" % (str(nick[connection.getpeername()[1]]),
                        str(' '.join(parts[2:])))
    target = parts[1]
    # nick maps peer port -> nickname; take the first port carrying the name.
    port = next((p for p, name in nick.items() if name == target), None)
    if port is None:
        return
    port = int(port)
    client = None
    for p in clients:
        if p.getpeername()[1] == port:
            client = p
    if client is None:
        return
    client.sendall(mm.encode())
    l.remove(client)
    print("client disconnected " + "\"%s\"" % (str(nick[client.getpeername()[1]])))
    nick.pop(client.getpeername()[1])
    clients.remove(client)
    client.close()


def QUIT(msg, connection):
    """Broadcast the sender's parting message, then drop and close the sender."""
    port = connection.getpeername()[1]
    sms = "[%s] %s" % (str(nick[port]), str(' '.join(msg.split(' ')[1:])))
    brodcast(sms, connection)
    print("client disconnected " + "\"%s\"" % (str(nick[port])))
    clients.remove(connection)
    l.remove(connection)
    nick.pop(port)
    connection.close()


def WHO(msg, connection):
    """Send the nicknames of all connected clients to *connection*."""
    names = "".join(nick[c.getpeername()[1]] + ' ' for c in clients)
    # Same bytes as the original sequence of small sendall() calls.
    connection.sendall(("[server] " + names + '\n').encode())


def NICK(msg, connection):
    """Rename the sender to the nick given in *msg* and announce the change."""
    port = connection.getpeername()[1]
    new_name = msg.split(' ')[1].strip('\n')
    name = "client " + str(nick.get(port)) + " => " + "\"%s\"" % (str(new_name))
    print(name)
    brodcast(name, connection)
    nick[port] = new_name


def handel(msg, connection):
    """Dispatch one received command line to its handler.

    Commands not listed in ``cmd`` get an "invalid command" reply. WHO runs
    without validation; NICK, QUIT, MSG and KILL run only when
    Error_Command() accepts them.
    """
    command = msg.split(' ')[0]
    if command not in cmd:
        connection.sendall("invalid command\n".encode())
        return
    if command == 'WHO':
        WHO(msg, connection)
        return
    if command == 'NICK':
        print(msg)
    handler = {'NICK': NICK, 'QUIT': QUIT, 'MSG': MSG, 'KILL': KILL}.get(command)
    if handler is not None and Error_Command(msg, connection) == 1:
        handler(msg, connection)


# Main loop: wait for readable sockets; accept on the listening socket `s`,
# otherwise read one command from the client.
while True:
    intr, nint, nit = select.select(l, [], [])
    for i in intr:
        if i == s:
            sc, a = s.accept()
            print("client connected \"%s:%i\"" % (str(a[0]), int(a[1])))
            # Until the client sends NICK its nickname is "host:port".
            nick[sc.getpeername()[1]] = "%s:%i" % (str(a[0]), int(a[1]))
            l.append(sc)
            clients.append(sc)
        else:
            try:
                msg = i.recv(1500)
                if len(msg) == 0:
                    # An empty read means the peer closed the connection.
                    Disconnect(i)
                    break
                handel(msg.decode().strip('\n'), i)
            except Exception:
                # Best effort, as in the original: a failing client must not
                # stop the server. KeyboardInterrupt/SystemExit now propagate.
                continue
...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!