Best Python code snippet using Testify_python
test_countifs.py
Source:test_countifs.py
# ... excerpt begins mid-file. The visible tail of the preceding list literal
# (presumably TESTS_1D, which is referenced below but whose beginning is
# outside this excerpt) is:
#         (9, 2),
#     ],
# },
# ]


def prepare_test_case(
    spark_context: SparkContext, test_case: Dict[str, List[Any]]
) -> Tuple[RDD, RDD, List[Any]]:
    """Turn a test-case dict into the two input RDDs plus the expected result.

    Every point that is not already a tuple is wrapped into a 1-tuple, so 1D
    cases may be written with bare values. Points are paired with their index
    via ``enumerate``; query points are sorted before being indexed.

    Args:
        spark_context: Context used to parallelize the point collections.
        test_case: Mapping with the keys ``"data_points"``,
            ``"query_points"`` and ``"expected_result"``.

    Returns:
        ``(data_rdd, query_rdd, expected_result)``.
    """
    data_rdd = spark_context.parallelize(
        enumerate(
            map(lambda p: p if isinstance(p, tuple) else (p,), test_case["data_points"])
        )
    )
    query_rdd = spark_context.parallelize(
        enumerate(
            map(
                lambda p: p if isinstance(p, tuple) else (p,),
                sorted(test_case["query_points"]),
            )
        )
    )
    return data_rdd, query_rdd, test_case["expected_result"]


@pytest.mark.parametrize("n_partitions", [1, 2, 4])
@pytest.mark.parametrize("test_case", TESTS_1D)
def test_algorithm_execution_1d(spark_context, n_partitions, test_case):
    """Run Countifs on each hand-written 1D case and compare with the expected result."""
    data_rdd, query_rdd, expected_result = prepare_test_case(spark_context, test_case)
    countifs = Countifs(spark_context, n_partitions)
    result = countifs(data_rdd=data_rdd, query_rdd=query_rdd, n_dim=1).collect()
    assert len(result) == len(expected_result)
    assert result == expected_result


TESTS_2D = [
    {
        "data_points": [(3, 6), (4, 2)],
        "query_points": [(0, 5), (7, 1)],
        "expected_result": [(0, 1), (1, 0)],
    },
    {
        "query_points": [(0, 5), (7, 1)],
        "data_points": [(3, 6), (4, 2)],
        "expected_result": [(0, 1), (1, 0)],
    },
    {
        "query_points": [(100, 100), (102, 102)],
        "data_points": [(103, 480), (1178, 101)],
        "expected_result": [(0, 2), (1, 1)],
    },
    {
        "query_points": [(100, 100), (102, 102), (104, 104), (106, 106)],
        "data_points": [(1178, 101), (103, 480), (105, 1771), (1243, 107)],
        "expected_result": [(0, 4), (1, 3), (2, 2), (3, 1)],
    },
    {
        "query_points": [(100, 100), (102, 102)],
        "data_points": [(103, 480), (105, 1771), (1178, 101), (1243, 107)],
        "expected_result": [(0, 4), (1, 3)],
    },
]


@pytest.mark.parametrize("n_partitions", [1, 2, 4])
@pytest.mark.parametrize("test_case", TESTS_2D)
def test_algorithm_execution_2d(spark_context, n_partitions, test_case):
    """Run Countifs on each hand-written 2D case and compare with the expected result."""
    data_rdd, query_rdd, expected_result = prepare_test_case(spark_context, test_case)
    countifs = Countifs(spark_context, n_partitions)
    result = countifs(data_rdd=data_rdd, query_rdd=query_rdd, n_dim=2).collect()
    assert len(result) == len(expected_result)
    assert result == expected_result


TESTS_3D = [
    {
        "query_points": [(100, 100, 100), (102, 102, 102)],
        "data_points": [
            (2137, 103, 480),
            (105, 2137, 1771),
            (1178, 101, 2137),
            (2137, 1243, 107),
        ],
        "expected_result": [(0, 4), (1, 3)],
    }
]


@pytest.mark.parametrize("n_partitions", [1, 2, 4])
@pytest.mark.parametrize("test_case", TESTS_3D)
def test_algorithm_execution_3d(spark_context, n_partitions, test_case):
    """Run Countifs on each hand-written 3D case and compare with the expected result."""
    data_rdd, query_rdd, expected_result = prepare_test_case(spark_context, test_case)
    countifs = Countifs(spark_context, n_partitions)
    result = countifs(data_rdd=data_rdd, query_rdd=query_rdd, n_dim=3).collect()
    assert len(result) == len(expected_result)
    assert result == expected_result


def random_test_case(
    n_data_points: int, n_query_points: int, n_dim: int
) -> Dict[str, List[Any]]:
    """Build a randomized test case whose expected result is known by construction.

    Query point ``i`` has every coordinate equal to ``100 + 2 * i``. For each
    query point, ``n_data_points // n_query_points`` data points are generated
    with one randomly chosen coordinate pinned to ``100 + 2 * i + 1`` and the
    remaining coordinates drawn from ``[100 + 2 * i + 1, max_coord_value]``.
    The leftover ``n_data_points % n_query_points`` data points get one
    coordinate drawn from ``[0, 100]``. The data points are shuffled before
    being returned.

    Args:
        n_data_points: Total number of data points to generate (>= 0).
        n_query_points: Number of query points to generate (>= 1).
        n_dim: Number of coordinates per point (>= 1).

    Returns:
        Dict with the keys ``"data_points"``, ``"query_points"`` and
        ``"expected_result"``; the latter holds
        ``(i, per_query * (n_query_points - i))`` for each query index ``i``.

    Raises:
        ValueError: If an argument is outside the ranges given above.
    """
    # Fail with a clear message instead of an incidental ZeroDivisionError
    # (n_query_points == 0) or IndexError (n_query_points < 0) further down.
    if n_query_points < 1:
        raise ValueError(f"n_query_points must be >= 1, got: {n_query_points}")
    if n_data_points < 0:
        raise ValueError(f"n_data_points must be >= 0, got: {n_data_points}")
    if n_dim < 1:
        raise ValueError(f"n_dim must be >= 1, got: {n_dim}")

    min_coord_value = 100
    max_coord_value = 100 + 100 * (n_query_points + n_data_points)

    def make_query_point(i):
        return tuple(min_coord_value + 2 * i for _ in range(n_dim))

    query_points = [make_query_point(i) for i in range(n_query_points)]
    data_points_per_query_point, data_points_rest = divmod(
        n_data_points, n_query_points
    )

    def make_data_point(min_val, max_val, global_max_val):
        """
        One of the coords will be between (min_val, max_val),
        rest of the coords will be between (min_val, global_max_val)
        """
        random_coord = random.randint(0, n_dim - 1)
        coords = [random.randint(min_val, global_max_val) for _ in range(n_dim)]
        coords[random_coord] = random.randint(min_val, max_val)
        return tuple(coords)

    # start with random data points which are smaller than all query points
    data_points = [
        make_data_point(0, min_coord_value, max_coord_value)
        for _ in range(data_points_rest)
    ]
    for i in range(n_query_points):
        # add data point in L-shape, with all dimensions > query point dimensions
        data_points_for_query = [
            make_data_point(
                min_coord_value + 2 * i + 1,
                min_coord_value + 2 * i + 1,
                max_coord_value,
            )
            for _ in range(data_points_per_query_point)
        ]
        data_points += data_points_for_query
    random.shuffle(data_points)

    expected_result = [
        (i, data_points_per_query_point * (n_query_points - i))
        for i in range(n_query_points)
    ]
    # Sanity checks on the construction itself (not on caller input).
    assert expected_result[-1] == (n_query_points - 1, data_points_per_query_point)
    assert (
        len(data_points) == n_data_points
    ), f"got: {len(data_points)}, expected: {n_data_points}"
    return {
        "data_points": data_points,
        "query_points": query_points,
        "expected_result": expected_result,
    }


LONG_N_PARTITIONS = [1, 2, 3, 4, 8, 16]

RANDOM_TESTS_1D = [
    random_test_case(10, 10, 1),
    random_test_case(1_000, 10, 1),
    random_test_case(1_000, 100, 1),
    random_test_case(1_000, 1_000, 1),
    # random_test_case(100_000, 10, 1),
    # random_test_case(100_000, 100, 1),
    # random_test_case(100_000, 1_000, 1),
    # random_test_case(100_000, 10_000, 1),
]


@pytest.mark.long
@pytest.mark.parametrize("n_partitions", LONG_N_PARTITIONS)
@pytest.mark.parametrize("test_case", RANDOM_TESTS_1D)
def test_algorithm_performance_1d(spark_context, n_partitions, test_case):
    """Run Countifs on each generated 1D case and compare with the expected result."""
    data_rdd, query_rdd, expected_result = prepare_test_case(spark_context, test_case)
    countifs = Countifs(spark_context, n_partitions)
    result = countifs(data_rdd=data_rdd, query_rdd=query_rdd, n_dim=1).collect()
    assert len(result) == len(expected_result)
    assert result == expected_result


RANDOM_TESTS_2D = [
    random_test_case(10, 10, 2),
    random_test_case(1_000, 10, 2),
    random_test_case(1_000, 100, 2),
    random_test_case(1_000, 1_000, 2),
    # random_test_case(100_000, 10, 2),
    # random_test_case(100_000, 100, 2),
    # random_test_case(100_000, 1_000, 2),
    # random_test_case(100_000, 10_000, 2),
]


@pytest.mark.long
@pytest.mark.parametrize("n_partitions", LONG_N_PARTITIONS)
@pytest.mark.parametrize("test_case", RANDOM_TESTS_2D)
def test_algorithm_performance_2d(spark_context, n_partitions, test_case):
    """Run Countifs on each generated 2D case and compare with the expected result."""
    data_rdd, query_rdd, expected_result = prepare_test_case(spark_context, test_case)
    countifs = Countifs(spark_context, n_partitions)
    result = countifs(data_rdd=data_rdd, query_rdd=query_rdd, n_dim=2).collect()
    assert len(result) == len(expected_result)
    assert result == expected_result


RANDOM_TESTS_3D = [
    random_test_case(10, 10, 3),
    random_test_case(1_000, 10, 3),
    random_test_case(1_000, 100, 3),
    random_test_case(1_000, 1_000, 3),
    # random_test_case(100_000, 10, 3),
    # random_test_case(100_000, 100, 3),
    # random_test_case(100_000, 1_000, 3),
    # random_test_case(100_000, 10_000, 3),
]


@pytest.mark.long
@pytest.mark.parametrize("n_partitions", LONG_N_PARTITIONS)
@pytest.mark.parametrize("test_case", RANDOM_TESTS_3D)
def test_algorithm_performance_3d(spark_context, n_partitions, test_case):
    """Run Countifs on each generated 3D case (remainder of the test is not shown)."""
    data_rdd, query_rdd, expected_result = prepare_test_case(spark_context, test_case)
    countifs = Countifs(spark_context, n_partitions)
    result = countifs(data_rdd=data_rdd, query_rdd=query_rdd, n_dim=3).collect()
    assert len(result) == len(expected_result)
    # ... excerpt is truncated here; any further statements are not shown.
test_runner_test.py
Source:test_runner_test.py
# ... excerpt begins mid-file; earlier imports are not shown.
from testify import test_runner
from .test_runner_subdir.inheriting_class import InheritingClass

# Module-level flags flipped by the hook functions below.
prepared = False
running = False


def prepare_test_case(options, test_case):
    """Record that the prepare hook was invoked by setting ``prepared`` to True."""
    global prepared
    prepared = True


def run_test_case(options, test_case, runnable):
    """Call ``runnable`` and return its result, tracking execution in ``running``.

    ``running`` is True while ``runnable`` executes and is reset to False
    afterwards, even if ``runnable`` raises.
    """
    global running
    running = True
    try:
        return runnable()
    finally:
        running = False


def add_testcase_info(test_case, runner):
    """Mark ``test_case`` by setting its ``__testattr__`` attribute to True."""
    test_case.__testattr__ = True


class TestTestRunnerGetTestMethodName(test_case.TestCase):
    def test_method_from_other_module_reports_class_module(self):
        # Excerpt is truncated mid-statement. The visible remainder is:
        #     ret = test_runner.TestRunner.get_test_method_name(...
        ...
add_validation_case.py
Source:add_validation_case.py
# ... excerpt begins mid-file; imports and constants are not shown.
# get event data from production SKIP
skip = SKIP('https://skip.eatws.net',
            secret_id='skip-prod-readonly-access')
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)


def prepare_test_case(evid, inventory=None, waveforms=None):
    """Run W-phase for one event and register the outcome as a validation case.

    Looks the event up via ``skip.get_event``, runs ``runwphase`` against the
    'IRIS' server (saving ``<evid>.mseed`` and ``<evid>.xml`` under
    ``DATA_DIR``), prints the resulting case as JSON and passes it to
    ``add_case``.

    Args:
        evid: Event identifier to look up.
        inventory: Forwarded unchanged to ``runwphase``.
        waveforms: Forwarded unchanged to ``runwphase``.

    Raises:
        ValueError: If ``skip.get_event`` returns a falsy value for ``evid``.
    """
    event: Optional[Event] = skip.get_event(evid)
    if not event:
        raise ValueError('%s not found in SKIP' % evid)

    eqinfo = dict(
        id=event.id,
        lat=event.latitude,
        lon=event.longitude,
        dep=event.depth_km,
        time=UTCDateTime(event.event_time),
    )
    datadir = abspath(DATA_DIR)
    result = runwphase(
        server='IRIS',
        eqinfo=eqinfo,
        save_waveforms=join(datadir, '%s.mseed' % evid),
        save_inventory=join(datadir, '%s.xml' % evid),
        inventory=inventory,
        waveforms=waveforms,
    )
    MT = result.MomentTensor

    # Unlike eqinfo, the stored case keeps the raw event_time value rather
    # than a UTCDateTime; the case is serialized with json.dumps below.
    case = dict(
        id=event.id,
        lat=event.latitude,
        lon=event.longitude,
        dep=event.depth_km,
        time=event.event_time,
        _expected_results={k: getattr(MT, k) for k in result_keys},
    )
    if result.QualityParams is not None:
        case["_expected_results"]["azimuthal_gap"] = result.QualityParams.azimuthal_gap

    print(json.dumps(case, indent=4))
    add_case(case)
    print("This test case has been added to validation_cases.json and test-datasets/.")
    print("To create a new release tarball: "
          "tar czvf ga-wphase-test-datasets.tar.gz test-datasets/ validation_cases.json")


if __name__ == '__main__':
    ...  # excerpt is truncated here; the actual body is not shown
Learn to execute automation testing from scratch with LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!