Best Python code snippet using lemoncheesecake
channel_test.py
Source:channel_test.py
...459 f3 = MethodFrame(42, 2, 3)460 c._pending_events = deque([f1, f2, 'cb', f3])461 expect(conn.send_frame).args(f1)462 expect(conn.send_frame).args(f2)463 c._flush_pending_events()464 assert_equals(deque(['cb', f3]), c._pending_events)465 def test_closed_cb_without_final_frame(self):466 c = Channel(mock(), None, self._CLASS_MAP)467 c._pending_events = 'foo'468 c._frame_buffer = 'foo'469 for val in c._class_map.values():470 expect(val._cleanup)471 expect(c._notify_close_listeners)472 c._closed_cb()473 assert_equals(deque([]), c._pending_events)474 assert_equals(deque([]), c._frame_buffer)475 assert_equals(None, c._connection)476 assert_false(hasattr(c, 'channel'))477 assert_false(hasattr(c, 'exchange'))...
session.py
Source:session.py
...62 def cursor(self, cursor):63 self._local.cursor = cursor64 def _hold_event(self, event):65 self.cursor.pending_events.append(event)66 def _flush_pending_events(self):67 for event in self.cursor.pending_events:68 self.event_manager.fire(event)69 del self.cursor.pending_events[:]70 def _discard_pending_event_if_any(self, event_class):71 if self.cursor.pending_events and isinstance(self.cursor.pending_events[-1], event_class):72 self.cursor.pending_events.pop()73 return True74 else:75 return False76 def _discard_or_fire_event(self, event_class, event):77 discarded = self._discard_pending_event_if_any(event_class)78 if not discarded:79 self.event_manager.fire(event)80 def _mark_location_as_failed(self, location):81 self._failures.add(location)82 def is_successful(self, location=None):83 if location:84 return location not in self._failures85 else:86 return len(self._failures) == 087 def start_step(self, description):88 self._end_step_if_any()89 self.cursor.step = description90 self._hold_event(91 events.StepStartEvent(self.cursor.location, description, _get_thread_id())92 )93 set_step = start_step94 def end_step(self):95 assert self.cursor.step, "There is no started step"96 self._discard_or_fire_event(97 events.StepStartEvent, events.StepEndEvent(self.cursor.location, self.cursor.step, _get_thread_id())98 )99 self.cursor.step = None100 def _end_step_if_any(self):101 if self.cursor.step:102 self.end_step()103 def _log(self, level, content):104 self._flush_pending_events()105 if level == Log.LEVEL_ERROR:106 self._mark_location_as_failed(self.cursor.location)107 self.event_manager.fire(108 events.LogEvent(self.cursor.location, self.cursor.step, _get_thread_id(), level, content)109 )110 def log_debug(self, content):111 return self._log(Log.LEVEL_DEBUG, content)112 def log_info(self, content):113 return self._log(Log.LEVEL_INFO, content)114 def log_warning(self, content):115 return self._log(Log.LEVEL_WARN, content)116 def log_error(self, content):117 return self._log(Log.LEVEL_ERROR, content)118 def log_check(self, description, is_successful, details):119 self._flush_pending_events()120 if is_successful is False:121 self._mark_location_as_failed(self.cursor.location)122 self.event_manager.fire(events.CheckEvent(123 self.cursor.location, self.cursor.step, _get_thread_id(), description, is_successful, details124 ))125 def log_url(self, url, description):126 self._flush_pending_events()127 self.event_manager.fire(128 events.LogUrlEvent(self.cursor.location, self.cursor.step, _get_thread_id(), url, description)129 )130 @contextmanager131 def prepare_attachment(self, filename, description, as_image=False):132 with self._attachment_lock:133 attachment_filename = "%04d_%s" % (self._attachment_count + 1, filename)134 self._attachment_count += 1135 if not os.path.exists(self._attachments_dir):136 os.mkdir(self._attachments_dir)137 yield os.path.join(self._attachments_dir, attachment_filename)138 self._flush_pending_events()139 self.event_manager.fire(events.LogAttachmentEvent(140 self.cursor.location, self.cursor.step, _get_thread_id(),141 "%s/%s" % (_ATTACHMENTS_DIR, attachment_filename), description, as_image142 ))143 def start_test_session(self):144 self.event_manager.fire(events.TestSessionStartEvent(self.report))145 def end_test_session(self):146 self.event_manager.fire(events.TestSessionEndEvent(self.report))147 def start_test_session_setup(self):148 self.cursor = _Cursor(ReportLocation.in_test_session_setup())149 self._hold_event(events.TestSessionSetupStartEvent())150 def end_test_session_setup(self):151 self._end_step_if_any()152 self._discard_or_fire_event(events.TestSessionSetupStartEvent, events.TestSessionSetupEndEvent())...
channel.py
Source:channel.py
...285 # Else just pass it through. Note that this situation could happen286 # on any broker-initiated message.287 if ev == cb:288 self._pending_events.popleft()289 self._flush_pending_events()290 return ev291 elif cb in self._pending_events:292 raise ChannelError(293 "Expected synchronous callback %s, got %s", ev, cb)294 # Return the passed-in callback by default295 return cb296 def _flush_pending_events(self):297 '''298 Send pending frames that are in the event queue.299 '''300 while len(self._pending_events) and \301 isinstance(self._pending_events[0], Frame):302 self._connection.send_frame(self._pending_events.popleft())303 def _closed_cb(self, final_frame=None):304 '''305 "Private" callback from the ChannelClass when a channel is closed. Only306 called after broker initiated close, or we receive a close_ok. Caller307 has the option to send a final frame, to be used to bypass any308 synchronous or otherwise-pending frames so that the channel can be309 cleanly closed.310 '''...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!