Best Python code snippet using tavern
client.py
Source:client.py
"""Command-line client for BBEMS.

Connects to the main server on 127.0.0.1:9098 and, depending on the two
command-line arguments, registers a new user (``-n -r``), checks a
registration (``-s <secret>``), uploads a file (``-U <secret>``) or downloads
one (``-D <secret>``).
"""
import os
import socket
import sys
from os import system
from random import randint

from ecdsa import SigningKey, NIST521p

from encryption.AES import AESCipher
from ipfs import downloadFileFromIPFS, uploadFileToIPFS
from message import sendMessage, receiveMessages

# Address of the main server every request is sent to.
SERVER_ADDRESS = ('127.0.0.1', 9098)


def _clearScreen():
    """Clear the terminal; 'cls' only exists on Windows, so use 'clear' elsewhere."""
    system('cls' if os.name == 'nt' else 'clear')


# Function to send data to the server
def sendDataToServer(message_to_send):
    """Send one message to the main server over a fresh connection.

    Returns the reply produced by ``receiveMessages``, or ``False`` when that
    reply is falsy.
    """
    # The context manager closes the socket even when sending/receiving raises.
    with socket.socket() as server:
        server.connect(SERVER_ADDRESS)
        sendMessage(server, message_to_send)
        message_received = receiveMessages(server)
    return message_received if message_received else False


# Status if user exist or not (not important)
def checkStatus(secret_number):
    """Ask the server ('1112') for the registration stored for ``secret_number``.

    Returns the server reply, or ``False`` when no reply was received.
    """
    _clearScreen()
    print("checking your status.....")
    return sendDataToServer(['1112', secret_number])


# User authentication with main server
def authenticateUser(secret_number):
    """Authenticate ``secret_number`` by signing the server's puzzle number.

    Sends a '0001' request, signs the puzzle number from the '0002' reply with
    the private key read from a user-supplied PEM file and sends the signature
    back as '0010'.

    Returns ``True`` when the server answers '0020' with a truthy result and
    ``False`` otherwise. Exits the process when the server reports no data for
    the secret number or when the key file cannot be loaded.
    """
    _clearScreen()
    print("you are required to enter the filename or full path of the filename which contains your private key")
    # The whole handshake runs on one connection; the ``with`` block closes it
    # on every path, including the sys.exit() calls below.
    with socket.socket() as server:
        server.connect(SERVER_ADDRESS)
        sendMessage(server, ['0001', secret_number])
        message_received = receiveMessages(server)
        # A falsy reply cannot be indexed, so treat it like a wrong reply code.
        if not message_received or message_received[0] != '0002':
            print('(BBEMS error: 0001) Failed to autenticate user')
            return False
        if message_received[1] is False:
            print('no data found in corresponding to the given secret number')
            sys.exit()
        puzzle_number = message_received[1]
        filename = input('your private key filename(full path of file with extension) : ')
        try:
            with open(filename) as key_file:
                privateKey = SigningKey.from_pem(key_file.read())
        except Exception:
            # Any read or parse failure means the file is unusable as a key.
            # ``Exception`` (unlike a bare except) lets Ctrl-C through.
            print('(BBEMS error: 401) File does not contain any private key information')
            sendMessage(server, ['0010', False])
            sys.exit()
        signature = privateKey.sign(str(puzzle_number).encode())
        sendMessage(server, ['0010', signature])
        message_received = receiveMessages(server)

    if message_received and message_received[0] == '0020' and message_received[1]:
        print('Authentication successful!!!\n')
        return True
    print('authentication failed !')
    return False


# Function for new registration
def newRegistration():
    """Interactively register a new user with the main server.

    Generates a random secret number and a NIST521p key pair, then sends the
    secret number, grade and public key to the server as a '1111' message.

    Returns ``(secret_number, publicKey, privateKey)`` (keys as PEM text) on
    success, or ``False`` when the server does not confirm the registration.
    """
    gradeset = {'X', 'Y', 'Z'}
    _clearScreen()
    print("Welcome to BBEMS New Registration!!!")
    print("Please note the following points : ")
    print("1 You are required to provide your basic information during registration")
    print("2 After successful registration you will be provided with a secret number. You need to remember or save this secret number and make sure you dont lose it.\nYou can tell this number to anyone it really does not sabotage your account or credentials by telling this number to anyone")
    print("3 You will also be provided with a pair of public and private keys. You need to save both of them seperately. Do not lose the private key or tell anyone your private key.")
    print("Losing your private key will result in losing your account permanently. Losing your secret number will not let you to be authenticated even if you have your private key")
    print('-*-'*50)
    print("Please read the above instructions before continuing")
    print('-'*100)
    # The name is prompted for but is not part of the registration message.
    input("enter your name : ")
    while True:
        grade = input('enter your grade : ')
        if grade in gradeset:
            break
        print('invalid grade entered!!!')
    secret_number = randint(0, 9999999)
    sk = SigningKey.generate(curve=NIST521p)
    vk = sk.verifying_key
    privateKey = sk.to_pem().decode()
    publicKey = vk.to_pem().decode()
    message_received = sendDataToServer(['1111', (secret_number, grade, publicKey)])
    # Require both the '2111' code and a truthy status; a missing reply is False.
    if not (message_received and message_received[0] == '2111' and message_received[1]):
        print("(BBEMS error:) Registration falied")
        return False
    print("your registration has been completed successfully!\nMake sure you save the keys below to a safe environment")
    print('-*-'*100)
    print(f"your secret number(important) = {secret_number}")
    print('-*-'*100)
    print("your public key : ")
    print(publicKey)
    print('-*-'*100)
    print("your private key(important) : ")
    print(privateKey)
    print('-*-'*100)
    return (secret_number, publicKey, privateKey)


# Function to generate new TID and new AES encryption key
def getTransactionIDandKey(secret_number):
    """Request a new transaction id and AES seed ('T1001') for ``secret_number``.

    Returns element 2 of a successful '2001' reply; the caller reads index 0
    as the TID and index 1 as the AES seed. Exits the process when the request
    fails or no reply arrives.
    """
    message_received = sendDataToServer(['T1001', secret_number])
    # NOTE(review): this prints the reply, which carries the AES seed.
    print(f"messaged received = {message_received}")
    if message_received and message_received[0] == '2001' and message_received[1]:
        return message_received[2]
    print('The process failed exiting.....')
    sys.exit()


# Uploading the file in IPFS
def uploadFile(encryption_object):
    """Prompt for a file, upload it to IPFS and encrypt its returned 'Hash'.

    ``encryption_object`` must provide ``encrypt``. Returns the metadata dict
    from ``uploadFileToIPFS`` with ``'Hash'`` replaced by its encrypted form,
    or ``False`` when the upload reports failure.
    """
    file_path = input('enter the file name that you want to upload(full path of file with extension) : ')
    data = uploadFileToIPFS(file_path)
    if not data[0]:
        return False
    metadata = data[1]
    metadata['Hash'] = encryption_object.encrypt(metadata['Hash'])
    return metadata


def mineTheDataToBlockchain(metadata):
    """Send ``metadata`` to the server as 'M1N3'; exit the process on failure.

    ``metadata`` must contain a ``'TID'`` key, which is shown to the user
    after a successful reply.
    """
    message_received = sendDataToServer(['M1N3', metadata])
    if message_received and message_received[0] == 'M1N3' and message_received[1]:
        print("Data has been mined to blockchain successfully!!!")
        print(f"please make a note of TID as it will be reqiured to further acess the file TID = {metadata['TID']}")
    else:
        print("Mining failed")
        sys.exit()


# main flow of code
if __name__ == '__main__':
    arg_num = len(sys.argv)
    if arg_num < 3:
        print("(BBEMS error:) Parameters missing \nExiting.....")
        sys.exit()
    if arg_num > 3:
        # Previously this message was assigned but never shown.
        print("(BBEMS error:) Too many parameters \nExiting.....")
        sys.exit()

    arg1 = sys.argv[1]
    arg2 = sys.argv[2]
    if arg1 == '-n' and arg2 == '-r':
        keys = newRegistration()
        if keys is False:
            # Registration failed and the error was already printed.
            sys.exit()
        secret_number = keys[0]
        privateKey = keys[2]
        filename = str(secret_number) + '_PRIVATEKEY.pem'
        with open(filename, 'w') as key_file:
            key_file.write(privateKey)
        print(f"your private key has been saved in {filename}")

    elif arg1 == '-s' and arg2:
        secret_number = int(arg2)
        message_received = checkStatus(secret_number)
        # Element 1 carries the stored registration; falsy means nothing found.
        if message_received and message_received[1]:
            print(message_received[1])
        else:
            print(f"(BBEMS error code: 404) {secret_number} not registered")

    elif arg1 == '-U' and arg2:
        secret_number = int(arg2)
        # Secret number 0 skips authentication (anonymous upload).
        success = True if not secret_number else authenticateUser(secret_number)
        if success:
            keys = getTransactionIDandKey(secret_number)
            # NOTE(review): this prints the TID together with the AES seed.
            print(keys)
            TID = keys[0]
            seed = keys[1]
            encryption_object = AESCipher(seed)
            metadata = uploadFile(encryption_object)
            if metadata is False:
                print("File upload encountered an error exiting....")
                sys.exit()
            metadata['TID'] = TID
            # Drop the local references to the key material once it is used.
            TID = None
            seed = None
            keys = None
            mineTheDataToBlockchain(metadata)
        else:
            print("Authentication Failed exiting....")
            sys.exit()

    elif arg1 == '-D' and arg2:
        secret_number = int(arg2)
        if not secret_number:
            print("anonymous users cannot download evidence files")
            sys.exit()
        success = authenticateUser(secret_number)
        if success:
            TID = input("enter the TID of the file you want to download : ")
            message_received = sendDataToServer(['D1D0', TID])
            if (message_received and message_received[0] == 'D1D0'
                    and message_received[1] and message_received[2][0]):
                seed = message_received[1]
                aes_object = AESCipher(seed)
                address = aes_object.decrypt(message_received[2][0])
                filename = message_received[2][1]
                downloadFileFromIPFS(filename, address)
            else:
                print("failed to get key from encryption authority ")
    # NOTE: the published snippet is truncated here; any remaining branches of
    # the original file are not shown.
mainServer.py
Source:mainServer.py
"""BBEMS main server.

Listens on 127.0.0.1:9098, multiplexes client sockets with ``select`` and
dispatches on the message code found in element 0 of every received message.
Key requests are forwarded to the encryption authority on 127.0.0.1:9099.
"""
import _thread
import re
import secrets
import select
import socket
from random import randint

from ecdsa.keys import BadSignatureError, VerifyingKey

from blockchain.Blockchain import Blockchain
from database import connectDatabase, executeInsertQuery, executeSelectQuery
from message import receiveMessages, sendMessage

# Address of the encryption authority that issues and looks up keys.
ENC_AUTHORITY_ADDRESS = ('127.0.0.1', 9099)

# Characters that can appear in PEM text (base64 body plus the dashed
# BEGIN/END lines). Anything else, quotes in particular, is rejected before
# the value is placed in an SQL string.
_PEM_TEXT = re.compile(r'[A-Za-z0-9+/=\- \r\n]+')


def _dropClient(socket_list, client_socket):
    """Stop watching ``client_socket`` and close it.

    Tolerates a socket that was already removed, e.g. one reported by
    ``select`` as both readable and in error during the same pass.
    """
    if client_socket in socket_list:
        socket_list.remove(client_socket)
    client_socket.close()


def getKeyFromID(TID):
    """Send a '1002' request for ``TID`` to the encryption authority and print the reply."""
    with socket.socket() as enc_socket:
        enc_socket.connect(ENC_AUTHORITY_ADDRESS)
        sendMessage(enc_socket, ['1002', TID])
        message_received = receiveMessages(enc_socket)
        print(message_received)


def askForNewKeyandTID(message_to_send, notified_socket):
    """Forward a new key/TID request and relay the result to the client.

    Sends ``message_to_send`` to the encryption authority. A '2001' reply with
    a truthy status is passed on to ``notified_socket`` unchanged; anything
    else is replaced by ``['T1001', False, ()]``.
    """
    with socket.socket() as enc_socket:
        enc_socket.connect(ENC_AUTHORITY_ADDRESS)
        sendMessage(enc_socket, message_to_send)
        message_received = receiveMessages(enc_socket)
    # A missing (False) reply counts as a failure rather than being indexed.
    if message_received and message_received[0] == '2001' and message_received[1]:
        data = message_received
    else:
        data = ['T1001', False, ()]
    sendMessage(notified_socket, data)


def askForKeyFromTID(message_to_send):
    """Send ``message_to_send`` to the encryption authority and return its reply."""
    # The socket used to be left open after the reply was read.
    with socket.socket() as enc_socket:
        enc_socket.connect(ENC_AUTHORITY_ADDRESS)
        sendMessage(enc_socket, message_to_send)
        return receiveMessages(enc_socket)


def saveRegistrationDataInDatabase(secret_number, public_key, grade):
    """Insert a registration row into the ``user`` table.

    The values arrive from the network and are interpolated into the SQL text,
    so they are validated first: ``secret_number`` must convert to ``int``,
    ``grade`` must be alphanumeric and ``public_key`` must contain only PEM
    characters.

    Returns ``True`` after the insert was issued, ``False`` when a value was
    rejected.
    """
    try:
        secret_number = int(secret_number)
    except (TypeError, ValueError):
        return False
    if not (isinstance(grade, str) and grade.isalnum()):
        return False
    if not (isinstance(public_key, str) and _PEM_TEXT.fullmatch(public_key)):
        return False
    mydb = connectDatabase('localhost', 'encauth', '1234', 'development')
    mycursor = mydb.cursor()
    # NOTE(review): still string-built SQL; switch to bound parameters if
    # executeInsertQuery can accept them.
    query = f"INSERT INTO user (secret_number,public_key,grade) VALUES({secret_number},'{public_key}','{grade}')"
    executeInsertQuery(mydb, mycursor, query)
    return True


def getRegistrationDetailsFromDatabase(secret_number):
    """Return the ``user`` rows stored for ``secret_number``.

    Returns whatever ``executeSelectQuery`` returns, or ``False`` when
    ``secret_number`` is not an integer. Callers treat ``False`` as "no record".
    """
    try:
        # Coercing to int keeps arbitrary client text out of the SQL string.
        secret_number = int(secret_number)
    except (TypeError, ValueError):
        return False
    mydb = connectDatabase('localhost', 'encauth', '1234', 'development')
    mycursor = mydb.cursor()
    query = f"SELECT * FROM user WHERE secret_number = {secret_number}"
    return executeSelectQuery(mycursor, query)


if __name__ == '__main__':
    IP = '127.0.0.1'
    PORT = 9098
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((IP, PORT))
    server_socket.listen()
    print(f"connection started at {IP} {PORT}")
    socket_list = [server_socket]
    # Secret number 0 is pre-marked as authenticated.
    authenticated = {0: True}

    # Running the server live
    while True:
        readers, _, errors = select.select(socket_list, [], socket_list)
        for notified_socket in readers:
            if notified_socket == server_socket:
                client_socket, client_addr = server_socket.accept()
                print(f"connection established with {client_socket}")
                socket_list.append(client_socket)
                continue

            message_received = receiveMessages(notified_socket)
            if message_received is False:
                print(f"disconnected from {notified_socket}")
                _dropClient(socket_list, notified_socket)
                continue

            code = message_received[0]
            if code == '1111':
                # New registration: payload is (secret_number, grade, public_key).
                print("new registration save karna hai")
                secret_number = message_received[1][0]
                grade = message_received[1][1]
                public_key = message_received[1][2]
                value = saveRegistrationDataInDatabase(secret_number, public_key, grade)
                sendMessage(notified_socket, ['2111', value])

            elif code == '1112':
                # Status lookup.
                secret_number = message_received[1]
                value = getRegistrationDetailsFromDatabase(secret_number)
                print(value)
                # An unknown secret number yields no rows; reply False instead
                # of failing on value[0].
                sendMessage(notified_socket, ['2222', value[0] if value else False])

            elif code == '0001':
                # Challenge/response authentication.
                secret_number = message_received[1]
                data = getRegistrationDetailsFromDatabase(secret_number)
                if not data:
                    sendMessage(notified_socket, ['0002', False])
                else:
                    public_key = VerifyingKey.from_pem(data[0][1])
                    # Same range as before, but from the OS CSPRNG so the
                    # challenge cannot be predicted.
                    puzzle_number = secrets.randbelow(99999 - 1111 + 1) + 1111
                    sendMessage(notified_socket, ['0002', puzzle_number])
                    message_received = receiveMessages(notified_socket)
                    if message_received is False:
                        # The client went away in the middle of the handshake.
                        _dropClient(socket_list, notified_socket)
                        continue
                    if message_received[0] == '0010':
                        signature = message_received[1]
                        if signature is False:
                            # The client could not load its key and gave up.
                            _dropClient(socket_list, notified_socket)
                            continue
                        try:
                            value = public_key.verify(signature, str(puzzle_number).encode())
                        except BadSignatureError:
                            # verify() raises on a bad signature rather than
                            # returning False.
                            value = False
                        if value:
                            authenticated[secret_number] = True
                        message_to_send = ['0020', value]
                    else:
                        message_to_send = ['0020', False]
                    sendMessage(notified_socket, message_to_send)

            elif code == 'T1001':
                # New transaction id + key request.
                secret_number = message_received[1]
                isAuth = bool(secret_number)
                message_to_send = ['1001', isAuth]
                # .get() so a user who never authenticated is refused instead
                # of raising KeyError.
                if authenticated.get(secret_number, False):
                    _thread.start_new_thread(askForNewKeyandTID, (message_to_send, notified_socket))
                else:
                    # Previously an index expression that raised TypeError and
                    # left the client without a reply.
                    sendMessage(notified_socket, ['T1001', False, ()])

            elif code == 'M1N3':
                metadata = message_received[1]
                blockchain_object = Blockchain()
                result = blockchain_object.mineData(metadata)
                sendMessage(notified_socket, ['M1N3', result])

            elif code == 'D1D0':
                # NOTE(review): no authentication check is made on this path.
                message_from_enc = askForKeyFromTID(message_received)
                TID = message_received[1]
                print(f"TID dekh le bhai {TID}")
                message_to_send = ['D1D0']
                if message_from_enc and message_from_enc[0] == 'D1D0' and message_from_enc[1]:
                    message_to_send.append(message_from_enc[2])
                else:
                    message_to_send.append(False)
                blockchain_object = Blockchain()
                nameAndAddress = blockchain_object.getAddressFromTID(TID)
                message_to_send.append(nameAndAddress)
                print(message_to_send)
                sendMessage(notified_socket, message_to_send)

        for notified_socket in errors:
            print(f"something is wrong with {notified_socket}")
            _dropClient(socket_list, notified_socket)
    # NOTE: the published snippet is truncated after this loop.
bot_tf_idf_cosine.py
Source:bot_tf_idf_cosine.py
import socket
import select
import sys
import pandas as pd
import spacy
import os
from sklearn.naive_bayes import GaussianNB
from sklearn.tree import DecisionTreeClassifier, export_graphviz
import re
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from string import punctuation
import numpy as np
import pandas as pd
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem.lancaster import LancasterStemmer
import nltk
import re
from sklearn.preprocessing import OneHotEncoder
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
from keras.models import Sequential, load_model
from keras.layers import Dense, LSTM, Bidirectional, Embedding, Dropout
from keras.callbacks import ModelCheckpoint
import bot_functions
from sklearn.feature_extraction.text import TfidfVectorizer
from scipy import spatial
from random import randrange

# Chat bot client: connects to a chat server, maps each incoming message to an
# intent through the bot_functions helpers and sends the answer back. When no
# intent can be found it asks the user for one and retrains ("train mode").

# Chat connection setup
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
IP_address = "172.18.135.225"
Port = 8081
server.connect((IP_address, Port))

# Loading the spaCy language model; this is slow. The Portuguese model is
# commented out and the English one is loaded instead.
print("Carregando modelo em portugues do spacy...", end="")
# nlp = spacy.load('pt_core_news_sm')
nlp = spacy.load('en')
print("pronto!")

df_intents = pd.read_csv('intents.csv')
sentences = list(df_intents["sentence"])
intents = df_intents["intent"]
sentences = bot_functions.tfidf_mapping(nlp, sentences)
model = bot_functions.train_models_tf_idf(nlp, sentences, intents)
print("****TELA DO BOT****")

sockets_list = [sys.stdin, server]
# While True, the next message from the server is the intent label for the
# sentence kept in message_to_save.
train_mode = False
while True:
    read_sockets, _, _ = select.select(sockets_list, [], [])
    for socks in read_sockets:
        if socks == server:
            raw_message = socks.recv(2048)
            if not raw_message:
                # recv() returns b"" once the peer has closed the connection.
                # Without this check select() reports the socket as readable
                # forever and the loop spins on empty messages.
                server.close()
                sys.exit("chat server closed the connection")
            # Presumably drops a fixed 17-character prefix added by the chat
            # server. TODO confirm against the server's message format.
            message_received = raw_message.decode()[17:]
            if not train_mode:
                try:
                    print(message_received)
                    # message_received = bot_functions.clean_sentence(message_received, nlp)
                    # print(message_received)
                    sentences.append(message_received)
                    sentences = bot_functions.tfidf_mapping(nlp, sentences)
                    print(sentences)
                    intent = bot_functions.cosin_similarity(sentences, intents)
                    message_to_send = bot_functions.answer(intent)
                except ValueError:
                    message_received = bot_functions.didYouMean(message_received)
                    try:
                        # NOTE(review): biggest_sentence is only assigned after a
                        # training round (train-mode branch below); reaching this
                        # line before then raises NameError. Confirm the
                        # intended initial value.
                        intent = bot_functions.predict_intent(message_received, nlp, biggest_sentence, model)
                        message_to_send = bot_functions.answer(intent[0])
                    except ValueError:
                        # Unknown intent: keep the sentence and ask the user
                        # to label it.
                        intent = ['desconhecido']
                        message_to_save = message_received
                        message_to_send = bot_functions.user_train()
                        train_mode = True
            else:
                message_intent = message_received
                # [:-1] strips the trailing "\n"
                bot_functions.save_msg(message_to_save[:-1], message_intent[:-1])
                model, biggest_sentence = bot_functions.train_model(nlp)
                message_to_send = "Certo, entendi! Obrigado por contribuir com o meu treinamento! :)\n"
                train_mode = False
            print("\nUsuario enviou: " + message_received)
            server.send(message_to_send.encode())
            sys.stdout.write("Bot: ")
            sys.stdout.write(message_to_send + "\n__________________________________")
        # NOTE: the published snippet is truncated here; handling for the other
        # entry of sockets_list (sys.stdin) is not shown.
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!