Best Python code snippet using playwright-python
server.py
Source:server.py
"""Sans-I/O ventilator backend server protocol."""

from typing import Optional, Union, Tuple
import logging

import attr

from ventserver.protocols import events, exceptions
from ventserver.protocols.backend import backend, connections
from ventserver.protocols.devices import file, frontend, mcu, rotary_encoder
from ventserver.protocols.protobuf import mcu_pb
from ventserver.sansio import protocols


# Events


@attr.s
class GenericConnectionEvent(events.Event):
    """Generic connection status event."""

    connected: bool = attr.ib(default=False)

    def has_data(self) -> bool:
        """Return whether the event has data (always true)."""
        return True


class FrontendConnectionEvent(GenericConnectionEvent):
    """Frontend connection status event."""


class MCUConnectionEvent(GenericConnectionEvent):
    """MCU connection status event."""


ConnectionEvent = Union[MCUConnectionEvent, FrontendConnectionEvent]


@attr.s
class ReceiveDataEvent(events.Event):
    """Server receive input event."""

    # This is the system's "wall clock" time, which may jump backwards or
    # forwards when the system time is adjusted. It should only be used for
    # recording timestamps intended to be read by people.
    wall_time: Optional[float] = attr.ib(default=None)
    # This is the system's monotonic time, which always increases at the same
    # rate regardless of adjustments to system time. Anything which needs to
    # compute the duration of elapsed time since some instant should use the
    # monotonic time.
    monotonic_time: Optional[float] = attr.ib(default=None)
    serial_receive: Optional[bytes] = attr.ib(default=None)
    websocket_receive: Optional[bytes] = attr.ib(default=None)
    # The default is None, so the annotation has to be Optional.
    rotary_encoder_receive: Optional[Tuple[int, bool]] = \
        attr.ib(default=None)
    file_receive: Optional[file.LowerReceiveEvent] = attr.ib(default=None)

    def has_data(self) -> bool:
        """Return whether the event has data.

        Serial data counts only when it is non-empty; every other field
        counts as soon as it is not None.
        """
        return (
            self.wall_time is not None
            or self.monotonic_time is not None
            or bool(self.serial_receive)
            or self.websocket_receive is not None
            or self.rotary_encoder_receive is not None
            or self.file_receive is not None
        )


@attr.s
class ReceiveOutputEvent(events.Event):
    """Server receive output/send event."""

    states_send: Optional[backend.SendEvent] = attr.ib(default=None)
    sysclock_setting: Optional[float] = attr.ib(default=None)
    kill_frontend: bool = attr.ib(default=False)

    def has_data(self) -> bool:
        """Return whether the event has data."""
        if (self.states_send is not None) and self.states_send.has_data():
            return True
        if (self.sysclock_setting is not None) or self.kill_frontend:
            return True
        return False


ReceiveEvent = Union[ReceiveDataEvent, ConnectionEvent]
SendEvent = backend.SendEvent


@attr.s
class SendOutputEvent(events.Event):
    """Server send output/send event."""

    serial_send: Optional[bytes] = attr.ib(default=None)
    websocket_send: Optional[bytes] = attr.ib(default=None)
    file_send: Optional[file.StateData] = attr.ib(default=None)

    def has_data(self) -> bool:
        """Return whether the event has data."""
        return (
            bool(self.serial_send) or
            self.websocket_send is not None or
            self.file_send is not None
        )


def make_serial_receive(
        serial_receive: bytes,
        wall_time: float, monotonic_time: Optional[float],
) -> ReceiveDataEvent:
    """Make a ReceiveDataEvent from serial receive data."""
    return ReceiveDataEvent(
        serial_receive=serial_receive,
        wall_time=wall_time, monotonic_time=monotonic_time
    )


def make_websocket_receive(
        ws_receive: bytes,
        wall_time: float, monotonic_time: Optional[float]
) -> ReceiveDataEvent:
    """Make a ReceiveDataEvent from websocket receive data."""
    return ReceiveDataEvent(
        websocket_receive=ws_receive,
        wall_time=wall_time, monotonic_time=monotonic_time
    )


def make_rotary_encoder_receive(
        re_receive: Tuple[int, bool],
        wall_time: float, monotonic_time: Optional[float]
) -> ReceiveDataEvent:
    """Make a ReceiveDataEvent from rotary encoder receive data."""
    return ReceiveDataEvent(
        rotary_encoder_receive=re_receive,
        wall_time=wall_time, monotonic_time=monotonic_time
    )


# Filters


@attr.s
class Receiver(protocols.Filter[ReceiveEvent, ReceiveOutputEvent]):
    """Filter which transforms receive bytes into high-level events.

    Because this filter immediately handles inputs instead of taking inputs
    through an internal buffer and then processing the buffer in the output()
    method, the input method is not thread-safe! It should only be used in
    sequential or async/await code.
    """

    _logger = logging.getLogger('.'.join((__name__, 'Receiver')))

    wall_time: float = attr.ib(default=0)
    monotonic_time: float = attr.ib(default=0)
    _backend: backend.Receiver = attr.ib(factory=backend.Receiver)

    # Devices
    _mcu: mcu.Receiver = attr.ib(factory=mcu.Receiver)
    _frontend: frontend.Receiver = attr.ib(factory=frontend.Receiver)
    _file: file.Receiver = attr.ib(factory=file.Receiver)
    _rotary_encoder: rotary_encoder.Receiver = attr.ib(
        factory=rotary_encoder.Receiver
    )
    _connections: connections.TimeoutHandler = \
        attr.ib(factory=connections.TimeoutHandler)
    _connection_states: mcu_pb.BackendConnections = \
        attr.ib(factory=mcu_pb.BackendConnections)

    def input(self, event: Optional[ReceiveEvent]) -> None:
        """Handle input events.

        Connection events are forwarded to the timeout handler only. Data
        events update the stored clocks and are then fanned out to every
        device receiver.
        """
        if event is None or not event.has_data():
            return

        if isinstance(event, MCUConnectionEvent):
            update_type = (
                connections.Update.MCU_CONNECTED if event.connected
                else connections.Update.MCU_DISCONNECTED
            )
            self._connections.input(connections.UpdateEvent(
                monotonic_time=self.monotonic_time, type=update_type
            ))
            return
        if isinstance(event, FrontendConnectionEvent):
            update_type = (
                connections.Update.FRONTEND_CONNECTED if event.connected
                else connections.Update.FRONTEND_DISCONNECTED
            )
            self._connections.input(connections.UpdateEvent(
                monotonic_time=self.monotonic_time, type=update_type
            ))
            return

        if event.wall_time is not None:
            self.wall_time = event.wall_time
        if event.monotonic_time is not None:
            self.monotonic_time = event.monotonic_time
            # NOTE(review): indentation was lost in the excerpt; this clock
            # update is assumed to belong to the monotonic-time branch.
            self._connections.input(connections.UpdateEvent(
                monotonic_time=self.monotonic_time
            ))
        self._mcu.input(event.serial_receive)
        self._frontend.input(event.websocket_receive)
        self._rotary_encoder.input(rotary_encoder.ReceiveEvent(
            wall_time=self.wall_time, re_data=event.rotary_encoder_receive
        ))
        self._file.input(event.file_receive)

    def output(self) -> Optional[ReceiveOutputEvent]:
        """Emit the next output event.

        Returns None when no device produced anything and the resulting
        output event carries no data.
        """
        devices_processed = self._process_devices()
        if not devices_processed:
            # Nothing arrived from any device: give the backend a clock-only
            # event.
            self._backend.input(backend.ReceiveDataEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time
            ))

        # Process backend output until the backend has data to output or it
        # indicates that it has no more receive data to process.
        output_event = ReceiveOutputEvent()
        while True:
            backend_send = self._backend.output()
            if backend_send is None or backend_send.has_data():
                break
        if backend_send is not None:
            output_event.states_send = backend_send.states_send
        output_event.kill_frontend = self._process_connection_statuses()
        if backend_send is not None:
            output_event.sysclock_setting = backend_send.sysclock_setting
        if not (devices_processed or output_event.has_data()):
            return None

        return output_event

    def _process_mcu(self) -> bool:
        """Process the next event from the mcu protocol."""
        mcu_output = self._mcu.output()
        if mcu_output is None:
            return False

        self._backend.input(backend.ReceiveDataEvent(
            wall_time=self.wall_time, monotonic_time=self.monotonic_time,
            mcu_receive=mcu_output, frontend_receive=None
        ))
        self._connections.input(connections.UpdateEvent(
            monotonic_time=self.monotonic_time,
            type=connections.Update.MCU_RECEIVED
        ))
        if not self._connection_states.has_mcu:
            self._logger.info('Receiving data from the MCU')
            # Deactivate the "MCU connection down" log event.
            self._backend.input(backend.ExternalLogEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time,
                active=False,
                code=mcu_pb.LogEventCode.backend_mcu_connection_down
            ))
            # NOTE(review): indentation was lost in the excerpt; the state
            # update below is assumed to run only on this transition.
            # Confirm against the upstream file.
            self._connection_states.has_mcu = True
            self._backend.input(backend.ReceiveDataEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time,
                server_receive=self._connection_states
            ))
        return True

    def _process_frontend(self) -> bool:
        """Process the next event from the frontend protocol."""
        frontend_output = self._frontend.output()
        if frontend_output is None:
            return False

        self._backend.input(backend.ReceiveDataEvent(
            wall_time=self.wall_time, monotonic_time=self.monotonic_time,
            frontend_receive=frontend_output
        ))
        self._connections.input(connections.UpdateEvent(
            monotonic_time=self.monotonic_time,
            type=connections.Update.FRONTEND_RECEIVED
        ))
        if not self._connection_states.has_frontend:
            self._logger.info('Receiving data from the frontend')
            # Deactivate the "frontend connection down" log event, then log
            # a "frontend connection up" event.
            self._backend.input(backend.ExternalLogEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time,
                active=False,
                code=mcu_pb.LogEventCode.backend_frontend_connection_down
            ))
            self._backend.input(backend.ExternalLogEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time,
                code=mcu_pb.LogEventCode.backend_frontend_connection_up
            ))
            # NOTE(review): indentation was lost in the excerpt; the state
            # update below is assumed to run only on this transition.
            # Confirm against the upstream file.
            self._connection_states.has_frontend = True
            self._backend.input(backend.ReceiveDataEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time,
                server_receive=self._connection_states
            ))
        return True

    def _process_file(self) -> bool:
        """Process the next event from the file."""
        file_output = self._file.output()  # throws ProtocolDataError
        if file_output is None:
            return False

        self._backend.input(backend.ReceiveDataEvent(
            wall_time=self.wall_time, monotonic_time=self.monotonic_time,
            file_receive=file_output
        ))
        return True

    def _process_rotary_encoder(self) -> bool:
        """Process the next event from the rotary encoder."""
        rotary_encoder_output = self._rotary_encoder.output()
        if rotary_encoder_output is None:
            return False

        # Rotary encoder output is handed to the backend through the
        # frontend_receive field.
        self._backend.input(backend.ReceiveDataEvent(
            wall_time=self.wall_time, monotonic_time=self.monotonic_time,
            frontend_receive=rotary_encoder_output
        ))
        return True

    def _process_devices(self) -> bool:
        """Process the next event from every device.

        Returns True if at least one device produced an event.
        """
        any_updated = False
        any_updated = self._process_mcu() or any_updated
        any_updated = self._process_frontend() or any_updated
        # TODO: why do we need an exception handler here?
        try:
            any_updated = self._process_file() or any_updated
        except exceptions.ProtocolDataError as err:
            self._logger.error(err)
        any_updated = self._process_rotary_encoder() or any_updated
        return any_updated

    def _process_connection_statuses(self) -> bool:
        """Handle any connection timeouts.

        Returns whether the frontend should be killed.
        """
        actions = self._connections.output()
        if actions.alarm_mcu:
            self._backend.input(backend.ExternalLogEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time,
                active=True,
                code=mcu_pb.LogEventCode.backend_mcu_connection_down
            ))
            self._connection_states.has_mcu = False
            self._backend.input(backend.ReceiveDataEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time,
                server_receive=self._connection_states
            ))
            # The backend should temporarily override any active alarm mute by
            # cancelling it in case local alarm sounds need to be audible in
            # the backend, e.g. for loss of the MCU. It will also propagate
            # this override to the frontend, if the frontend is connected; the
            # frontend will also independently cancel the alarm mute, so when
            # the backend overrides the frontend (or vice versa) the alarm mute
            # is still cancelled. AlarmMute will be overridden again by the MCU
            # when it reconnects, but the MCU will also cancel any active alarm
            # mute in the MCU, so the alarm mute is still cancelled.
            self._backend.input(backend.AlarmMuteCancellationEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time,
                source=mcu_pb.AlarmMuteSource.backend_mcu_loss
            ))
        if actions.alarm_frontend:
            self._backend.input(backend.ExternalLogEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time,
                active=True,
                code=mcu_pb.LogEventCode.backend_frontend_connection_down
            ))
            self._connection_states.has_frontend = False
            self._backend.input(backend.ReceiveDataEvent(
                wall_time=self.wall_time, monotonic_time=self.monotonic_time,
                server_receive=self._connection_states
            ))
            # We don't need to request an alarm mute cancellation from the
            # MCU, because the MCU already cancels the alarm mute when the
            # backend tells it that the frontend has been lost. If the MCU is
            # also lost, that will be handled in the previous `if` statement.
        return actions.kill_frontend

    def input_serial(self, serial_receive: bytes) -> None:
        """Input a ReceiveDataEvent corresponding to serial data.

        This is just a convenience function intended for writing unit tests
        more concisely.
        """
        self.input(make_serial_receive(
            serial_receive, self.wall_time, self.monotonic_time
        ))

    def input_websocket(self, websocket: bytes) -> None:
        """Input a ReceiveDataEvent corresponding to websocket data.

        This is just a convenience function intended for writing unit tests
        more concisely.
        """
        self.input(make_websocket_receive(
            websocket, self.wall_time, self.monotonic_time
        ))

    @property
    def backend(self) -> backend.Receiver:
        """Return the backend receiver."""
        return self._backend

    @property
    def file(self) -> file.Receiver:
        """Return the file receiver"""
        return self._file


@attr.s
class Sender(protocols.Filter[SendEvent, SendOutputEvent]):
    """Filter which transforms high-level events into send bytes."""

    # Devices
    _mcu: mcu.Sender = attr.ib(factory=mcu.Sender)
    _frontend: frontend.Sender = attr.ib(factory=frontend.Sender)
    _file: file.Sender = attr.ib(factory=file.Sender)

    def input(self, event: Optional[SendEvent]) -> None:
        """Handle input events by routing each part to its device sender."""
        self._mcu.input(backend.get_mcu_send(event))
        self._frontend.input(backend.get_frontend_send(event))
        self._file.input(backend.get_file_send(event))

    def output(self) -> Optional[SendOutputEvent]:
        """Emit the next output event, or None if no sender has data."""
        send_event = SendOutputEvent(
            serial_send=self._mcu.output(),
            websocket_send=self._frontend.output(),
            file_send=self._file.output()
        )
        if not send_event.has_data():
            return None

        return send_event

    @property
    def file(self) -> file.Sender:
        """Return file sendfilter"""
        return self._file


# Protocols


@attr.s
class Protocol(protocols.Protocol[
        ReceiveEvent, ReceiveOutputEvent, SendEvent, SendOutputEvent
]):
    """Backend communication protocol."""

    _receive: Receiver = attr.ib(factory=Receiver)
    _send: Sender = attr.ib(factory=Sender)

    @property
    def receive(self) -> Receiver:
        """Return a Filter interface for receive events."""
        return self._receive

    @property
    def send(self) -> Sender:
        """Return a Filter interface for send events."""
        # NOTE(review): the excerpt is truncated here; the rest of this
        # property and of the file is not shown.
        ...
mock_time.py
Source:mock_time.py
#!/usr/bin/env python
#
# Copyright 2015 British Broadcasting Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest

import _useDvbCssUninstalled

import dvbcss.monotonic_time as monotonic_time
import threading


class MockTime(object):
    """\
    Object that provides mocks for :func:`monotonic_time.time` and
    :func:`dvbcss.monotonic_time.sleep`

    Use :func:`install` and :func:`uninstall` to plug the mock in.

    It works by replacing the time() and sleep() function in the
    :mod:`dvbcss.monotonic_time` module (or alternative module that you specify
    when calling :func:`install`).
    It therefore will mock the monotonic_time module in any module that has
    already imported it, or that imports it in the future, with only one
    call to :func:`install`.
    """

    def __init__(self):
        super(MockTime, self).__init__()
        self._timeNow = 0
        self.doctoredTimeModule = None
        self.oldTimeFunc = None
        self.oldSleepFunc = None
        # List of (wakeUpTime, threading.Event) for calls blocked in sleep().
        self.sleepEvents = []
        # Auto-increment state. A step of 0 means auto-increment is disabled.
        # The interval and countdown are initialised here so the attributes
        # always exist; enableAutoIncrementBy() overwrites them.
        self._incrStep = 0
        self._incrInterval = 1
        self._incrCountdown = 1

    @property
    def timeNow(self):
        """\
        Get/set this property to set the value that the mock time() function
        will return.

        If setting this property means that the time has passed the point at
        which a call to the mocked :func:`sleep` function is due to wake
        (unblock) and return, then this will take place.

        Reading the property also wakes any due sleeps and, when auto
        increment is enabled, counts as one read towards the next increment.
        """
        # process any pending wakeups for this time
        # go through list of pending sleeps and wake any up that need to be,
        # deleting them on the way.
        i = 0
        while i < len(self.sleepEvents):
            wakeUpTime, event = self.sleepEvents[i]
            if self._timeNow >= wakeUpTime:
                event.set()
                del self.sleepEvents[i]
            else:
                i = i + 1

        t = self._timeNow

        # do auto increment (if enabled); the increment only affects the
        # value returned by later reads, not this one
        if self._incrStep != 0:
            self._incrCountdown -= 1
            if self._incrCountdown == 0:
                self._incrCountdown = self._incrInterval
                self._timeNow += self._incrStep

        return t

    @timeNow.setter
    def timeNow(self, newValue):
        self._timeNow = newValue
        # trigger reading the time to cause any sleeps that need to be woken
        # to be processed
        _ = self.timeNow

    def enableAutoIncrementBy(self, step, numReadsBetweenIncrements=1):
        """\
        Make the mock time advance by itself as it is read.

        :param step: Amount added to the mock time at each increment.
        :param numReadsBetweenIncrements: Number of reads of the time between one increment and the next.
        """
        self._incrStep = step
        self._incrInterval = numReadsBetweenIncrements
        self._incrCountdown = self._incrInterval

    def disableAutoIncrement(self):
        """\
        Stop the mock time from advancing by itself.
        """
        self._incrStep = 0

    def flush(self):
        """\
        Flushes any calls blocked on the mock :func:`sleep` function by causing
        them to all wake (unblock) and return.
        """
        while self.sleepEvents:
            event = self.sleepEvents[0][1]
            event.set()
            del self.sleepEvents[0]

    def install(self, module=monotonic_time):
        """\
        Plugs the mock in.

        :param module: The "time" module to have its `time()` and `sleep()` functions replaced. Defaults to the dvbcss.monotonic_time module.

        :raises Exception: if this mock is already plugged into a module.
        """
        if self.doctoredTimeModule is None:

            self.doctoredTimeModule = module
            self.oldTimeFunc = getattr(module, "time")

            setattr(module, "time", self.mockTimeFunc)
            self.oldSleepFunc = getattr(module, "sleep")

            setattr(module, "sleep", self.mockSleepFunc)
        else:
            raise Exception("MockTime is already plugged in. Cannot be plugged in in two places.")

    def mockTimeFunc(self):
        """Replacement for time(): returns the current mock time."""
        return self.timeNow

    def mockSleepFunc(self, durationSecs):
        """\
        Replacement for sleep(): blocks the calling thread until the mock time
        reaches the wake-up time, or until :func:`flush` is called.
        """
        wakeUpTime = self.timeNow + durationSecs
        event = threading.Event()
        self.sleepEvents.append((wakeUpTime, event))
        event.wait()

    def uninstall(self):
        """\
        Unplugs the mock.

        Also causes a :func:`flush` to happen.

        :raises Exception: if this mock is not currently plugged in.
        """
        if self.doctoredTimeModule is not None:

            self.flush()

            setattr(self.doctoredTimeModule, "time", self.oldTimeFunc)
            self.oldTimeFunc = None

            setattr(self.doctoredTimeModule, "sleep", self.oldSleepFunc)
            self.oldSleepFunc = None

            self.doctoredTimeModule = None
        else:
            raise Exception("MockTime is not already plugged in. Cannot be unplugged.")


class SleepThread(threading.Thread):
    """Daemon thread that makes one call to monotonic_time.sleep()."""

    def __init__(self, sleepArg):
        super(SleepThread, self).__init__()
        self.sleepArg = sleepArg
        self.daemon = True

    def run(self):
        monotonic_time.sleep(self.sleepArg)


class Test_mock_time(unittest.TestCase):
    """\
    Tests for the MockTime class.

    Turtles all the way down.
    """

    def testInstallUninstall(self):
        """\
        Check if the mock appears to replace (when installing) and
        restore (when uninstalling) the monotonic_module time() and sleep() functions.
        """
        import dvbcss.monotonic_time as monotonic_time

        oldTime = monotonic_time.time
        oldSleep = monotonic_time.sleep

        mockUnderTest = MockTime()
        self.assertEqual(oldTime, monotonic_time.time)
        self.assertEqual(oldSleep, monotonic_time.sleep)

        mockUnderTest.install(monotonic_time)
        try:
            self.assertNotEqual(oldTime, monotonic_time.time)
            self.assertNotEqual(oldSleep, monotonic_time.sleep)
        finally:
            mockUnderTest.uninstall()

        self.assertEqual(oldTime, monotonic_time.time)
        self.assertEqual(oldSleep, monotonic_time.sleep)

    def testTime(self):
        """\
        Check that the mocked time() returns the value the mock was set to.
        """
        import dvbcss.monotonic_time as monotonic_time

        mockUnderTest = MockTime()
        mockUnderTest.install(monotonic_time)
        try:
            mockUnderTest.timeNow = 1234
            self.assertEqual(monotonic_time.time(), 1234)

            mockUnderTest.timeNow = -485.2701
            self.assertEqual(monotonic_time.time(), -485.2701)
        finally:
            mockUnderTest.uninstall()

    def testSleep(self):
        """\
        Check that mocked sleeps wake only once the mock time reaches their
        wake-up time.
        """
        import dvbcss.monotonic_time as monotonic_time

        mockUnderTest = MockTime()
        mockUnderTest.install(monotonic_time)

        try:
            a = SleepThread(5.0)
            b = SleepThread(2.0)
            c = SleepThread(7.0)

            mockUnderTest.timeNow = 1000.0
            a.start()  # will happen at t >= 1005

            self.assertTrue(a.is_alive())

            mockUnderTest.timeNow = 1001.0
            b.start()  # will happen at t >= 1003
            c.start()  # will happen at t >= 1008

            self.assertTrue(a.is_alive())
            self.assertTrue(b.is_alive())
            self.assertTrue(c.is_alive())

            mockUnderTest.timeNow = 1002.99
            self.assertTrue(a.is_alive())
            self.assertTrue(b.is_alive())
            self.assertTrue(c.is_alive())

            mockUnderTest.timeNow = 1003.1
            b.join(1.0)
            self.assertTrue(a.is_alive())
            self.assertFalse(b.is_alive())
            self.assertTrue(c.is_alive())

            mockUnderTest.timeNow = 1004.99
            self.assertTrue(a.is_alive())
            self.assertTrue(c.is_alive())

            mockUnderTest.timeNow = 1005.7
            a.join(1.0)
            self.assertFalse(a.is_alive())
            self.assertTrue(c.is_alive())

            mockUnderTest.timeNow = 1007.8
            self.assertTrue(c.is_alive())

            mockUnderTest.timeNow = 1008.0
            c.join(1.0)
            self.assertFalse(c.is_alive())
        finally:
            mockUnderTest.uninstall()

    def testSleepFlush(self):
        """\
        Check that sleeps that have not triggered are unblocked and return when
        the flush method is called.
        """
        import dvbcss.monotonic_time as monotonic_time

        mockUnderTest = MockTime()
        mockUnderTest.install(monotonic_time)

        try:
            a = SleepThread(5.0)
            b = SleepThread(2.0)
            c = SleepThread(7.0)

            mockUnderTest.timeNow = 1000.0
            a.start()  # will happen at t >= 1005

            self.assertTrue(a.is_alive())

            mockUnderTest.timeNow = 1001.0
            b.start()  # will happen at t >= 1003
            c.start()  # will happen at t >= 1008

            self.assertTrue(a.is_alive())
            self.assertTrue(b.is_alive())
            self.assertTrue(c.is_alive())

            mockUnderTest.flush()
            a.join(1.0)
            b.join(1.0)
            c.join(1.0)

            self.assertFalse(a.is_alive())
            self.assertFalse(b.is_alive())
            self.assertFalse(c.is_alive())
        finally:
            mockUnderTest.uninstall()

    def testSleepUninstallFlush(self):
        """\
        Check that sleeps that have not triggered are unblocked and return when
        the mock is uninstalled.
        """
        import dvbcss.monotonic_time as monotonic_time

        mockUnderTest = MockTime()
        mockUnderTest.install(monotonic_time)

        try:
            a = SleepThread(5.0)
            b = SleepThread(2.0)
            c = SleepThread(7.0)

            mockUnderTest.timeNow = 1000.0
            a.start()  # will happen at t >= 1005

            self.assertTrue(a.is_alive())

            mockUnderTest.timeNow = 1001.0
            b.start()  # will happen at t >= 1003
            c.start()  # will happen at t >= 1008

            self.assertTrue(a.is_alive())
            self.assertTrue(b.is_alive())
            self.assertTrue(c.is_alive())

        finally:
            mockUnderTest.uninstall()

        a.join(1.0)
        b.join(1.0)
        c.join(1.0)

        self.assertFalse(a.is_alive())
        self.assertFalse(b.is_alive())
        self.assertFalse(c.is_alive())

    def testIncrInterval(self):
        """\
        Mock time can be auto incremented
        """
        import dvbcss.monotonic_time as monotonic_time

        mockUnderTest = MockTime()
        mockUnderTest.install(monotonic_time)

        try:
            mockUnderTest.timeNow = 5

            # no auto increment by Default
            for i in range(0, 10000):
                self.assertEqual(5, monotonic_time.time())

            mockUnderTest.enableAutoIncrementBy(0.1, numReadsBetweenIncrements=5)
            for i in range(0, 100):
                for j in range(0, 5):
                    self.assertAlmostEqual(5 + i*0.1, monotonic_time.time(), delta=0.0001)

        finally:
            mockUnderTest.uninstall()


if __name__ == "__main__":
    #import sys;sys.argv = ['', 'Test.testName']
    # NOTE(review): the excerpt is truncated here; the original statement(s)
    # that follow are not shown.
    pass
connections.py
Source:connections.py
"""Sans-I/O backend connection loss handling protocol."""

import enum
from typing import Optional, Tuple
import logging

import attr

from ventserver.protocols import events
from ventserver.protocols.application import connections
from ventserver.sansio import protocols


# Events


@enum.unique
class Update(enum.Enum):
    """Enum for specifying the type of connection update event."""

    MCU_CONNECTED = enum.auto()
    MCU_DISCONNECTED = enum.auto()
    MCU_RECEIVED = enum.auto()
    FRONTEND_CONNECTED = enum.auto()
    FRONTEND_DISCONNECTED = enum.auto()
    FRONTEND_RECEIVED = enum.auto()
    CLOCK = enum.auto()


@attr.s
class UpdateEvent(events.Event):
    """Generic connection status event."""

    monotonic_time: float = attr.ib()
    # An event with the default type only carries a clock update.
    type: Update = attr.ib(default=Update.CLOCK)

    def has_data(self) -> bool:
        """Return whether the event has data (always true)."""
        return True


@attr.s
class ActionsEvent(events.Event):
    """Actions execution event."""

    alarm_mcu: bool = attr.ib(default=False)
    alarm_frontend: bool = attr.ib(default=False)
    kill_frontend: bool = attr.ib(default=False)

    def has_data(self) -> bool:
        """Return whether the event has data (always true)."""
        return True


@attr.s
class TimeoutHandler(protocols.Filter[UpdateEvent, ActionsEvent]):
    """Filter which handles connection timeouts for MCU and frontend."""

    _logger = logging.getLogger('.'.join((__name__, 'TimeoutHandler')))

    monotonic_time: float = attr.ib(default=0)
    _mcu_status: connections.TimeoutDetector = attr.ib()
    _mcu_alarm_trigger: connections.ActionTrigger = \
        attr.ib(factory=connections.ActionTrigger)
    _frontend_status: connections.TimeoutDetector = attr.ib()
    _frontend_kill_trigger: connections.ActionTrigger = attr.ib()
    _frontend_alarm_trigger: connections.ActionTrigger = \
        attr.ib(factory=connections.ActionTrigger)

    @_mcu_status.default
    def init_mcu_status(self) -> \
            connections.TimeoutDetector:  # pylint: disable=no-self-use
        """Initialize the MCU connection status tracker."""
        return connections.TimeoutDetector(event_timeout=0.5)

    @_frontend_status.default
    def init_frontend_status(self) -> \
            connections.TimeoutDetector:  # pylint: disable=no-self-use
        """Initialize the frontend connection status tracker."""
        return connections.TimeoutDetector(event_timeout=2)

    @_frontend_kill_trigger.default
    def init_frontend_kill_trigger(self) -> \
            connections.ActionTrigger:  # pylint: disable=no-self-use
        """Initialize the frontend kill trigger."""
        return connections.ActionTrigger(repeat_interval=5)

    def input(self, event: Optional[UpdateEvent]) -> None:
        """Handle input events.

        Every event first advances all clocks to the event's monotonic
        time; typed events are then passed to the matching status tracker.
        """
        if event is None:
            return

        self.monotonic_time = event.monotonic_time
        self._update_clocks()
        if event.type == Update.MCU_CONNECTED:
            self._logger.info('Received connection from the MCU')
            self._handle_mcu_connection(True)
        elif event.type == Update.MCU_DISCONNECTED:
            self._logger.warning('Lost connection from the MCU!')
            self._handle_mcu_connection(False)
        elif event.type == Update.MCU_RECEIVED:
            self._mcu_status.input(connections.UpdateEvent(
                monotonic_time=self.monotonic_time,
                event_received=True
            ))
        elif event.type == Update.FRONTEND_CONNECTED:
            self._logger.info('Received connection from the frontend')
            self._handle_frontend_connection(True)
        elif event.type == Update.FRONTEND_DISCONNECTED:
            self._logger.warning('Lost connection from the frontend!')
            self._handle_frontend_connection(False)
        elif event.type == Update.FRONTEND_RECEIVED:
            self._frontend_status.input(connections.UpdateEvent(
                monotonic_time=self.monotonic_time,
                event_received=True
            ))

    def output(self) -> ActionsEvent:
        """Emit the next output event."""
        output = ActionsEvent()
        output.alarm_mcu = self._process_mcu()
        (output.alarm_frontend, output.kill_frontend) = \
            self._process_frontend()
        return output

    def _update_clocks(self) -> None:
        """Update all clocks."""
        status_clock_update = connections.UpdateEvent(
            monotonic_time=self.monotonic_time
        )
        self._mcu_status.input(status_clock_update)
        self._frontend_status.input(status_clock_update)
        trigger_clock_update = connections.ActionStatus(
            monotonic_time=self.monotonic_time
        )
        self._mcu_alarm_trigger.input(trigger_clock_update)
        self._frontend_alarm_trigger.input(trigger_clock_update)
        self._frontend_kill_trigger.input(trigger_clock_update)

    def _process_mcu(self) -> bool:
        """Decide if the MCU connection is unresponsive and what to do.

        Returns whether the MCU alarm action should run now.
        """
        mcu_status = self._mcu_status.output()
        # Update action trigger
        if mcu_status is not None:
            self._mcu_alarm_trigger.input(connections.ActionStatus(
                monotonic_time=self.monotonic_time,
                trigger=mcu_status.timed_out
            ))

        # Decide whether to run action
        alarm_mcu = self._mcu_alarm_trigger.output()
        if alarm_mcu:
            self._logger.warning('No longer receiving data from the MCU!')
            # Tell the trigger that the action is being executed.
            self._mcu_alarm_trigger.input(connections.ActionStatus(
                monotonic_time=self.monotonic_time, execute=True
            ))
        return alarm_mcu

    def _process_frontend(self) -> Tuple[bool, bool]:
        """Decide if the frontend connection is unresponsive and what to do.

        Returns a pair (alarm_frontend, kill_frontend).
        """
        frontend_status = self._frontend_status.output()
        # Update action triggers
        if frontend_status is not None:
            self._frontend_alarm_trigger.input(connections.ActionStatus(
                monotonic_time=self.monotonic_time,
                trigger=frontend_status.timed_out
            ))
            # The kill trigger additionally requires an uptime above 2.
            self._frontend_kill_trigger.input(connections.ActionStatus(
                monotonic_time=self.monotonic_time, trigger=(
                    frontend_status.timed_out and frontend_status.uptime > 2
                )
            ))

        # Decide whether to run actions
        alarm_frontend = self._frontend_alarm_trigger.output()
        if alarm_frontend:
            self._logger.warning('No longer receiving data from the frontend!')
            self._frontend_alarm_trigger.input(connections.ActionStatus(
                monotonic_time=self.monotonic_time,
                execute=True
            ))
        kill_frontend = self._frontend_kill_trigger.output()
        if kill_frontend:
            self._frontend_kill_trigger.input(connections.ActionStatus(
                monotonic_time=self.monotonic_time,
                execute=True
            ))
        return (alarm_frontend, kill_frontend)

    def _handle_mcu_connection(self, connected: bool) -> None:
        """Handle a connection event for the MCU."""
        self._mcu_status.input(connections.UpdateEvent(
            connected=connected, monotonic_time=self.monotonic_time
        ))

    def _handle_frontend_connection(self, connected: bool) -> None:
        """Handle a connection event for the Frontend."""
        if connected:
            # Stop repeatedly trying to kill the frontend, to give the frontend
            # connection some time to start producing events
            self._frontend_kill_trigger.input(connections.ActionStatus(
                monotonic_time=self.monotonic_time,
                trigger=False
            ))
        # NOTE(review): the excerpt is truncated inside this call; only the
        # closing parentheses were added. Anything after it is not shown.
        self._frontend_status.input(connections.UpdateEvent(
            connected=connected, monotonic_time=self.monotonic_time
        ))
alarm_mute.py
Source:alarm_mute.py
"""alarm muting request and response"""

import logging
import random
import typing
from typing import Mapping, Optional

import attr

import betterproto

from ventserver.protocols.backend import alarms, log, states
from ventserver.protocols.protobuf import mcu_pb


@attr.s
class Service:
    """Implement Alarm Mute Service.

    Tracks whether alarms are currently muted, applies mute/unmute requests
    whose sequence number is exactly one past the service's own, cancels the
    mute when the backend reports no frontend connection or when the
    remaining mute time runs out, and mirrors the resulting state into the
    AlarmMute response message.
    """

    MUTE_MAX_DURATION = 120000  # ms

    _logger = logging.getLogger('.'.join((__name__, 'Service')))

    # Whether alarms are currently muted.
    active: bool = attr.ib(default=False)
    # Set to monotonic_time * 1000 when the mute is activated; None while
    # the mute is inactive.
    mute_start_time: Optional[float] = attr.ib(default=None)
    # Incremented on every change of the mute state. A factory is used so
    # that each instance draws its own random starting value; a plain
    # ``default=random.getrandbits(32)`` is evaluated only once, when the
    # class body runs, and would be shared by every instance.
    seq_num: int = attr.ib(factory=lambda: random.getrandbits(32))
    # Source of the most recent change of the mute state.
    source: mcu_pb.AlarmMuteSource = \
        attr.ib(default=mcu_pb.AlarmMuteSource.initialization)

    # TODO: also allow canceling alarm mute upon loss of connection, maybe with
    # an AlarmMute client (rather than a service)
    def transform(
            self, monotonic_time: float,
            store: Mapping[states.StateSegment, Optional[betterproto.Message]],
            events_log: alarms.Manager
    ) -> None:
        """Update the parameters for alarm mute service.

        Reads the ALARM_MUTE_REQUEST, ALARM_MUTE and BACKEND_CONNECTIONS
        segments from ``store``, applies the request to the internal state
        and the ALARM_MUTE response, and then cancels the mute with source
        ``backend_frontend_loss`` if the backend connections segment is
        present and reports no frontend.

        NOTE(review): ``monotonic_time`` is multiplied by 1000 before being
        compared against the millisecond limit, so it is presumably in
        seconds -- confirm against the caller.
        """
        alarm_mute_request = typing.cast(
            mcu_pb.AlarmMuteRequest,
            store[states.StateSegment.ALARM_MUTE_REQUEST]
        )
        alarm_mute = typing.cast(
            mcu_pb.AlarmMute, store[states.StateSegment.ALARM_MUTE]
        )
        backend_connections = typing.cast(
            mcu_pb.BackendConnections,
            store[states.StateSegment.BACKEND_CONNECTIONS]
        )
        self.transform_mute(
            monotonic_time, alarm_mute_request, alarm_mute, events_log
        )
        if (
                backend_connections is not None and
                not backend_connections.has_frontend
        ):
            self.transform_mute_internal(
                monotonic_time, False,
                mcu_pb.AlarmMuteSource.backend_frontend_loss,
                alarm_mute, events_log
            )

    def transform_mute(
            self, monotonic_time: float,
            request: mcu_pb.AlarmMuteRequest, response: mcu_pb.AlarmMute,
            events_log: alarms.Manager
    ) -> None:
        """Implement alarm muting driven by an external request.

        The request is applied only when its sequence number is exactly one
        greater than the service's current sequence number; any other
        request is ignored. The response is refreshed in either case.
        """
        if request.seq_num == self.seq_num + 1:
            self._update_internal_state(
                request.active, monotonic_time, request.source,
                events_log
            )
        self._update_response(monotonic_time, response, events_log)

    def transform_mute_internal(
            self, monotonic_time: float,
            mute: bool, source: mcu_pb.AlarmMuteSource,
            response: mcu_pb.AlarmMute, events_log: alarms.Manager
    ) -> None:
        """Implement alarm muting driven by the backend itself.

        Unlike ``transform_mute``, no sequence number check is made: the
        requested state is applied directly and the response is refreshed.
        """
        self._update_internal_state(
            mute, monotonic_time, source, events_log
        )
        self._update_response(monotonic_time, response, events_log)

    def _update_internal_state(
            self, active: bool, monotonic_time: float,
            source: mcu_pb.AlarmMuteSource, events_log: alarms.Manager
    ) -> None:
        """Update internal state.

        Does nothing when ``active`` equals the current state. Otherwise
        increments the sequence number, records the new state and source,
        sets or clears the mute start time, and submits a system log event
        whose code depends on the new state and on ``source``.
        """
        if self.active == active:
            return

        self.seq_num += 1
        self.active = active
        self.source = source
        if active:
            self.mute_start_time = monotonic_time * 1000
            if source == mcu_pb.AlarmMuteSource.user_software:
                log_event_code = mcu_pb.LogEventCode.alarms_muted_user_software
            else:
                # Only user_software is treated as an expected source for
                # activating the mute.
                self._logger.error(
                    'Unexpected alarm mute activation source %s', source
                )
                log_event_code = mcu_pb.LogEventCode.alarms_muted_unknown
        else:
            self.mute_start_time = None
            if source == mcu_pb.AlarmMuteSource.initialization:
                log_event_code = \
                    mcu_pb.LogEventCode.alarms_unmuted_initialization
            elif source == mcu_pb.AlarmMuteSource.user_software:
                log_event_code = \
                    mcu_pb.LogEventCode.alarms_unmuted_user_software
            elif source == mcu_pb.AlarmMuteSource.timeout:
                log_event_code = \
                    mcu_pb.LogEventCode.alarms_unmuted_timeout
            elif source == mcu_pb.AlarmMuteSource.backend_frontend_loss:
                log_event_code = \
                    mcu_pb.LogEventCode.alarms_unmuted_backend_frontend_loss
            elif source == mcu_pb.AlarmMuteSource.frontend_backend_loss:
                log_event_code = \
                    mcu_pb.LogEventCode.alarms_unmuted_frontend_backend_loss
            else:
                self._logger.error(
                    'Unexpected alarm mute cancellation source %s', source
                )
                log_event_code = mcu_pb.LogEventCode.alarms_unmuted_unknown
        events_log.input(log.LocalLogInputEvent(new_event=mcu_pb.LogEvent(
            code=log_event_code, type=mcu_pb.LogEventType.system
        )))

    def _update_response(
            self, monotonic_time: float, response: mcu_pb.AlarmMute,
            events_log: alarms.Manager
    ) -> None:
        """Update response based on internal state.

        Copies the internal state into ``response`` and recomputes the
        remaining time. If no time remains, the mute is cancelled with
        source ``timeout`` and the response is refreshed once more.
        """
        response.active = self.active
        response.seq_num = self.seq_num
        response.source = self.source
        self._update_remaining(monotonic_time, response)
        if response.remaining > 0:
            return

        # The mute has run out of time: cancel it and mirror the new state.
        self._update_internal_state(
            False, monotonic_time, mcu_pb.AlarmMuteSource.timeout, events_log
        )
        response.active = self.active
        response.seq_num = self.seq_num
        response.source = self.source
        self._update_remaining(monotonic_time, response)

    def _update_remaining(
            self, monotonic_time: float, response: mcu_pb.AlarmMute
    ) -> None:
        """Update remaining field of response based on internal state."""
        if not self.active:
            # While inactive, the full mute duration is reported.
            response.remaining = self.MUTE_MAX_DURATION
            return

        if self.mute_start_time is None:
            self.mute_start_time = monotonic_time * 1000
        mute_duration = monotonic_time * 1000 - self.mute_start_time
        remaining = self.MUTE_MAX_DURATION - int(mute_duration)
        # NOTE(review): the source listing is truncated at this point; the
        # rest of this method is not shown and has not been reconstructed.
        ...
LambdaTest’s Playwright tutorial gives you a broader idea of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial provides A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!