How to use expect_new method in autotest

Best Python code snippet using autotest_python

test_reistijden_measurement_site.py

Source:test_reistijden_measurement_site.py Github

copy

Full Screen

1from datetime import datetime2import pytest3from django.test import TestCase4from reistijden_v1.models import Camera, Lane, MeasurementLocation, MeasurementSite5@pytest.mark.django_db6class MeasurementSiteTest(TestCase):7 BASE_MEASUREMENT_SITE = {8 'name': 'foobar',9 'type': 'trajectory',10 'length': 2736,11 'version': '1.0',12 'reference_id': 'foobar',13 'measurement_locations': [14 {15 'index': 1,16 'lanes': [17 {18 'cameras': [19 {20 'status': 'on',21 'latitude': '52.372334',22 'reference_id': '322aac3d-62c2-495b-afee-7deded30f0e7',23 'longitude': '4.961688',24 'lane_number': -1,25 'view_direction': 112,26 }27 ],28 'specific_lane': '-1',29 }30 ],31 }32 ],33 }34 def _measurement_site_test(self, before, after, *, expect_new: bool):35 """36 Calls MeasurementSite.get_or_create twice, once with before json and then37 again with after, depending on the value of expect_new we either check38 if the counts of the objects are the same (i.e. no new measurement site39 was created, expect_new == False) or that they are different (i.e. a40 new measurement site was created, expect_new == True).41 """42 self.assertEqual(MeasurementSite.objects.count(), 0)43 self.assertEqual(MeasurementLocation.objects.count(), 0)44 self.assertEqual(Lane.objects.count(), 0)45 self.assertEqual(Camera.objects.count(), 0)46 MeasurementSite.get_or_create(before, datetime.now())47 measurement_site_count = MeasurementSite.objects.count()48 measurement_location_count = MeasurementLocation.objects.count()49 lane_count = Lane.objects.count()50 camera_count = Camera.objects.count()51 MeasurementSite.get_or_create(after, datetime.now())52 # The counts before and after the second call should not be equal if we53 # expect the second call to create a new measurement site, otherwise we54 # expect the counts to remain the same55 assertion = self.assertNotEqual if expect_new else self.assertEqual56 assertion(measurement_site_count, MeasurementSite.objects.count())57 assertion(measurement_location_count, MeasurementLocation.objects.count())58 assertion(lane_count, Lane.objects.count())59 assertion(camera_count, Camera.objects.count())60 def test_changes_in_measurement_site_should_lead_to_new_measurement_site(self):61 before = self.BASE_MEASUREMENT_SITE62 after = {63 'name': 'foobar',64 'type': 'trajectory',65 'length': 5000, # different vs BASE_MEASUREMENT_SITE66 'version': '1.0',67 'reference_id': 'foobar',68 'measurement_locations': [69 {70 'index': 1,71 'lanes': [72 {73 'cameras': [74 {75 'status': 'on',76 'latitude': '52.372334',77 'reference_id': '322aac3d-62c2-495b-afee-7deded30f0e7',78 'longitude': '4.961688',79 'lane_number': -1,80 'view_direction': 112,81 }82 ],83 'specific_lane': '-1',84 }85 ],86 },87 ],88 }89 self._measurement_site_test(before, after, expect_new=True)90 def test_changes_in_locations_should_lead_to_new_measurement_site(self):91 before = self.BASE_MEASUREMENT_SITE92 after = {93 'name': 'foobar',94 'type': 'trajectory',95 'length': 2736,96 'version': '1.0',97 'reference_id': 'foobar',98 'measurement_locations': [99 {100 'index': 2, # different vs BASE_MEASUREMENT_SITE101 'lanes': [102 {103 'cameras': [104 {105 'status': 'on',106 'latitude': '52.372334',107 'reference_id': '322aac3d-62c2-495b-afee-7deded30f0e7',108 'longitude': '4.961688',109 'lane_number': -1,110 'view_direction': 112,111 }112 ],113 'specific_lane': '-1',114 }115 ],116 }117 ],118 }119 self._measurement_site_test(before, after, expect_new=True)120 def test_changes_in_lanes_should_lead_to_new_measurement_site(self):121 before = self.BASE_MEASUREMENT_SITE122 after = {123 'name': 'foobar',124 'type': 'trajectory',125 'length': 2736,126 'version': '1.0',127 'reference_id': 'foobar',128 'measurement_locations': [129 {130 'index': 1,131 'lanes': [132 {133 'cameras': [134 {135 'status': 'on',136 'latitude': '52.372334',137 'reference_id': '322aac3d-62c2-495b-afee-7deded30f0e7',138 'longitude': '4.961688',139 'lane_number': -1,140 'view_direction': 112,141 }142 ],143 'specific_lane': 'lane1', # different vs BASE_MEASUREMENT_SITE144 }145 ],146 }147 ],148 }149 self._measurement_site_test(before, after, expect_new=True)150 def test_changes_in_cameras_should_lead_to_new_measurement_site(self):151 before = self.BASE_MEASUREMENT_SITE152 after = {153 'name': 'foobar',154 'type': 'trajectory',155 'length': 2736,156 'version': '1.0',157 'reference_id': 'foobar',158 'measurement_locations': [159 {160 'index': 1,161 'lanes': [162 {163 'cameras': [164 {165 'status': 'on',166 'latitude': '52.372334',167 'reference_id': '322aac3d-62c2-495b-afee-7deded30f0e7',168 'longitude': '4.961688',169 'lane_number': -1,170 'view_direction': 200, # different vs BASE_MEASUREMENT_SITE171 }172 ],173 'specific_lane': '-1',174 }175 ],176 }177 ],178 }179 self._measurement_site_test(before, after, expect_new=True)180 def test_no_changes_should_not_lead_to_new_measurement_site(self):181 before = self.BASE_MEASUREMENT_SITE182 after = self.BASE_MEASUREMENT_SITE183 self._measurement_site_test(before, after, expect_new=False)184 def test_different_ordering_of_keys_should_not_lead_to_new_measurement_site(self):185 before = self.BASE_MEASUREMENT_SITE186 after = {187 # keys in different order vs BASE_MEASUREMENT_SITE188 'reference_id': 'foobar',189 'version': '1.0',190 'length': 2736,191 'type': 'trajectory',192 'name': 'foobar',193 'measurement_locations': [194 {195 'index': 1,196 'lanes': [197 {198 'cameras': [199 {200 'status': 'on',201 'latitude': '52.372334',202 'reference_id': '322aac3d-62c2-495b-afee-7deded30f0e7',203 'longitude': '4.961688',204 'lane_number': -1,205 'view_direction': 112,206 }207 ],208 'specific_lane': '-1',209 }210 ],211 }212 ],213 }214 self._measurement_site_test(before, after, expect_new=False)215 # measurement site with multiple locations, lanes and cameras, all216 # of which are in an unnatural order. we can use this to check that217 # different ordering does not end up creating new measurement sites218 UNORDERED_MEASUREMENT_SITE = {219 'name': 'foobar',220 'type': 'trajectory',221 'length': 2736,222 'version': '1.0',223 'reference_id': 'foobar',224 'measurement_locations': [225 {226 'index': 1,227 'lanes': [228 {229 'cameras': [230 {231 'status': 'on',232 'latitude': '52.372334',233 'reference_id': '322aac3d-62c2-495b-afee-7deded30f0e7',234 'longitude': '4.961688',235 'lane_number': -1,236 'view_direction': 112,237 }238 ],239 'specific_lane': '-1',240 }241 ],242 },243 {244 'index': 2,245 'lanes': [246 {247 'cameras': [248 {249 'status': 'on',250 'latitude': '52.372334',251 'reference_id': '322aac3d-62c2-495b-afee-7deded30f0e7',252 'longitude': '4.961688',253 'lane_number': -1,254 'view_direction': 112,255 }256 ],257 'specific_lane': '-2',258 },259 {260 'cameras': [261 {262 'status': 'on',263 'latitude': '52.372334',264 'reference_id': '322aac3d-62c2-495b-afee-7deded30f0e7',265 'longitude': '4.961688',266 'lane_number': -1,267 'view_direction': 999,268 },269 {270 'status': 'on',271 'latitude': '52.372334',272 'reference_id': '322aac3d-62c2-495b-afee-7deded30f0e7',273 'longitude': '4.961688',274 'lane_number': -1,275 'view_direction': 112,276 },277 ],278 'specific_lane': '-1',279 },280 ],281 },282 ],283 }284 def test_different_order_should_not_lead_to_new_measurement_site(self):285 before = self.UNORDERED_MEASUREMENT_SITE286 # lists are in the correct order vs UNORDERED_MEASUREMENT_SITE287 after = {288 'name': 'foobar',289 'type': 'trajectory',290 'length': 2736,291 'version': '1.0',292 'reference_id': 'foobar',293 'measurement_locations': [294 {295 'index': 2,296 'lanes': [297 {298 'cameras': [299 {300 'status': 'on',301 'latitude': '52.372334',302 'reference_id': '322aac3d-62c2-495b-afee-7deded30f0e7',303 'longitude': '4.961688',304 'lane_number': -1,305 'view_direction': 112,306 },307 {308 'status': 'on',309 'latitude': '52.372334',310 'reference_id': '322aac3d-62c2-495b-afee-7deded30f0e7',311 'longitude': '4.961688',312 'lane_number': -1,313 'view_direction': 999,314 },315 ],316 'specific_lane': '-1',317 },318 {319 'cameras': [320 {321 'status': 'on',322 'latitude': '52.372334',323 'reference_id': '322aac3d-62c2-495b-afee-7deded30f0e7',324 'longitude': '4.961688',325 'lane_number': -1,326 'view_direction': 112,327 }328 ],329 'specific_lane': '-2',330 },331 ],332 },333 {334 'index': 1,335 'lanes': [336 {337 'cameras': [338 {339 'status': 'on',340 'latitude': '52.372334',341 'reference_id': '322aac3d-62c2-495b-afee-7deded30f0e7',342 'longitude': '4.961688',343 'lane_number': -1,344 'view_direction': 112,345 }346 ],347 'specific_lane': '-1',348 }349 ],350 },351 ],352 }...

Full Screen

Full Screen

HTTPClient.py

Source:HTTPClient.py Github

copy

Full Screen

1'''2 封装发送http请求的类3'''4import requests5from Common.Log import Log6from Conf.Config import Config7from Common.Mysql import connect_mysql8import json9from Common.Yaml_Operation import read_yaml_extract10log = Log().logger11class HTTPClient:12 index = 013 def __init__(self):14 '''15 一个项目的url,headers前缀一样的,参数类型也一样16 init方法每次实例化的时候,自动调用init方法17 '''18 self.init_url_headers()19 def get_mysql_data(self, sql):20 return connect_mysql(sql)21 def init_url_headers(self):22 '''23 初始化url和headers24 '''25 self.url = Config().get('url', 'api_url')26 self.headers = {27 'Content-Type': 'application/json'28 }29 self.re_list = []30 def send_request(self, method, name=None, data=None, headers=None, files=None):31 '''封装发送请求的方法32 :param method: 请求的方式,分为get、post、delete等33 :param name: 接口地址的后缀34 :param data: 接口参数35 :param headers: 接口请求头36 :return: 返回接口的返回值37 '''38 # 用例执行数量39 HTTPClient.index += 140 # headers判断逻辑41 if headers:42 # 转成字典类型43 for key, value in headers.items():44 # 若存在类似于${{get_token}}格式的,则将内容提取出来,放到self.headers里面,也就是所谓的参数化45 if value.startswith("${{") and value.endswith("}}"):46 value = value.split("{{")[1].split("}}")[0]47 value = read_yaml_extract(value)48 self.headers[key] = value49 # data判断逻辑50 if data:51 # 字典类型转换成json字符串52 if isinstance(data, dict):53 data = json.dumps(data)54 # 后缀不一定相同,但是前缀是一样的,拼接每次的请求的url55 self.url = self.url + name56 try:57 # python反射机制,这里相当于requests.method(),如requests.get()58 res = getattr(requests, method.lower())(url=self.url, headers=self.headers, data=data, files=files)59 log.info(f'>>>-----接口测试用例<{HTTPClient.index}>')60 log.info(f'接口地址:{self.url}')61 log.info(f'请求方法:{method}')62 log.info(f'接口参数:{data}')63 # 将接口返回值转换成python字典格式并打印日志64 log.info('接口响应值:{}'.format(res.json()))65 return res.json()66 except Exception as error:67 log.error('接口请求异常:{}'.format(error))68 finally:69 self.init_url_headers()70 def vaildate(self, expect, actual):71 '''72 校验测试结果的函数73 :param expect: 预期结果74 :param actual: 实际结果75 :return:76 '''77 for key, value in expect.items():78 # 预期的key在实际结果返回值的key里面79 if key in actual:80 # 将sql语句转换成数据库数据。如select * from token where id = 1 这个格式判断为一个数据库语句 select, update....81 if isinstance(value, str):82 if value.startswith('select') and 'from' in value or (83 value.startswith('update') and 'set' in value):84 # 从数据库中读取value85 value = self.get_mysql_data(value)86 # 预期结果与实际结果进行比较87 try:88 assert value == actual[key]89 log.debug(f'接口实际结果与预期结果比对一致-->实际结果为[{key}:{actual[key]}]预期结果为[{key}:{value}]')90 self.re_list.append('True')91 except AssertionError:92 log.error(f'接口的实际结果与预期结果不一致-->实际结果为[{key}:{actual[key]}]预期结果为[{key}:{value}]')93 self.re_list.append('False')94 else:95 for _key, _value in actual.items():96 if isinstance(_value, dict) and (key in _value):97 expect_new = {}98 expect_new[key] = value99 self.vaildate(expect_new, _value)100 # 递归,重新走判断逻辑101 elif isinstance(_value, list):102 for i in _value:103 if key in i:104 expect_new = {}105 expect_new[key] = value106 self.vaildate(expect_new, i)107 if self.re_list.count('False') > 0:108 return False109 else:110 return True111if __name__ == '__main__':112 HTTPClient().send_request(method='get', name='demo')113#114# para = {115# "username": "admin",116# "password": "123456"117# }118# HTTPClient().send_request(method='post', name='login', data=para)...

Full Screen

Full Screen

datachunk.py

Source:datachunk.py Github

copy

Full Screen

1#!/usr/bin/env python2from io import BytesIO3from struct import pack, unpack4class DataChunk:5 6 def __init__(self):7 self.times = 08 self.expect_new = True9 self.current_msg_len = 010 self.current_buffer = BytesIO()11 def handle_msg(self, data):12 self.handle_chunk(data)13 14 def handle_chunk(self, data):15 self.times += 116 print len(data)17 18 def process_chunk(self, data):19 datalen = len(data)20 if self.expect_new:21 if datalen >= 4:22 self.current_msg_len = unpack('i', data[0:4])[0]23 if (self.current_msg_len + 4) == datalen:24 # We got the entire packet 25 self.handle_msg(data[4:])26 return27 elif (self.current_msg_len + 4) > datalen:28 # We need some more bytes29 self.current_buffer.write(data[4:])30 self.expect_new = False31 else:32 # We may have got more than one message33 start = 434 while True:35 self.handle_msg(data[start : start + self.current_msg_len])36 start = start + self.current_msg_len37 if start == datalen:38 # We finished all the bytes and there is no incomplete messages39 # in the bytes40 self.expect_new = True41 self.current_msg_len = -142 self.current_buffer = BytesIO()43 break44 if 4 <= (datalen - start):45 self.current_msg_len = unpack('i', data[start : start + 4])[0]46 start += 447 if (datalen - start) >= self.current_msg_len:48 # we have this message also in this buffer49 continue50 else:51 # This message is incomplete, wait for the next chunk52 self.expect_new = False53 self.current_buffer = BytesIO()54 self.current_buffer.write(data[start:])55 break56 else:57 # we don't even know the size of the current buffer58 self.current_msg_len = -159 self.current_buffer = BytesIO()60 self.current_buffer.write(data[start:])61 self.expect_new = False62 break63 else:64 # We haven't even received 4 bytes of data for this brand new 65 # packet66 self.expect_new = False67 self.current_buffer = BytesIO()68 self.current_buffer.write(data)69 self.current_msg_len = -170 else:71 # Not a new message72 start = 073 if self.current_msg_len == -1:74 # try to get the message len75 if datalen >= (4 - self.current_buffer.tell()):76 #get the length of the data77 start = 4 - self.current_buffer.tell()78 self.current_buffer.write(data[0: start])79 self.current_buffer.seek(0)80 self.current_msg_len = unpack('i', self.current_buffer.read())[0]81 self.current_buffer = BytesIO()82 else:83 # Till now even the size of the data is not known84 self.current_buffer.write(data)85 return86 while start < datalen:87 if self.current_buffer is None:88 self.current_buffer = BytesIO()89 if self.current_msg_len == -1:90 if (datalen - start) < 4:91 self.current_buffer.write(data[start:])92 break93 elif (datalen - start) == 4:94 self.current_msg_len = unpack('i', data[start:])[0]95 break96 else:97 self.current_msg_len = unpack('i', data[start: start + 4])[0]98 start += 499 if (datalen - start) >= (self.current_msg_len - self.current_buffer.tell()):100 consume = self.current_msg_len - self.current_buffer.tell()101 self.current_buffer.write(data[start: start + consume])102 start += consume103 self.current_msg_len = - 1104 self.current_buffer.seek(0)105 self.handle_msg(self.current_buffer.read())106 self.current_buffer = BytesIO()107 if start == datalen:108 self.expect_new = True109 else:110 self.current_buffer.write(data[start:])...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful