Best Python code snippet using ATX
api_connector.py
Source:api_connector.py
...50 def _on_timeout():51 APIConnector.log_debug("%s: timed out", self.debug_str())52 self.has_errors = True53 self.timeout_callback = None54 stream.stop_on_recv()55 stream.close()56 self._release()57 if on_timeout is not None:58 on_timeout()59 def _on_recv(msg):60 APIConnector.log_debug("%s: message received", self.debug_str())61 if self.timeout_callback is not None:62 loop.remove_timeout(self.timeout_callback)63 self.timeout_callback = None64 stream.stop_on_recv()65 self._release()66 msg = json.loads(msg[0])67 if on_recv is not None:68 on_recv(msg)69 self.log_debug("%s: making call with timeout %r", self.debug_str(), timeout)70 self.timeout_callback = loop.add_timeout(timeout, _on_timeout)71 stream.on_recv(_on_recv)72 self.queue.incr_outstanding(1)73 self.sock.send(send_data)74 @staticmethod75 def send_recv(api_name, cmd, args=None, vargs=None, on_recv=None, on_timeout=None, on_overload=None, timeout=None):76 send_data = APIConnector.make_req(cmd, args=args, vargs=vargs)77 api = APIConnector._get_conn(api_name)78 if api.queue.mean_outstanding >= APIQueue.BUFFER_SZ:79 on_overload()80 return81 APIConnector.log_debug("%s: calling %s", api.debug_str(), cmd)82 api.conn_send_recv(send_data, on_recv, on_timeout, timeout)83 @staticmethod84 def send_terminate_msg(api_name, on_recv=None):85 APIConnector.send_recv(api_name, APIConnector.CMD_TERMINATE, on_recv=on_recv)...
test_zmqstream.py
Source:test_zmqstream.py
...42 assert not timed_out43 def test_callable_check(self):44 """Ensure callable check works (py3k)."""45 self.stream.on_send(lambda *args: None)46 self.stream.on_recv(lambda *args: None)47 self.assertRaises(AssertionError, self.stream.on_recv, 1)48 self.assertRaises(AssertionError, self.stream.on_send, 1)49 self.assertRaises(AssertionError, self.stream.on_recv, zmq)50 def test_on_recv_basic(self):51 sent = [b'basic']52 def callback(msg):53 assert msg == sent54 self.loop.stop()55 self.loop.add_callback(lambda: self.push.send_multipart(sent))56 self.pull.on_recv(callback)57 self.run_until_timeout()58 def test_on_recv_wake(self):59 sent = [b'wake']60 def callback(msg):61 assert msg == sent62 self.loop.stop()63 self.pull.on_recv(callback)64 self.loop.call_later(1, lambda: self.push.send_multipart(sent))...
concurrentbase.py
Source:concurrentbase.py
...17 self.recv_future.result() # here ?18 self.send_future.result()19 def recv_routine(self, on_recv):20 while self.is_active:21 on_recv()22 time.sleep(1)23 def send_routine(self, on_working):24 on_working()25 def cancel(self):26 self.is_active = False27if __name__ == "__main__":28 active = True29 def on_recv():30 print('on_recv')31 def on_working():32 # How to cancel? 33 while active:34 print('send_routine')35 time.sleep(1)36 concurrent = ConcurrentBase(on_recv, on_working)37 concurrent.run()38 # wait for KeyboardInterrupt39 try:40 while True:41 time.sleep(0.01)42 except KeyboardInterrupt:43 concurrent.cancel() # or shutdown?...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!