Best Python code snippet using fMBT_python
log.py
Source:log.py
"""Sans-I/O backend event log service protocol."""

import dataclasses
import enum
import typing
from typing import Dict, List, Mapping, Optional

import attr

from ventserver.protocols import events
from ventserver.protocols.application import clocks, lists
from ventserver.protocols.protobuf import mcu_pb
from ventserver.sansio import channels, protocols


# Events


@enum.unique
class EventSource(enum.Enum):
    """Enum for specifying which source a receiver input event came from.

    EventLogReceiver dispatches MCU events to its remote handler and BACKEND
    events to its local handler.
    """

    MCU = enum.auto()
    BACKEND = enum.auto()


@attr.s
class ReceiveInputEvent(events.Event):
    """Event log receiver input event."""

    source: EventSource = attr.ib()
    wall_time: float = attr.ib()
    next_log_events: Optional[mcu_pb.NextLogEvents] = attr.ib(default=None)
    active_log_events: Optional[mcu_pb.ActiveLogEvents] = attr.ib(default=None)

    def has_data(self) -> bool:
        """Return whether the event has data."""
        return (
            self.next_log_events is not None
            or self.active_log_events is not None
            # wall_time only matters if next_log_events is not None, so it
            # doesn't count for has_data.
        )


@attr.s
class ReceiveOutputEvent(events.Event):
    """Event log receiver output event."""

    expected_log_event: Optional[mcu_pb.ExpectedLogEvent] = attr.ib(
        default=None
    )
    new_elements: List[mcu_pb.LogEvent] = attr.ib(factory=list)
    active_log_events: Optional[mcu_pb.ActiveLogEvents] = attr.ib(default=None)

    def has_data(self) -> bool:
        """Return whether the event has data."""
        return (
            self.expected_log_event is not None
            or bool(self.new_elements)
            or self.active_log_events is not None
        )


@attr.s
class SendInputEvent(events.Event):
    """Event log sender input event."""

    expected_log_event: Optional[mcu_pb.ExpectedLogEvent] = \
        attr.ib(default=None)
    new_log_events: List[mcu_pb.LogEvent] = attr.ib(factory=list)

    def has_data(self) -> bool:
        """Return whether the event has data."""
        return (
            self.expected_log_event is not None or bool(self.new_log_events)
        )


@attr.s
class LocalLogInputEvent(events.Event):
    """Local event log input event.

    new_event's id and time fields are ignored: LocalLogSource overwrites them
    in a copy of the event.
    """

    wall_time: Optional[float] = attr.ib(default=None)  # s
    new_event: Optional[mcu_pb.LogEvent] = attr.ib(default=None)
    active: bool = attr.ib(default=False)

    def has_data(self) -> bool:
        """Return whether the event has data."""
        return self.wall_time is not None or self.new_event is not None


@attr.s
class LocalLogOutputEvent(events.Event):
    """Local event log output event."""

    new_events: List[mcu_pb.LogEvent] = attr.ib(factory=list)
    new_active_events: Mapping[mcu_pb.LogEventCode, int] = attr.ib(factory=dict)

    def has_data(self) -> bool:
        """Return whether the event has data."""
        return bool(self.new_events) or bool(self.new_active_events)


# Synchronizers


@attr.s
class EventLogReceiver(protocols.Filter[ReceiveInputEvent, ReceiveOutputEvent]):
    """Receipt of an event log from a local or remote source.

    Receives log events and the list of active log events from the source,
    remaps IDs and timestamps, and outputs newly received events and
    ID-remapped active log events.

    If an event has not yet been received but it's on the list of active events,
    it will be excluded from the active events list until the event is received
    in the log.
    """

    _buffer: channels.DequeChannel[ReceiveInputEvent] = attr.ib(
        factory=channels.DequeChannel
    )
    _log_events_receiver: lists.ReceiveSynchronizer[mcu_pb.LogEvent] = attr.ib()
    _clock_synchronizer: clocks.Synchronizer = attr.ib(
        factory=clocks.Synchronizer
    )
    # Copy of the last remote segment which was input into the receiver
    _next_log_events_prev: Optional[mcu_pb.NextLogEvents] = attr.ib(
        default=None
    )
    # Next ID in the unified numbering shared by remote and local events
    _events_log_next_id: int = attr.ib(default=0)
    _remote_id_mapping: Dict[int, int] = attr.ib(factory=dict)
    _remote_session_id: int = attr.ib(default=0)
    _active_remote_events: List[int] = attr.ib(factory=list)
    _local_id_mapping: Dict[int, int] = attr.ib(factory=dict)
    _active_local_events: List[int] = attr.ib(factory=list)

    @_log_events_receiver.default
    def init_log_events_list_receiver(self) -> lists.ReceiveSynchronizer[
            mcu_pb.LogEvent
    ]:  # pylint: disable=no-self-use
        """Initialize the log events list receiver."""
        return lists.ReceiveSynchronizer()

    def input(self, event: Optional[ReceiveInputEvent]) -> None:
        """Handle input events.

        Events which are None or which carry no data are dropped; all other
        events are buffered for processing by output().
        """
        if event is None or not event.has_data():
            return

        self._buffer.input(event)

    def output(self) -> Optional[ReceiveOutputEvent]:
        """Emit the next output event.

        Returns:
            The result of processing the next buffered input event with the
            handler for its source, or None if no event with data is buffered
            or the event's source is not recognized.
        """
        event = self._buffer.output()
        if event is None or not event.has_data():
            return None

        if event.source == EventSource.MCU:
            return self._handle_remote_event(event)

        if event.source == EventSource.BACKEND:
            return self._handle_local_event(event)

        return None

    @property
    def _active_log_events(self) -> List[int]:
        """Compute the set union of active remote and local events."""
        return list(set(self._active_local_events + self._active_remote_events))

    def _handle_remote_session_reset(
            self, update_event: lists.UpdateEvent[mcu_pb.LogEvent]
    ) -> None:
        """Handle a reset of the remote peer's log session.

        Clears the remote ID mapping, inputs a reset event into the clock
        synchronizer, and records the session ID of update_event.
        """
        self._remote_id_mapping.clear()
        self._clock_synchronizer.input(clocks.ResetEvent())
        self._remote_session_id = update_event.session_id

    def _remap_remote_element(
            self, element: mcu_pb.LogEvent, wall_time: float
    ) -> mcu_pb.LogEvent:
        """Remap a remote log event's ID & time into the local reference.

        Args:
            element: The log event as received; it is not modified.
            wall_time: The wall time associated with receipt of the event.

        Returns:
            A copy of element with a new ID and an adjusted time.
        """
        new_element = dataclasses.replace(element)
        # Remap ID
        new_element.id = self._events_log_next_id
        self._events_log_next_id += 1
        # Maintain ID mappings for active log events
        self._remote_id_mapping[element.id] = new_element.id
        # Remap time
        self._clock_synchronizer.input(clocks.UpdateEvent(
            wall_time=wall_time, remote_time=new_element.time
        ))
        new_element.time += self._clock_synchronizer.output()
        return new_element

    def _handle_remote_event(self, event: ReceiveInputEvent) \
            -> ReceiveOutputEvent:
        """Process events received from a remote source.

        Returns:
            An output event carrying any newly received (and remapped) log
            events, the next expected log event if the list receiver provided
            one, and the ID-remapped active log events.
        """
        # Check for newly-received events
        next_log_events = event.next_log_events
        # An event may carry only active_log_events. Without the None check,
        # such an event arriving after a segment was already received would
        # compare unequal to the previous segment and dataclasses.replace(None)
        # would raise a TypeError.
        if (
                next_log_events is not None
                and next_log_events != self._next_log_events_prev
        ):
            # Don't re-input previously received segments into receiver.
            self._log_events_receiver.input(next_log_events)
            self._next_log_events_prev = dataclasses.replace(next_log_events)
        update_event = self._log_events_receiver.output()

        output_event = ReceiveOutputEvent()
        if update_event is not None:
            # Reset session-specific state when needed
            if update_event.session_id != self._remote_session_id:
                # The remote peer reset its event log, so invalidate all id
                # mappings for active log events and reset the clock
                # synchronizer
                self._handle_remote_session_reset(update_event)
            # Remap remote IDs and times to local ID numbering & clock
            output_event.new_elements = [
                self._remap_remote_element(element, event.wall_time)
                for element in update_event.new_elements
            ]
            # Generate the next ExpectedLogEvent
            if update_event.next_expected is not None:
                output_event.expected_log_event = mcu_pb.ExpectedLogEvent(
                    id=update_event.next_expected,
                    session_id=self._remote_session_id
                )

        if event.active_log_events is not None:
            self._active_remote_events = [
                self._remote_id_mapping[remote_id]
                for remote_id in event.active_log_events.id
                if remote_id in self._remote_id_mapping
                # TODO: we may need error handling if id isn't in mapping
                # (e.g. because the backend crashed and restarted while
                # the firmware stayed running). But if active_log_events
                # arrived before the corresponding log event arrived,
                # that shouldn't be treated like an error. So maybe we
                # actually just need to map (session ID, event ID) pairs,
                # and we need to persist that mapping to the filesystem,
                # and we can discard any keys with stale sessions.
            ]
        # NOTE(review): indentation was lost in the text this was recovered
        # from; this assignment is assumed to run unconditionally rather than
        # only when active_log_events was received - confirm against upstream.
        output_event.active_log_events = mcu_pb.ActiveLogEvents(
            id=self._active_log_events
        )
        return output_event

    def _remap_local_element(self, element: mcu_pb.LogEvent) -> mcu_pb.LogEvent:
        """Remap a local log event's ID into the local reference.

        Returns:
            A copy of element with a new ID; element is not modified.
        """
        new_element = dataclasses.replace(element)
        # Remap ID
        new_element.id = self._events_log_next_id
        self._events_log_next_id += 1
        # Maintain ID mappings for active log events
        self._local_id_mapping[element.id] = new_element.id
        return new_element

    def _handle_local_event(self, event: ReceiveInputEvent) \
            -> Optional[ReceiveOutputEvent]:
        """Process events received from a local source.

        Returns:
            An output event carrying any new (ID-remapped) log events and the
            ID-remapped active log events.
        """
        output_event = ReceiveOutputEvent()
        # Remap ID numbering
        output_event.new_elements = []
        if event.next_log_events is not None:
            output_event.new_elements = [
                self._remap_local_element(element)
                for element in event.next_log_events.elements
            ]

        if event.active_log_events is not None:
            self._active_local_events = [
                self._local_id_mapping[local_id]
                for local_id in event.active_log_events.id
                if local_id in self._local_id_mapping
                # TODO: do we need error handling if id isn't in mapping?
            ]
        # NOTE(review): indentation was lost in the text this was recovered
        # from; this assignment is assumed to run unconditionally - confirm
        # against upstream.
        output_event.active_log_events = mcu_pb.ActiveLogEvents(
            id=self._active_log_events
        )
        return output_event


@attr.s
class EventLogSender(protocols.Filter[SendInputEvent, mcu_pb.NextLogEvents]):
    """Sending of an event log to a remote peer.

    Receives event log list synchronization cursor from the peer, and sends any
    events starting at that cursor.
    """

    _log_events_sender: lists.SendSynchronizer[mcu_pb.LogEvent] = attr.ib()
    # Copy of the last non-None cursor which was received from the peer
    _expected_log_event_prev: Optional[mcu_pb.ExpectedLogEvent] = attr.ib(
        default=None
    )

    @_log_events_sender.default
    def init_log_events_list_sender(self) -> lists.SendSynchronizer[
            mcu_pb.LogEvent
    ]:  # pylint: disable=no-self-use
        """Initialize the log events list sender."""
        return lists.SendSynchronizer(segment_type=mcu_pb.NextLogEvents)

    def input(self, event: Optional[SendInputEvent]) -> None:
        """Handle input events.

        The list sender is only updated when the peer's cursor differs from the
        previously seen one or when there are new log events to add.
        """
        if event is None or not event.has_data():
            return

        if (
                event.expected_log_event != self._expected_log_event_prev
                or event.new_log_events
        ):
            next_expected = None
            if event.expected_log_event is not None:
                next_expected = event.expected_log_event.id
                # Store a copy so that later mutation of the input event can't
                # affect the comparison above.
                self._expected_log_event_prev = dataclasses.replace(
                    event.expected_log_event
                )
            self._log_events_sender.input(lists.UpdateEvent(
                next_expected=next_expected, new_elements=event.new_log_events
            ))

    def output(self) -> Optional[mcu_pb.NextLogEvents]:
        """Emit the next output event."""
        return typing.cast(
            mcu_pb.NextLogEvents, self._log_events_sender.output()
        )


# Managers


@attr.s
class LocalLogSource(protocols.Filter[
        LocalLogInputEvent, LocalLogOutputEvent
]):
    """Annotation of new local log events with IDs and timestamps.

    The IDs of all events passed through this filter are numbered consecutively.
    Takes input events, each of which contains a LogEvent whose id and timestamp
    fields are to be overridden (in a copy, so that the input is not modified),
    and a flag specifying whether the event is active.
    Outputs a list of annotated LogEvents to input into an EventLogReceiver, and
    a mapping of LogEventCodes to IDs for active log events.
    """

    wall_time: float = attr.ib(default=0)  # s
    next_log_event_id: int = attr.ib(default=0)
    _new_events: List[mcu_pb.LogEvent] = attr.ib(factory=list)
    _new_active_events: Dict[mcu_pb.LogEventCode, int] = attr.ib(factory=dict)

    def input(self, event: Optional[LocalLogInputEvent]) -> None:
        """Handle input events.

        Updates the stored wall time if the event provides one, then, if the
        event carries a new log event, queues an annotated copy of it.
        """
        if event is None:
            return

        if event.wall_time is not None:
            self.wall_time = event.wall_time

        if event.new_event is None:
            return

        log_event = dataclasses.replace(event.new_event)
        log_event.id = self.next_log_event_id
        # wall_time is in s; the factor of 1000 presumably converts it to ms
        log_event.time = int(self.wall_time * 1000)
        self._new_events.append(log_event)
        if event.active:
            self._new_active_events[log_event.code] = log_event.id
        self.next_log_event_id += 1

    def output(self) -> LocalLogOutputEvent:
        """Emit the next output event."""
        output = LocalLogOutputEvent(
            new_events=self._new_events,
            new_active_events=self._new_active_events
        )
        # Start fresh collections so that the emitted event keeps its contents
        self._new_events = []
        self._new_active_events = {}
        # NOTE(review): the snippet is truncated at this point in the text it
        # was recovered from; the remainder of this method (presumably
        # returning `output`) is not visible and must be restored from
        # upstream.
        ...
protocols.py
Source:protocols.py
"""Sans-I/O protocol interfaces with an h11-like design.

Provides an abstract Filter class which defines a transformation between input
events and output events. Provides an abstract Protocol class which associates
two Filters transforming events in opposite directions (receive and send).

    Typical usage example:

    @attr.s
    class ToUpperFilter(Filter[str, str]):
        last_input: Optional[str] = attr.ib(default=None)

        def input(self, event: Optional[str]) -> None:
            if event is None:
                return
            self.last_input = event

        def output(self) -> Optional[str]:
            if self.last_input is None:
                return None
            last_input = self.last_input.upper()
            self.last_input = None
            return last_input

    @attr.s
    class ToLowerFilter(Filter[str, str]):
        last_input: Optional[str] = attr.ib(default=None)

        def input(self, event: Optional[str]) -> None:
            if event is None:
                return
            self.last_input = event

        def output(self) -> Optional[str]:
            if self.last_input is None:
                return None
            last_input = self.last_input.lower()
            self.last_input = None
            return last_input

    to_upper_filter = ToUpperFilter()
    to_upper_filter.input('foo')
    to_upper_filter.input('bar')
    print(to_upper_filter.output())  # 'BAR'
    to_upper_filter.input('foo')
    for output in to_upper_filter.output_all():
        print(output)  # 'FOO'

    @attr.s
    class UpperLowerProtocol(Protocol[str, str, str, str]):
        to_upper_filter: ToUpperFilter = attr.ib(factory=ToUpperFilter)
        to_lower_filter: ToLowerFilter = attr.ib(factory=ToLowerFilter)

        @property
        def receive(self) -> ToUpperFilter:
            return self.to_upper_filter

        @property
        def send(self) -> ToLowerFilter:
            return self.to_lower_filter

    upper_lower_protocol = UpperLowerProtocol()
    upper_lower_protocol.send.input('FOO')
    print(upper_lower_protocol.send.output())  # 'foo'
    upper_lower_protocol.receive.input('bar')
    print(upper_lower_protocol.receive.output())  # 'BAR'
"""

import abc
from typing import Generator, Generic, Optional, TypeVar


# Filters


_InputEvent = TypeVar('_InputEvent')
_OutputEvent = TypeVar('_OutputEvent')


class Filter(Generic[_InputEvent, _OutputEvent], abc.ABC):
    """Interface class for filters in a pipe-and-filter architecture.

    A filter processes and emits output events one step at a time, so calling
    code can query any public state the Filter exposes between output calls.
    The input and output methods may modify the Filter's internal state
    (private or public).

    If calling code uses the filter with any form of asynchronous or concurrent
    flow control, the filter should not share state with the outside world; it
    should communicate with other software components only through its input
    and output methods.
    """

    @abc.abstractmethod
    def input(self, event: Optional[_InputEvent]) -> None:
        """Handle an input event by modifying internal state accordingly.

        Typically this just puts the input event onto an internal buffer, to be
        processed sequentially by repeated calls of the output method.

        Args:
            event: An optional event for the filter to process as input. If it
                is None, it should be ignored and internal state should not be
                modified.

        Raises:
            May raise an exception if the input is rejected.
        """

    @abc.abstractmethod
    def output(self) -> Optional[_OutputEvent]:
        """Process internal state to emit the next output event.

        Returns:
            The next output from the filter's processing of its internal state,
            or None if no output is available.

        Raises:
            May raise an exception if the internal state is invalid.
        """

    def output_all(self) -> Generator[_OutputEvent, None, None]:
        """Yield output events while they are available.

        Calling code may query any public state exposed by the Filter between
        the outputs yielded here. None is never yielded, because it is the
        sentinel by which output() signals that no outputs are available.
        """
        next_output = self.output()
        while next_output is not None:
            yield next_output
            next_output = self.output()


# Protocols


ReceiveInputEvent = TypeVar('ReceiveInputEvent')
ReceiveOutputEvent = TypeVar('ReceiveOutputEvent')
SendInputEvent = TypeVar('SendInputEvent')
SendOutputEvent = TypeVar('SendOutputEvent')


class Protocol(Generic[
        ReceiveInputEvent, ReceiveOutputEvent, SendInputEvent, SendOutputEvent
], abc.ABC):
    """Interface class for send/receive protocols.

    Associates a pair of Filters with semantic directions of data flow: one
    filter transforms data in the receive direction, and the other transforms
    data in the send direction.
    """

    @property
    @abc.abstractmethod
    def receive(self) -> Filter[ReceiveInputEvent, ReceiveOutputEvent]:
        """Return a Filter interface for the receive direction."""

    @property
    @abc.abstractmethod
    def send(self) -> Filter[SendInputEvent, SendOutputEvent]:
        # NOTE(review): the snippet is truncated at this point in the text it
        # was recovered from; restore the original body/docstring from
        # upstream.
        ...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!