Best Python code snippet using robotframework-pageobjects_python
test_highlevel_open_tcp_stream.py
Source:test_highlevel_open_tcp_stream.py
...217 assert ip in self.ip_dict218 if socket is not succeeded:219 assert socket.closed220 assert socket.port == self.port221async def run_scenario(222 # The port to connect to223 port,224 # A list of225 # (ip, delay, result)226 # tuples, where delay is in seconds and result is "success" or "error"227 # The ip's will be returned from getaddrinfo in this order, and then228 # connect() calls to them will have the given result.229 ip_list,230 *,231 # If False, AF_INET4/6 sockets error out on creation, before connect is232 # even called.233 ipv4_supported=True,234 ipv6_supported=True,235 # Normally, we return (winning_sock, scenario object)236 # If this is True, we require there to be an exception, and return237 # (exception, scenario object)238 expect_error=(),239 **kwargs,240):241 supported_families = set()242 if ipv4_supported:243 supported_families.add(trio.socket.AF_INET)244 if ipv6_supported:245 supported_families.add(trio.socket.AF_INET6)246 scenario = Scenario(port, ip_list, supported_families)247 trio.socket.set_custom_hostname_resolver(scenario)248 trio.socket.set_custom_socket_factory(scenario)249 try:250 stream = await open_tcp_stream("test.example.com", port, **kwargs)251 assert expect_error == ()252 scenario.check(stream.socket)253 return (stream.socket, scenario)254 except AssertionError: # pragma: no cover255 raise256 except expect_error as exc:257 scenario.check(None)258 return (exc, scenario)259async def test_one_host_quick_success(autojump_clock):260 sock, scenario = await run_scenario(80, [("1.2.3.4", 0.123, "success")])261 assert sock.ip == "1.2.3.4"262 assert trio.current_time() == 0.123263async def test_one_host_slow_success(autojump_clock):264 sock, scenario = await run_scenario(81, [("1.2.3.4", 100, "success")])265 assert sock.ip == "1.2.3.4"266 assert trio.current_time() == 100267async def test_one_host_quick_fail(autojump_clock):268 exc, scenario = await run_scenario(269 82, [("1.2.3.4", 0.123, "error")], expect_error=OSError270 )271 assert isinstance(exc, OSError)272 assert trio.current_time() == 0.123273async def test_one_host_slow_fail(autojump_clock):274 exc, scenario = await run_scenario(275 83, [("1.2.3.4", 100, "error")], expect_error=OSError276 )277 assert isinstance(exc, OSError)278 assert trio.current_time() == 100279async def test_one_host_failed_after_connect(autojump_clock):280 exc, scenario = await run_scenario(281 83, [("1.2.3.4", 1, "postconnect_fail")], expect_error=KeyboardInterrupt282 )283 assert isinstance(exc, KeyboardInterrupt)284# With the default 0.250 second delay, the third attempt will win285async def test_basic_fallthrough(autojump_clock):286 sock, scenario = await run_scenario(287 80,288 [289 ("1.1.1.1", 1, "success"),290 ("2.2.2.2", 1, "success"),291 ("3.3.3.3", 0.2, "success"),292 ],293 )294 assert sock.ip == "3.3.3.3"295 # current time is default time + default time + connection time296 assert trio.current_time() == (0.250 + 0.250 + 0.2)297 assert scenario.connect_times == {298 "1.1.1.1": 0,299 "2.2.2.2": 0.250,300 "3.3.3.3": 0.500,301 }302async def test_early_success(autojump_clock):303 sock, scenario = await run_scenario(304 80,305 [306 ("1.1.1.1", 1, "success"),307 ("2.2.2.2", 0.1, "success"),308 ("3.3.3.3", 0.2, "success"),309 ],310 )311 assert sock.ip == "2.2.2.2"312 assert trio.current_time() == (0.250 + 0.1)313 assert scenario.connect_times == {314 "1.1.1.1": 0,315 "2.2.2.2": 0.250,316 # 3.3.3.3 was never even started317 }318# With a 0.450 second delay, the first attempt will win319async def test_custom_delay(autojump_clock):320 sock, scenario = await run_scenario(321 80,322 [323 ("1.1.1.1", 1, "success"),324 ("2.2.2.2", 1, "success"),325 ("3.3.3.3", 0.2, "success"),326 ],327 happy_eyeballs_delay=0.450,328 )329 assert sock.ip == "1.1.1.1"330 assert trio.current_time() == 1331 assert scenario.connect_times == {332 "1.1.1.1": 0,333 "2.2.2.2": 0.450,334 "3.3.3.3": 0.900,335 }336async def test_custom_errors_expedite(autojump_clock):337 sock, scenario = await run_scenario(338 80,339 [340 ("1.1.1.1", 0.1, "error"),341 ("2.2.2.2", 0.2, "error"),342 ("3.3.3.3", 10, "success"),343 # .25 is the default timeout344 ("4.4.4.4", 0.25, "success"),345 ],346 )347 assert sock.ip == "4.4.4.4"348 assert trio.current_time() == (0.1 + 0.2 + 0.25 + 0.25)349 assert scenario.connect_times == {350 "1.1.1.1": 0,351 "2.2.2.2": 0.1,352 "3.3.3.3": 0.1 + 0.2,353 "4.4.4.4": 0.1 + 0.2 + 0.25,354 }355async def test_all_fail(autojump_clock):356 exc, scenario = await run_scenario(357 80,358 [359 ("1.1.1.1", 0.1, "error"),360 ("2.2.2.2", 0.2, "error"),361 ("3.3.3.3", 10, "error"),362 ("4.4.4.4", 0.250, "error"),363 ],364 expect_error=OSError,365 )366 assert isinstance(exc, OSError)367 assert isinstance(exc.__cause__, trio.MultiError)368 assert len(exc.__cause__.exceptions) == 4369 assert trio.current_time() == (0.1 + 0.2 + 10)370 assert scenario.connect_times == {371 "1.1.1.1": 0,372 "2.2.2.2": 0.1,373 "3.3.3.3": 0.1 + 0.2,374 "4.4.4.4": 0.1 + 0.2 + 0.25,375 }376async def test_multi_success(autojump_clock):377 sock, scenario = await run_scenario(378 80,379 [380 ("1.1.1.1", 0.5, "error"),381 ("2.2.2.2", 10, "success"),382 ("3.3.3.3", 10 - 1, "success"),383 ("4.4.4.4", 10 - 2, "success"),384 ("5.5.5.5", 0.5, "error"),385 ],386 happy_eyeballs_delay=1,387 )388 assert not scenario.sockets["1.1.1.1"].succeeded389 assert (390 scenario.sockets["2.2.2.2"].succeeded391 or scenario.sockets["3.3.3.3"].succeeded392 or scenario.sockets["4.4.4.4"].succeeded393 )394 assert not scenario.sockets["5.5.5.5"].succeeded395 assert sock.ip in ["2.2.2.2", "3.3.3.3", "4.4.4.4"]396 assert trio.current_time() == (0.5 + 10)397 assert scenario.connect_times == {398 "1.1.1.1": 0,399 "2.2.2.2": 0.5,400 "3.3.3.3": 1.5,401 "4.4.4.4": 2.5,402 "5.5.5.5": 3.5,403 }404async def test_does_reorder(autojump_clock):405 sock, scenario = await run_scenario(406 80,407 [408 ("1.1.1.1", 10, "error"),409 # This would win if we tried it first...410 ("2.2.2.2", 1, "success"),411 # But in fact we try this first, because of section 5.4412 ("::3", 0.5, "success"),413 ],414 happy_eyeballs_delay=1,415 )416 assert sock.ip == "::3"417 assert trio.current_time() == 1 + 0.5418 assert scenario.connect_times == {419 "1.1.1.1": 0,420 "::3": 1,421 }422async def test_handles_no_ipv4(autojump_clock):423 sock, scenario = await run_scenario(424 80,425 # Here the ipv6 addresses fail at socket creation time, so the connect426 # configuration doesn't matter427 [428 ("::1", 10, "success"),429 ("2.2.2.2", 0, "success"),430 ("::3", 0.1, "success"),431 ("4.4.4.4", 0, "success"),432 ],433 happy_eyeballs_delay=1,434 ipv4_supported=False,435 )436 assert sock.ip == "::3"437 assert trio.current_time() == 1 + 0.1438 assert scenario.connect_times == {439 "::1": 0,440 "::3": 1.0,441 }442async def test_handles_no_ipv6(autojump_clock):443 sock, scenario = await run_scenario(444 80,445 # Here the ipv6 addresses fail at socket creation time, so the connect446 # configuration doesn't matter447 [448 ("::1", 0, "success"),449 ("2.2.2.2", 10, "success"),450 ("::3", 0, "success"),451 ("4.4.4.4", 0.1, "success"),452 ],453 happy_eyeballs_delay=1,454 ipv6_supported=False,455 )456 assert sock.ip == "4.4.4.4"457 assert trio.current_time() == 1 + 0.1458 assert scenario.connect_times == {459 "2.2.2.2": 0,460 "4.4.4.4": 1.0,461 }462async def test_no_hosts(autojump_clock):463 exc, scenario = await run_scenario(80, [], expect_error=OSError)464 assert "no results found" in str(exc)465async def test_cancel(autojump_clock):466 with trio.move_on_after(5) as cancel_scope:467 exc, scenario = await run_scenario(468 80,469 [470 ("1.1.1.1", 10, "success"),471 ("2.2.2.2", 10, "success"),472 ("3.3.3.3", 10, "success"),473 ("4.4.4.4", 10, "success"),474 ],475 expect_error=trio.MultiError,476 )477 # What comes out should be 1 or more Cancelled errors that all belong478 # to this cancel_scope; this is the easiest way to check that479 raise exc480 assert cancel_scope.cancelled_caught481 assert trio.current_time() == 5...
transforms_test.py
Source:transforms_test.py
...56Nested = typing.NamedTuple(57 'Nested', [('id', int), ('animal_speed', AnimalSpeed)])58coders.registry.register_coder(Nested, coders.RowCoder)59class TransformTest(unittest.TestCase):60 def run_scenario(self, input, func):61 expected = func(input)62 empty = input.iloc[0:0]63 input_placeholder = expressions.PlaceholderExpression(empty)64 input_deferred = frame_base.DeferredFrame.wrap(input_placeholder)65 actual_deferred = func(input_deferred)._expr.evaluate_at(66 expressions.Session({input_placeholder: input}))67 check_correct(expected, actual_deferred)68 with beam.Pipeline() as p:69 input_pcoll = p | beam.Create([input.iloc[::2], input.iloc[1::2]])70 input_df = convert.to_dataframe(input_pcoll, proxy=empty)71 output_df = func(input_df)72 output_proxy = output_df._expr.proxy()73 if isinstance(output_proxy, pd.core.generic.NDFrame):74 self.assertTrue(75 output_proxy.iloc[:0].equals(expected.iloc[:0]),76 (77 'Output proxy is incorrect:\n'78 f'Expected:\n{expected.iloc[:0]}\n\n'79 f'Actual:\n{output_proxy.iloc[:0]}'))80 else:81 self.assertEqual(type(output_proxy), type(expected))82 output_pcoll = convert.to_pcollection(output_df, yield_elements='pandas')83 assert_that(84 output_pcoll, lambda actual: check_correct(expected, concat(actual)))85 def test_identity(self):86 df = pd.DataFrame({87 'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],88 'Speed': [380., 370., 24., 26.]89 })90 self.run_scenario(df, lambda x: x)91 def test_groupby_sum_mean(self):92 df = pd.DataFrame({93 'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],94 'Speed': [380., 370., 24., 26.]95 })96 self.run_scenario(df, lambda df: df.groupby('Animal').sum())97 with expressions.allow_non_parallel_operations():98 self.run_scenario(df, lambda df: df.groupby('Animal').mean())99 self.run_scenario(100 df, lambda df: df.loc[df.Speed > 25].groupby('Animal').sum())101 def test_groupby_apply(self):102 df = pd.DataFrame({103 'group': ['a' if i % 5 == 0 or i % 3 == 0 else 'b' for i in range(100)],104 'foo': [None if i % 11 == 0 else i for i in range(100)],105 'bar': [None if i % 7 == 0 else 99 - i for i in range(100)],106 'baz': [None if i % 13 == 0 else i * 2 for i in range(100)],107 })108 def median_sum_fn(x):109 return (x.foo + x.bar).median()110 describe = lambda df: df.describe()111 self.run_scenario(df, lambda df: df.groupby('group').foo.apply(describe))112 self.run_scenario(113 df, lambda df: df.groupby('group')[['foo', 'bar']].apply(describe))114 self.run_scenario(df, lambda df: df.groupby('group').apply(median_sum_fn))115 self.run_scenario(116 df,117 lambda df: df.set_index('group').foo.groupby(level=0).apply(describe))118 self.run_scenario(df, lambda df: df.groupby(level=0).apply(median_sum_fn))119 self.run_scenario(120 df, lambda df: df.groupby(lambda x: x % 3).apply(describe))121 def test_filter(self):122 df = pd.DataFrame({123 'Animal': ['Aardvark', 'Ant', 'Elephant', 'Zebra'],124 'Speed': [5, 2, 35, 40]125 })126 self.run_scenario(df, lambda df: df.filter(items=['Animal']))127 self.run_scenario(df, lambda df: df.filter(regex='Anim.*'))128 self.run_scenario(129 df, lambda df: df.set_index('Animal').filter(regex='F.*', axis='index'))130 with expressions.allow_non_parallel_operations():131 a = pd.DataFrame({'col': [1, 2, 3]})132 self.run_scenario(a, lambda a: a.agg(sum))133 self.run_scenario(a, lambda a: a.agg(['mean', 'min', 'max']))134 def test_scalar(self):135 with expressions.allow_non_parallel_operations():136 a = pd.Series([1, 2, 6])137 self.run_scenario(a, lambda a: a.agg(sum))138 self.run_scenario(a, lambda a: a / a.agg(sum))139 # Tests scalar being used as an input to a downstream stage.140 df = pd.DataFrame({'key': ['a', 'a', 'b'], 'val': [1, 2, 6]})141 self.run_scenario(142 df, lambda df: df.groupby('key').sum().val / df.val.agg(sum))143 def test_getitem_projection(self):144 df = pd.DataFrame({145 'Animal': ['Aardvark', 'Ant', 'Elephant', 'Zebra'],146 'Speed': [5, 2, 35, 40],147 'Size': ['Small', 'Extra Small', 'Large', 'Medium']148 })149 self.run_scenario(df, lambda df: df[['Speed', 'Size']])150 def test_offset_elementwise(self):151 s = pd.Series(range(10)).astype(float)152 df = pd.DataFrame({'value': s, 'square': s * s, 'cube': s * s * s})153 # Only those values that are both squares and cubes will intersect.154 self.run_scenario(155 df,156 lambda df: df.set_index('square').value + df.set_index('cube').value)157 def test_batching_named_tuple_input(self):158 with beam.Pipeline() as p:159 result = (160 p | beam.Create([161 AnimalSpeed('Aardvark', 5),162 AnimalSpeed('Ant', 2),163 AnimalSpeed('Elephant', 35),164 AnimalSpeed('Zebra', 40)165 ]).with_output_types(AnimalSpeed)166 | transforms.DataframeTransform(lambda df: df.filter(regex='Anim.*')))167 assert_that(168 result,169 equal_to([('Aardvark', ), ('Ant', ), ('Elephant', ), ('Zebra', )]))170 def test_batching_beam_row_input(self):171 with beam.Pipeline() as p:172 result = (173 p174 | beam.Create([(u'Falcon', 380.), (u'Falcon', 370.), (u'Parrot', 24.),175 (u'Parrot', 26.)])176 | beam.Map(lambda tpl: beam.Row(Animal=tpl[0], Speed=tpl[1]))177 | transforms.DataframeTransform(178 lambda df: df.groupby('Animal').mean(), include_indexes=True))179 assert_that(result, equal_to([('Falcon', 375.), ('Parrot', 25.)]))180 def test_batching_beam_row_to_dataframe(self):181 with beam.Pipeline() as p:182 df = convert.to_dataframe(183 p184 | beam.Create([(u'Falcon', 380.), (u'Falcon', 370.), (185 u'Parrot', 24.), (u'Parrot', 26.)])186 | beam.Map(lambda tpl: beam.Row(Animal=tpl[0], Speed=tpl[1])))187 result = convert.to_pcollection(188 df.groupby('Animal').mean(), include_indexes=True)189 assert_that(result, equal_to([('Falcon', 375.), ('Parrot', 25.)]))190 def test_batching_passthrough_nested_schema(self):191 with beam.Pipeline() as p:192 nested_schema_pc = (193 p | beam.Create([Nested(1, AnimalSpeed('Aardvark', 5))194 ]).with_output_types(Nested))195 result = nested_schema_pc | transforms.DataframeTransform( # pylint: disable=expression-not-assigned196 lambda df: df.filter(items=['animal_speed']))197 assert_that(result, equal_to([(('Aardvark', 5), )]))198 def test_batching_passthrough_nested_array(self):199 Array = typing.NamedTuple(200 'Array', [('id', int), ('business_numbers', typing.Sequence[int])])201 coders.registry.register_coder(Array, coders.RowCoder)202 with beam.Pipeline() as p:203 array_schema_pc = (p | beam.Create([Array(1, [7, 8, 9])]))204 result = array_schema_pc | transforms.DataframeTransform( # pylint: disable=expression-not-assigned205 lambda df: df.filter(items=['business_numbers']))206 assert_that(result, equal_to([([7, 8, 9], )]))207 def test_unbatching_series(self):208 with beam.Pipeline() as p:209 result = (210 p211 | beam.Create([(u'Falcon', 380.), (u'Falcon', 370.), (u'Parrot', 24.),212 (u'Parrot', 26.)])213 | beam.Map(lambda tpl: beam.Row(Animal=tpl[0], Speed=tpl[1]))214 | transforms.DataframeTransform(lambda df: df.Animal))215 assert_that(result, equal_to(['Falcon', 'Falcon', 'Parrot', 'Parrot']))216 def test_input_output_polymorphism(self):217 one_series = pd.Series([1])218 two_series = pd.Series([2])219 three_series = pd.Series([3])220 proxy = one_series[:0]221 def equal_to_series(expected):222 def check(actual):223 actual = pd.concat(actual)224 if not expected.equals(actual):225 raise AssertionError(226 'Series not equal: \n%s\n%s\n' % (expected, actual))227 return check228 with beam.Pipeline() as p:229 one = p | 'One' >> beam.Create([one_series])230 two = p | 'Two' >> beam.Create([two_series])231 assert_that(232 one | 'PcollInPcollOut' >> transforms.DataframeTransform(233 lambda x: 3 * x, proxy=proxy, yield_elements='pandas'),234 equal_to_series(three_series),235 label='CheckPcollInPcollOut')236 assert_that(237 (one, two)238 | 'TupleIn' >> transforms.DataframeTransform(239 lambda x, y: (x + y), (proxy, proxy), yield_elements='pandas'),240 equal_to_series(three_series),241 label='CheckTupleIn')242 assert_that(243 dict(x=one, y=two)244 | 'DictIn' >> transforms.DataframeTransform(245 lambda x,246 y: (x + y),247 proxy=dict(x=proxy, y=proxy),248 yield_elements='pandas'),249 equal_to_series(three_series),250 label='CheckDictIn')251 double, triple = one | 'TupleOut' >> transforms.DataframeTransform(252 lambda x: (2*x, 3*x), proxy, yield_elements='pandas')253 assert_that(double, equal_to_series(two_series), 'CheckTupleOut0')254 assert_that(triple, equal_to_series(three_series), 'CheckTupleOut1')255 res = one | 'DictOut' >> transforms.DataframeTransform(256 lambda x: {'res': 3 * x}, proxy, yield_elements='pandas')257 assert_that(res['res'], equal_to_series(three_series), 'CheckDictOut')258 def test_cat(self):259 # verify that cat works with a List[Series] since this is260 # missing from doctests261 df = pd.DataFrame({262 'one': ['A', 'B', 'C'],263 'two': ['BB', 'CC', 'A'],264 'three': ['CCC', 'AA', 'B'],265 })266 self.run_scenario(df, lambda df: df.two.str.cat([df.three], join='outer'))267 self.run_scenario(268 df, lambda df: df.one.str.cat([df.two, df.three], join='outer'))269 def test_repeat(self):270 # verify that repeat works with a Series since this is271 # missing from doctests272 df = pd.DataFrame({273 'strings': ['A', 'B', 'C', 'D', 'E'],274 'repeats': [3, 1, 4, 5, 2],275 })276 self.run_scenario(df, lambda df: df.strings.str.repeat(df.repeats))277 def test_rename(self):278 df = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})279 self.run_scenario(280 df, lambda df: df.rename(columns={'B': 'C'}, index={281 0: 2, 2: 0282 }))283 with expressions.allow_non_parallel_operations():284 self.run_scenario(285 df,286 lambda df: df.rename(287 columns={'B': 'C'}, index={288 0: 2, 2: 0289 }, errors='raise'))290class TransformPartsTest(unittest.TestCase):291 def test_rebatch(self):292 with beam.Pipeline() as p:293 sA = pd.Series(range(1000))294 sB = sA * sA295 pcA = p | 'CreatePCollA' >> beam.Create([('k0', sA[::3]),296 ('k1', sA[1::3]),297 ('k2', sA[2::3])])298 pcB = p | 'CreatePCollB' >> beam.Create([('k0', sB[::3]),...
test_unitconv.py
Source:test_unitconv.py
...71 self.assertDictEqual(72 unitconv.parse_unitname(''),73 {'multiplier': 1, 'unit_class': None, 'primary_unit': '',74 'base_unit': ''})75def run_scenario(user_asked_for, data_exists_as, allow_derivation=True,76 allow_integration=False, allow_prefixes_in_denominator=False,77 round_result=6):78 userunit = unitconv.parse_unitname(user_asked_for, fold_scale_prefix=False)79 prefixclass = unitconv.prefix_class_for(userunit['scale_multiplier'])80 use_unit = userunit['base_unit']81 compatibles = unitconv.determine_compatible_units(82 allow_derivation=allow_derivation,83 allow_integration=allow_integration,84 allow_prefixes_in_denominator=allow_prefixes_in_denominator,85 **userunit)86 try:87 scale, extra_op = compatibles[data_exists_as]88 except KeyError:89 return90 if round_result is not None:91 scale = round(scale, round_result)92 return (data_exists_as, use_unit, scale, extra_op, prefixclass)93class TestDetermineCompatible(unittest.TestCase):94 def test_compatible_to_simple_primary_type(self):95 all_time_units = [pair[0] for pair in unitconv.unit_classes_by_name['time']]96 u = unitconv.determine_compatible_units('s', 'time', allow_integration=False)97 compatunits = u.keys()98 for timeunit in all_time_units:99 self.assertIn(timeunit, compatunits)100 self.assertEqual(u['MM'], (60000000.0, None))101 self.assertEqual(u['h'], (3600.0, None))102 self.assertEqual([extra_op for (_multiplier, extra_op) in u.values()],103 [None] * len(u))104 def test_allow_derivation(self):105 u = unitconv.determine_compatible_units('b', 'datasize', 1, 's', 'time', allow_integration=False)106 self.assertEqual(u['b'], (1.0, 'derive'))107 self.assertEqual(u['B'], (8.0, 'derive'))108 self.assertEqual(u['b/s'], (1.0, None))109 self.assertAlmostEqual(u['B/d'][0], 9.26e-05)110 self.assertIsNone(u['B/d'][1])111 self.assertNotIn('h', u)112 def test_allow_integration(self):113 u = unitconv.determine_compatible_units('Eggnog', None, 0.125, allow_integration=True)114 self.assertEqual(u['Eggnog'], (8.0, None))115 self.assertAlmostEqual(u['Eggnog/h'][0], 0.0022222)116 self.assertEqual(u['Eggnog/h'][1], 'integrate')117 self.assertNotIn('derive', [extra_op for (_multiplier, extra_op) in u.values()])118class TestUnitconv(unittest.TestCase):119 # in the comments explaining results, X(t) represents a data series in120 # graphite with the "data_exists_as" unit, and Y(t) represents the data121 # series we want to graph, in the "user_asked_for" unit. the results of122 # run_scenario should give the necessary steps to convert X(t) to Y(t).123 def test_straightforward_conversion(self):124 self.assertEqual(run_scenario(user_asked_for='B', data_exists_as='b'),125 ('b', 'B', 0.125, None, 'si'))126 # 0.125 * X(t) b = Y(t) B127 def test_esoteric_conversion_with_derive(self):128 self.assertEqual(run_scenario(user_asked_for='MiB/d', data_exists_as='kb'),129 ('kb', 'B/d', 10800000, 'derive', 'binary'))130 # d(X(t) kb)/dt kb/s * 86400 s/d * 1B/8b * 1000 B/kB = Y(t) B/d131 # 86400 * 1000 / 8 = 10800000132 def test_unrecognized_unit_derive(self):133 self.assertEqual(run_scenario(user_asked_for='Cheese/w', data_exists_as='Cheese'),134 ('Cheese', 'Cheese/w', 604800.0, 'derive', 'si'))135 # d(604800.0 * X(t) Cheese)/dt = Y(t) Cheese/w136 def test_integration(self):137 self.assertEqual(run_scenario(user_asked_for='b', data_exists_as='MB/s',138 allow_integration=True),139 ('MB/s', 'b', 8000000.0, 'integrate', 'si'))140 # Integral(8000000.0 * X(t) MB/s, dt) = Y(t) b141 def test_conversion_between_unrecognized_units(self):142 self.assertIsNone(run_scenario(user_asked_for='pony', data_exists_as='coal'))143 # can't convert144 def test_conversion_between_units_of_different_class(self):145 self.assertIsNone(run_scenario(user_asked_for='d', data_exists_as='Mb'))146 # we know what they are but we can't convert days to megabits147 def test_straightforward_conversion_with_compound_units(self):148 self.assertEqual(run_scenario(user_asked_for='kb/s', data_exists_as='TiB/w'),149 ('TiB/w', 'b/s', 14543804.600212, None, 'si'))150 # X(t) TiB/w * (1024**4 B/TiB) * (8 b/B) * (1 w/604800 s) = Y(t) kb/s151 # 1024**4 * 8 / 604800 =~ 14543804.600212152 def test_straightforward_conversion_between_iec_data_rates(self):153 self.assertEqual(run_scenario(user_asked_for='KiB', data_exists_as='TiB/w',154 allow_integration=True),155 ('TiB/w', 'B', 1817975.575026, 'integrate', 'binary'))156 # X(t) TiB/w * (1024**4 B/TiB) * (1 w/604800 s) = Z(t) B/s...
test_combine.py
Source:test_combine.py
...31 # block 6 Includes back-dated activity to last item of block 132 add_block(test_scope, entity_code, bs,'2020-01-05','2020-01-15','2020-02-02')33 # block 7 Includes back-dated activity to exclude block 1 entirely34 add_block(test_scope, entity_code, bs,'2012-12-31','2020-01-15','2020-02-03')35 def run_scenario(entity_scope, entity_code, to_date,asat_date,expected_open_tmv,expected_locked_tmv):36 blocks = bs.find_blocks(entity_scope, entity_code, '2020-01-03',to_date,asat_date)37 df = pd.DataFrame.from_records(38 [(o.date,o.tmv) 39 for o in 40 combine(blocks,locked,'2020-01-03',to_date,asat_date)],41 columns=['date','tmv']42 )43 total = df['tmv'].sum()44 expected = expected_locked_tmv if locked else expected_open_tmv45 if debug:46 print(nicer(df))47 print(f"Expected: {expected}, Actual: {total}")48 assert (total == pytest.approx(expected,0.001))49 # View on 01/10 for 01/0950 run_scenario(test_scope, entity_code, '2020-01-09','2020-01-10',6061.34,6061.34)51 # View on 01/10 for 01/1052 run_scenario(test_scope, entity_code, '2020-01-10','2020-01-10',7082.17,7082.17)53 # View on 01/1154 run_scenario(test_scope, entity_code, '2020-01-11','2020-01-11',7963.82,7963.82)55 # View on 01/15 for 01/1156 run_scenario(test_scope, entity_code, '2020-01-11','2020-01-15',8295.58,7963.82)57 # View on 01/15 for 01/1558 run_scenario(test_scope, entity_code, '2020-01-15','2020-01-15',12158.41,11826.65)59 # View on 02/0160 run_scenario(test_scope, entity_code, '2020-01-15','2020-02-01',2425.39,11826.65)61 # View on 02/0262 run_scenario(test_scope, entity_code, '2020-01-15','2020-02-02',1606.25,11826.65)63 # View on 02/03...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!