How to use on_disconnect method in fMBT

Best Python code snippet using fMBT_python

wsh-proxy.py

Source:wsh-proxy.py Github

copy

Full Screen

...54 debug = "_debug" if settings.DEBUG else ""55 initial_job = (yield f.rpc('wsh_submitLogin', [settings.WALLET, settings.CUSTOM_EMAIL], 'Proxy_'+version.VERSION+debug))56 reactor.callLater(0, ping, f)57 defer.returnValue(f)58def on_disconnect(f):59 '''Callback when proxy get disconnected from the pool'''60 log.info("Disconnected from Stratum pool at %s:%d" % f.main_host)61 f.is_connected = False62 f.on_disconnect.addCallback(on_disconnect)63@defer.inlineCallbacks64def main():65 reactor.disconnectAll()66 log.warning("Wiseplat Stratum proxy version: %s" % version.VERSION)67 # Connect to Stratum pool, main monitoring connection68 log.warning("Trying to connect to Stratum pool at %s:%d" % (settings.POOL_HOST, settings.POOL_PORT))69 f = SocketTransportClientFactory(settings.POOL_HOST, settings.POOL_PORT,70 debug=settings.DEBUG, proxy=None,71 event_handler=client_service.ClientMiningService)72 f1 = None...

Full Screen

Full Screen

test_dynet.py

Source:test_dynet.py Github

copy

Full Screen

# Excerpt of test_dynet.py (original lines 1-96; the page's fused line
# numbers have been removed and the layout restored).
import pytest
from asynctest import CoroutineMock
import asyncio
from unittest.mock import patch, Mock
from dynalite_lib.dynet import Dynet, DynetError
import logging

LOGGER = logging.getLogger(__name__)


def func():
    pass


def async_mock(mock):
    """Return the return value of a mock from async."""
    async def async_func(*args, **kwargs):
        return mock(*args, **kwargs)
    return async_func


@pytest.mark.asyncio
async def test_dynet_no_host_loop_port():
    """Dynet() raises DynetError when host, port or loop is None."""
    host = "1.2.3.4"
    port = 5678
    broadcaster = Mock()
    on_connect = Mock()
    on_disconnect = Mock()
    loop = asyncio.get_event_loop()
    with patch("dynalite_lib.dynet.DynetConnection"):
        with pytest.raises(DynetError):
            Dynet(None, port, broadcaster, on_connect, on_disconnect, loop)
        with pytest.raises(DynetError):
            Dynet(host, None, broadcaster, on_connect, on_disconnect, loop)
        with pytest.raises(DynetError):
            Dynet(host, port, broadcaster, on_connect, on_disconnect, None)


@pytest.mark.asyncio
async def test_dynet_connect():
    """connect() calls loop.create_connection once with the connection
    factory and the configured host/port keywords."""
    host = "1.2.3.4"
    port = 5678
    broadcaster = Mock()
    on_connect = Mock()
    on_disconnect = Mock()
    loop = asyncio.get_event_loop()
    with patch("dynalite_lib.dynet.DynetConnection") as dyn_connection:
        dynet = Dynet(host, port, broadcaster, on_connect, on_disconnect, loop)
        assert dynet._conn() is dyn_connection.return_value
        connection = Mock()
        with patch.object(loop, "create_connection", async_mock(connection)):
            await dynet.connect()
            connection.assert_called_once()
            # First positional argument is the protocol factory.
            assert connection.mock_calls[0][1][0]() is dyn_connection.return_value
            assert connection.mock_calls[0][2]["host"] == host
            assert connection.mock_calls[0][2]["port"] == port


@pytest.mark.asyncio
async def test_dynet_connect_failure():
    """When create_connection raises, connect() does not propagate the error
    and dynet.connect shows up in the recorded loop.call_later calls."""
    host = "1.2.3.4"
    port = 5678
    broadcaster = Mock()
    on_connect = Mock()
    on_disconnect = Mock()
    loop = asyncio.get_event_loop()
    with patch("dynalite_lib.dynet.DynetConnection") as dyn_connection:
        dynet = Dynet(host, port, broadcaster, on_connect, on_disconnect, loop)
        assert dynet._conn() is dyn_connection.return_value
        connection = Mock()
        connection.side_effect = ValueError('Boom!')
        with patch.object(loop, "create_connection", async_mock(connection)):
            with patch.object(loop, "call_later") as later_mock:
                await dynet.connect()
                connection.assert_called_once()
                assert connection.mock_calls[0][1][0]() is dyn_connection.return_value
                assert connection.mock_calls[0][2]["host"] == host
                assert connection.mock_calls[0][2]["port"] == port
                LOGGER.error("call_later calls = %s", later_mock.mock_calls)
                # The third recorded call_later call carries dynet.connect as
                # its second positional argument.
                assert later_mock.mock_calls[2][1][1] == dynet.connect


# The marker below was fused onto the preceding assert in the page text
# ("dynet.connect@pytest.mark.asyncio"); it is restored as a decorator.
@pytest.mark.asyncio
async def test_dynet_receive():
    # NOTE(review): the visible part of this test is identical to
    # test_dynet_connect_failure and does not exercise any receive path; the
    # excerpt is truncated below, so the rest of the test may follow.
    host = "1.2.3.4"
    port = 5678
    broadcaster = Mock()
    on_connect = Mock()
    on_disconnect = Mock()
    loop = asyncio.get_event_loop()
    with patch("dynalite_lib.dynet.DynetConnection") as dyn_connection:
        dynet = Dynet(host, port, broadcaster, on_connect, on_disconnect, loop)
        assert dynet._conn() is dyn_connection.return_value
        connection = Mock()
        connection.side_effect = ValueError('Boom!')
        with patch.object(loop, "create_connection", async_mock(connection)):
            with patch.object(loop, "call_later") as later_mock:
                await dynet.connect()
                connection.assert_called_once()
                assert connection.mock_calls[0][1][0]() is dyn_connection.return_value
                assert connection.mock_calls[0][2]["host"] == host
                assert connection.mock_calls[0][2]["port"] == port
                LOGGER.error("call_later calls = %s", later_mock.mock_calls)
                assert later_mock.mock_calls[2][1][1] == dynet.connect

# ... (excerpt truncated here)

Full Screen

Full Screen

MQTT.py

Source:MQTT.py Github

copy

Full Screen

...31 def on_connect(self, _client, _, __, rc):32 print("Connected to MQTT server")33 for topic in self.topics2suscribe:34 self.client.subscribe(topic)35 def on_disconnect(client, userdata,_, rc):36 print("client disconnected ok")37 @staticmethod38 def on_message(_client, user_data, msg):39 message = bool(msg.payload)40 print(message)41 def send_new_msg(self, msg, topic="coordinator"):42 #while not self.client.is_connected():43 #continue44 self.client.publish(topic, msg)45 def close(self):46 self.client.loop_stop()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run fMBT automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful