Best Python code snippet using yandex-tank
test_restart.py
Source:test_restart.py
...15import math16HOST = '127.0.0.1'17TCPPORT = 828218HTTPPORT = 818119def test_read_all(dtstart, delta, N):20 """Read all data in backward direction.21 All data should be received as expected."""22 begin = dtstart + delta*N23 end = dtstart - delta24 query = att.makequery("test", begin, end, output=dict(format='csv'))25 queryurl = "http://{0}:{1}/api/query".format(HOST, HTTPPORT)26 response = urlopen(queryurl, json.dumps(query))27 expected_tags = [28 "tag3=D",29 "tag3=E",30 "tag3=F",31 "tag3=G",32 "tag3=H",33 ]34 exp_ts = dtstart + delta*(N-1)35 exp_value = N-136 iterations = 037 print("Test - read all data in backward direction")38 for line in response:39 try:40 columns = line.split(',')41 tagline = columns[0].strip()42 timestamp = att.parse_timestamp(columns[1].strip())43 value = float(columns[2].strip())44 exp_tags = expected_tags[(N-iterations-1) % len(expected_tags)]45 att.check_values(exp_tags, tagline, 'ENDS', exp_ts, timestamp, exp_value*1.0, value, iterations)46 exp_ts -= delta47 exp_value -= 148 iterations += 149 except:50 print("Error at line: {0}".format(line))51 raise52 # Check that we received all values53 if iterations != N:54 raise ValueError("Expect {0} data points, get {1} data points".format(N, iterations))55 print("Test passed")56def main(path):57 akumulid = att.create_akumulid(path)58 # Reset database59 akumulid.delete_database()60 akumulid.create_database()61 # start ./akumulid server62 print("Starting server...")63 akumulid.serve()64 time.sleep(5)65 nmsgs = 10000066 dt = datetime.datetime.utcnow() - (datetime.timedelta(milliseconds=1)*nmsgs)67 delta = datetime.timedelta(milliseconds=1)68 try:69 chan = att.TCPChan(HOST, TCPPORT)70 # fill data in71 print("Sending {0} messages through TCP...".format(nmsgs))72 tags = {73 "tag1": ['A'],74 "tag2": ['B', 'C'],75 "tag3": ['D', 'E', 'F', 'G', 'H'],76 }77 for it in att.generate_messages(dt, delta, nmsgs, 'test', **tags):78 chan.send(it)79 time.sleep(5) # wait untill all messagess will be processed80 print("Trying to close channel")81 chan.close()82 test_read_all(dt, delta, nmsgs)83 except:84 traceback.print_exc()85 sys.exit(1)86 finally:87 print("Stopping server...")88 akumulid.stop()89 time.sleep(5)90 print("Server stopped")91 print("Starting server...")92 akumulid.serve()93 time.sleep(5)94 print("Server started")95 try:96 test_read_all(dt, delta, nmsgs)97 # Try to write new data98 dt = datetime.datetime.utcnow()99 chan = att.TCPChan(HOST, TCPPORT)100 # fill data in101 print("Sending {0} messages through TCP second time...".format(nmsgs))102 tags = {103 "tag1": ['A'],104 "tag2": ['B', 'C'],105 "tag3": ['D', 'E', 'F', 'G', 'H'],106 }107 for it in att.generate_messages(dt, delta, nmsgs, 'test', **tags):108 chan.send(it)109 time.sleep(5)110 test_read_all(dt, delta, nmsgs)111 finally:112 print("Stopping server...")113 akumulid.stop()114 time.sleep(5)115if __name__ == '__main__':116 print(' '.join(sys.argv))117 if len(sys.argv) < 2:118 print("Not enough arguments")119 sys.exit(1)120 main(sys.argv[1])121else:...
test_user.py
Source:test_user.py
...26 user.password = '*** change for your environment ***'27 manager.update(user)28 log.info('===== updated user:{} =========='.format(user))29 return user30 def test_read_all(self):31 log.info('---------- test_read_all ----------')32 manager = UserManager(db)33 user_read_list = manager.read_all()34 log.info('===== S all users with len:{} ====='.format(len(user_read_list)))35 for user_read in user_read_list:36 log.info('===== read user_id:{}, user_read:{} =========='.format(user_read.user_id, user_read))37 log.info('===== E all users =====')38 def test_create_and_delete_guest_user(self):39 log.info('---------- test_create_and_delete_guest_user ----------')40 user = self.test_create_guest_user()41 self.test_read_all()42 manager = UserManager(db)43 manager.delete(user)44 log.info('===== updated user:{} =========='.format(user))45 self.test_read_all()46 def test_read_with_jwt(self):47 log.info('---------- test_signin ----------')48 manager = UserManager(db)49 # signin with guest user50 guest_jwt = self.test_signin()51 log.info('guest_jwt:{}'.format(guest_jwt))52 user = manager.read_with_jwt(guest_jwt)53 log.info('===== read user:{} ====='.format(user))54 assert user.user_id == '08170382ac'55 # with error56 invalid_signature_jwt = '{}{}'.format(guest_jwt[:-2], 'XX')57 log.info('invalid_signature_jwt:{}'.format(invalid_signature_jwt))58 try:59 user = manager.read_with_jwt(invalid_signature_jwt)...
test_student_records.py
Source:test_student_records.py
...11 12 def test_open_csv_file(self):13 self.test_open_csv_file = self.database.open_csv_file('grades.csv')14 self.assertIsNotNone(self.test_open_csv_file)15 def test_read_all(self):16 self.test_read_all = self.database.read_all()17 self.assertEqual(self.test_read_all, type(self.list_for_test))18 # def test_add_student(self):19 # self.test_add_student = self.database.add_student('nnabue', 'favour', '124-61-7788', 78, 90, 77, 84, 90, 'A')20 # self.assertIsNone(self.test_add_student)21 # def test_update_record(self):22 # self.test_update_record = self.database.update_record(65,77,87,90,64, 'B', '345-67-8901')23 # self.assertIsNone(self.test_read_all)24 # sel.assertEqual(self.test_read_all, None) 25 # 26 def test_delete_record(self):27 self.test_delete_record = self.database.delete_record('632-79-9939') 28 self.assertIsNone(self.test_delete_record)29 def test_above_50(self):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!