Best Python code snippet using lisa_python
MarketDataService.py
Source:MarketDataService.py
...68 69 Returns:70 Paged result of CurveRange entity.71 """72 return _get_event_loop().run_until_complete(self.readCurveRangeAsync(id, page, pageSize, product, versionFrom, versionTo))73 async def readMarketDataRegistryByIdAsync(self, id: int) -> MarketDataEntityOutput:74 """75 Reads MarketData by id with MarketDataID.76 Args:77 id: ID of the marketdata to be retrieved.78 79 Returns:80 MarketData Entity Output (Async).81 """82 url = "/marketdata/entity/" + str(id) 83 with self.__client as c:84 res = await asyncio.gather(*[self.__executor.exec(c.exec, 'GET', url, None, retcls=MarketDataEntityOutput)])85 return cast(MarketDataEntityOutput,res[0])86 def readMarketDataRegistryById(self, id: int) -> MarketDataEntityOutput : 87 """88 Reads MarketData by curve name with MarketDataID.89 Args:90 id: ID of the marketdata to be retrieved.91 Returns:92 MarketData Entity Output.93 """ 94 return _get_event_loop().run_until_complete(self.readMarketDataRegistryByIdAsync(id))95 async def updateMarketDataAsync(self, id: int, entity: MarketDataEntityInput) -> MarketDataEntityOutput :96 """ 97 Saves the given MarketData Entity98 Args:99 id: int of the marketdata to be updated100 101 Returns:102 MarketData Entity Output (Async).103 """104 url = "/marketdata/entity/" + str(id) 105 with self.__client as c:106 res = await asyncio.gather(*[self.__executor.exec(c.exec, 'PUT', url, entity, MarketDataEntityOutput)])107 return cast(MarketDataEntityOutput,res[0])108 def updateMarketData(self, id: int, entity: MarketDataEntityInput):109 """ 110 Saves the given MarketData Entity111 Args:112 id: int of the marketdata to be updated113 Returns:114 MarketData Entity Output.115 """116 return _get_event_loop().run_until_complete(self.updateMarketDataAsync(id, entity))117 async def deleteMarketDataAsync(self, id: int) -> None :118 """ 119 Delete the specific MarketData entity by id120 Args:121 id: int of the marketdata to be deleted122 Returns:123 MarketData Entity Output (Async).124 """125 url = "/marketdata/entity/" + str(id) 126 with self.__client as c:127 await asyncio.gather(*[self.__executor.exec(c.exec, 'DELETE', url, None)])128 return None129 def deleteMarketData(self, id: int) -> None :130 """ 131 Delete the specific MarketData entity by id132 Args: 133 id: int of the marketdata to be deleted134 Returns:135 MarketData Entity Output.136 """137 return _get_event_loop().run_until_complete(self.deleteMarketDataAsync(id))138 async def readMarketDataRegistryByNameAsync(self, provider: str, curveName: str) -> MarketDataEntityOutput:139 """140 Reads MarketData by provider and curve name.141 Args:142 provider: string of the provider to be retrieved.143 curveName: string of the curve name to be retrieved.144 Returns:145 MarketData Entity Output (Async).146 """147 url = "/marketdata/entity" 148 params = {"provider": provider , "curveName":curveName}149 with self.__client as c:150 res = await asyncio.gather(*[self.__executor.exec(c.exec, 'GET', url, None, retcls=MarketDataEntityOutput, params = params)])151 return cast(MarketDataEntityOutput,res[0])152 def readMarketDataRegistryByName(self, provider: str, curveName: str) -> MarketDataEntityOutput:153 """154 Reads MarketData by provider and curve name.155 Args:156 provider: string of the provider to be retrieved.157 curveName: string of the curve name to be retrieved.158 159 Returns:160 MarketData Entity Output.161 """162 return _get_event_loop().run_until_complete(self.readMarketDataRegistryByNameAsync(provider, curveName))163 async def registerMarketDataAsync(self, entity: MarketDataEntityInput) -> MarketDataEntityOutput:164 """165 Register a new MarketData entity.166 Args:167 entity: The Market Data Entity Input168 169 Returns:170 MarketData Entity Output (Async).171 """172 url = "/marketdata/entity"173 with self.__client as c:174 res = await asyncio.gather(*[self.__executor.exec(c.exec, 'POST', url, entity, MarketDataEntityOutput)])175 return cast(MarketDataEntityOutput,res[0])176 def registerMarketData(self, entity: MarketDataEntityInput) -> MarketDataEntityOutput:177 """178 Register a new MarketData entity.179 Args:180 entity: The Market Data Entity Input181 182 Returns:183 MarketData Entity Output.184 """185 return _get_event_loop().run_until_complete(self.registerMarketDataAsync(entity))186 async def upsertDataAsync(self, data:UpsertData) -> None:187 url = "/marketdata/upsertdata"188 with self.__client as c:189 await asyncio.gather(*[self.__executor.exec(c.exec, 'POST', url, data)])190 return None191 192 def upsertData(self, data:UpsertData) -> None:193 return _get_event_loop().run_until_complete(self.upsertDataAsync(data))194def _get_event_loop():195 """196 Wrapper around asyncio get_event_loop.197 Ensures that there is an event loop available.198 An event loop may not be available if the sdk is not run in the main event loop199 """200 try:201 asyncio.get_event_loop()202 except RuntimeError as ex:203 if "There is no current event loop in thread" in str(ex):204 loop = asyncio.new_event_loop()205 asyncio.set_event_loop(loop)206 ...
test_event_loop.py
Source:test_event_loop.py
...3import pytest4from samsung_multiroom.api import ApiResponse5from samsung_multiroom.event import EventLoop6from samsung_multiroom.event.event import Event7def _get_event_loop():8 api_stream = MagicMock()9 event_loop = EventLoop(api_stream)10 return (event_loop, api_stream)11def _fake_event_factory(response):12 event = MagicMock(spec=Event)13 event.name = 'fake.event'14 return event15def _get_api_response(name):16 event = MagicMock(spec=ApiResponse)17 event.name = name18 return event19class TestEventLoop(): # pytest-asyncio doesn't play well with unittest.TestCase20 @pytest.mark.asyncio21 async def test_loop(self):22 listener = MagicMock()23 event_loop, api_stream = _get_event_loop()24 api_stream.open.return_value = iter([25 _get_api_response('FakeEvent'),26 ])27 event_loop.register_factory(_fake_event_factory)28 event_loop.add_listener('fake.event', listener)29 await event_loop.loop()30 listener.assert_called_once()31 @pytest.mark.asyncio32 async def test_loop_no_events(self):33 listener = MagicMock()34 event_loop, api_stream = _get_event_loop()35 api_stream.open.return_value = iter([36 _get_api_response('FakeEvent'),37 ])38 event_loop.add_listener('*', listener)39 await event_loop.loop()...
async_utils.py
Source:async_utils.py
...4# LICENSE file in the root directory of this source tree.5import asyncio6from contextlib import contextmanager7@contextmanager8def _get_event_loop():9 loop = asyncio.get_event_loop()10 if not loop.is_running():11 yield loop12 else:13 yield asyncio.new_event_loop()14def wait_for(coro):15 with _get_event_loop() as loop:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!