Best Python code snippet using tavern
server.py
Source:server.py
1"""2Core module of the Lightstreamer SDK for Python Adapters, containing all3classes (public and private), needed to configure and start the Remote4Adapters.5"""6import socket7import queue8import logging9import time10import os11from multiprocessing import cpu_count12from threading import Thread, Event13from concurrent.futures.thread import ThreadPoolExecutor14from abc import ABCMeta, abstractmethod15import lightstreamer_adapter.protocol as protocol16import lightstreamer_adapter.metadata_protocol as meta_protocol17import lightstreamer_adapter.data_protocol as data_protocol18from lightstreamer_adapter.interfaces.data import (DataProvider,19 SubscribeError,20 DataProviderError)21from lightstreamer_adapter.interfaces.metadata import (Mode,22 MetadataProvider,23 MetadataProviderError)24from lightstreamer_adapter.protocol import RemotingException25from lightstreamer_adapter.subscription import SubscriptionManager, ItemTask26from . import DATA_PROVIDER_LOGGER as DATA_LOGGER27from . import METADATA_PROVIDER_LOGGER as METADATA_LOGGER28__all__ = ['Server', 'DataProviderServer', 'MetadataProviderServer',29 'ExceptionHandler']30def notify(function):31 """Decorator function which add timestamp information to each notification32 sent to the Proxy Adapter.33 """34 def wrap(obj, notification):35 """"Add the timestamp to the supplied notification, in the following36 format:37 <timestamp>|<notification>.38 """39 timestamp = int(round(time.time() * 1000))40 notification = "|".join([str(timestamp), notification])41 function(obj, notification)42 return wrap43def create_socket_and_connect(address, ssl_context=None):44 """Connect to the Proxy Adapter listening on the provided address, and45 return the socket object.46 If an SSLContext is specified, the connection is established by using the47 wrapped socket.48 """49 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)50 if ssl_context is not None:51 hostname = address[0]52 client_socket = ssl_context.wrap_socket(sock, server_hostname=hostname)53 else:54 client_socket = sock55 client_socket.connect(address)56 return client_socket57class _Sender():58 """Helper class which manages the communications from the Remote Adapter to59 the ProxyAdapter, sending data over the "request/replies" or60 "notifications" channels.61 """62 _STOP_WAITING_PILL = "STOP_WAITING_PILL"63 _KEEPALIVE_PILL = "KEEPALIVE_PILL"64 def __init__(self, name, sock, server, keepalive, log):65 self._sock = sock66 self._server = server67 self._name = name68 self._log = log69 self._keepalive = keepalive70 self._keep_alive_log = logging.getLogger(log.name + ".keep_alives")71 self._send_queue = None72 self._send_thread = None73 def start(self):74 """Starts the management of communications from the Remote Adapter to75 the Proxy Adapter.76 """77 # Creates a queue to append incoming replies/notifications to be sent78 # to the ProxyAdapter.79 self._send_queue = queue.Queue()80 # Starts new thread for dequeuing replies/notifications and then81 # sending to the ProxyAdapter.82 self._send_thread = Thread(target=self._do_run, name="Sender-Thread-{}"83 .format(self._name))84 self._send_thread.start()85 def send(self, notification):86 """Enqueues a reply or notification to be sent to the Proxy Adapter."""87 self._log.debug("%s Enqueing line: %s", self._name, notification)88 self._send_queue.put(notification)89 def _do_run(self):90 """Target method for the Sender-Thread-XXX, started in the start'91 method."""92 self._log.info("%s starting", self._name)93 while True:94 try:95 to_send = None96 current_log = self._log97 current_log.debug("%s Waiting for a line to send...",98 self._name)99 if self._keepalive > 0:100 try:101 to_send = self._send_queue.get(timeout=self._keepalive)102 except queue.Empty:103 # Keepalive Timeout triggered.104 to_send = protocol.Method.KEEPALIVE.name105 current_log = self._keep_alive_log106 else:107 to_send = self._send_queue.get()108 if to_send == _Sender._STOP_WAITING_PILL:109 # Request of stopping dequeuing, thread termination.110 break111 if to_send is None or to_send == _Sender._KEEPALIVE_PILL:112 to_send = protocol.Method.KEEPALIVE.name113 current_log = self._keep_alive_log114 # Send to_send over the network.115 current_log.debug("%s Sending line: %s", self._name, to_send)116 self._sock.sendall(bytes(to_send + '\r\n', 'utf-8'))117 except OSError as err:118 self._server.on_ioexception(err)119 break120 self._log.info("'%s' stopped", self._name)121 def change_keep_alive(self, keepalive, also_interrupt=False):122 self._keepalive = keepalive123 self._log.debug("%s Changing keepalive to: %d", self._name,124 self._keepalive)125 if also_interrupt:126 # Interrupts the current wait as though a keepalive were needed;127 # in most cases, this keepalive will be redundant128 self._send_queue.put(_Sender._KEEPALIVE_PILL)129 def quit(self):130 """Terminates the communications with the Proxy Adapter."""131 # Enqueues a None item to notify the Sender-Thread-XXX of stopping132 # dequeuing of incoming replies/notifications.133 self._send_queue.put(_Sender._STOP_WAITING_PILL)134 self._send_thread.join()135class _RequestReceiver():136 """Helper class which manages the bi-directional communications with the137 Proxy Adpater counterpart over the "request/replies" channel.138 """139 def __init__(self, sock, keepalive, server):140 self._log = logging.getLogger("lightstreamer-adapter.requestreply"141 ".requests")142 self._sock = sock143 self._server = server144 reply_sender_log = logging.getLogger("lightstreamer-adapter."145 "requestreply.replies."146 "ReplySender")147 self._reply_sender = _Sender(sock=sock, name=self._server.name,148 server=self._server,149 keepalive=keepalive, log=reply_sender_log)150 self._stop_request = Event()151 def start(self):152 """Starts the management of bidirectional communications with the Proxy153 Adapter: requests coming form the Proxy Adapter, and the responses154 coming from the Remote Adapters.155 """156 # Starts new thread for reading data from the socket.157 thread = Thread(target=self._do_run,158 name="RequestReceiver-Thread-{}"159 .format(self._server.name),160 args=(self._sock,))161 thread.start()162 # Starts the reply sender.163 self._reply_sender.start()164 def _do_run(self, sock):165 """Target method for the RequestReceiver-Thread-XXX, started in the166 'start' method.167 """168 self._log.info("Request receiver '%s' starting...", self._server.name)169 buffer = ''170 while not self._stop_request.is_set():171 try:172 self._log.debug("%s Reading from socket...",173 self._server.name)174 data = sock.recv(1024)175 self._log.debug("%s Received %d bytes of request data [%s]",176 self._server.name, len(data), data)177 if not data:178 raise EOFError('Socket connection broken')179 buffer += data.decode('ascii')180 self._log.debug("%s Current buffer [%s]", self._server.name,181 buffer)182 tokens = buffer.splitlines(keepends=True)183 buffer = ''184 for token in tokens:185 if token.endswith('\n'):186 self._log.debug("%s Request line: %s",187 self._server.name, token)188 self._server.on_received_request(token)189 buffer = ''190 else:191 self._log.debug("'%s' Buffering remaining token %s",192 self._server.name, token)193 buffer = token194 except (OSError, EOFError) as err:195 if self._stop_request.is_set():196 self._log.debug("'%s' Error raised because of explicitly "197 "closed socket, no issue",198 self._server.name)199 break200 # An exception has been raised, due to some issue in the201 # network communication.202 self._server.on_ioexception(err)203 break204 self._log.info("Request receiver '%s' stopped", self._server.name)205 def send_reply(self, request_id, response):206 """Sends a reply to the Proxy Adapter.207 """208 reply = '|'.join((request_id, response))209 self._reply_sender.send(reply)210 def change_keep_alive(self, keepalive):211 self._reply_sender.change_keep_alive(keepalive)212 def quit(self):213 """Terminates the communications with the Proxy Adapter.214 """215 # Issues a request to terminate the 'RequestReceiver-Thread-XXX'216 # thread.217 self._stop_request.set()218 # Issues a request to terminate the associated reply sender.219 self._reply_sender.quit()220class Server(metaclass=ABCMeta):221 """An abstract class meant to be extended, which represents a generic222 Remote Server object, which can run a Remote Data or Metadata Adapter and223 connect it to the Proxy Adapter running on Lightstreamer Server.224 The object should be provided with a suitable Adapter instance and with225 suitable initialization parameters, then activated through its own226 :meth:`start` and finally disposed through its own :meth:`close`. Further227 reuse of the same instance is not supported.228 The Remote Server will take care of sending keepalive packets on the229 connections when needed. The interval can be configured through the230 provider ``keep_alive`` parameter, where a value of 0 or negative means no231 keepalives. By default, it is set to 10 sec.232 However, if a stricter interval is requested by the Proxy Adapter on233 startup, it will be obeyed (with a safety minimum of 1 second). This234 should ensure that the Proxy Adapter activity checks will always succeed,235 but for some old versions of the Proxy Adapter.236 """237 _DEFAULT_POOL_SIZE = 4238 _STRICT_KEEPALIVE = 1000239 _DEFAULT_KEEPALIVE = 10000240 _MIN_KEEPALIVE = 1000241 # Number of current instances of Server' subclasses.242 _number = 0243 def __init__(self, address, name, keep_alive, thread_pool_size,244 ssl_context):245 Server._number += 1246 # Logger actually overridden by subclasses.247 self._log = logging.getLogger("lightstreamer-adapter.server")248 self._exception_handler = None249 self._remote_user = None250 self._remote_password = None251 self._close_expected = True252 self._config = {}253 self._config['address'] = address254 self._config['name'] = "#{}".format(Server._number) if (name is255 None) else name256 self._configured_keep_alive = keep_alive * 1000 if (keep_alive is not257 None) else None258 self._config['keep_alive'] = max(0, keep_alive) if (keep_alive is not259 None) else Server._DEFAULT_KEEPALIVE / 1000260 pool = max(0, thread_pool_size) if thread_pool_size is not None else 0261 if pool == 0:262 try:263 self._config['thread_pool_size'] = cpu_count()264 except NotImplementedError:265 self._config['thread_pool_size'] = Server._DEFAULT_POOL_SIZE266 else:267 self._config['thread_pool_size'] = pool268 self._executor = ThreadPoolExecutor(self._config['thread_pool_size'])269 self._server_sock = None270 self._request_receiver = None271 self._ssl_context = ssl_context272 @property273 def name(self):274 """The name, used for logging purposes, associated to the Server275 instance.276 :type: str277 """278 return self._config['name']279 @property280 def keep_alive(self):281 """The keepalive interval expressed in seconds (or fractions)282 :type: float283 """284 return self._config['keep_alive']285 @property286 def remote_user(self):287 """The username credential to be sent to the Proxy Adapter upon288 connection. The credentials are needed only if the Proxy Adapter is289 configured to require Remote Adapter authentication.290 :type: str291 """292 return self._remote_user293 @remote_user.setter294 def remote_user(self, value):295 self._remote_user = value296 @property297 def remote_password(self):298 """The password credential to be sent to the Proxy Adapter upon299 connection. The credentials are needed only if the Proxy Adapter is300 configured to require Remote Adapter authentication.301 :type: str302 """303 return self._remote_password304 @remote_password.setter305 def remote_password(self, value):306 self._remote_password = value307 @property308 def thread_pool_size(self):309 """The thread pool size310 :type: int311 """312 return self._config['thread_pool_size']313 def set_exception_handler(self, handler):314 """Sets the handler for error conditions occurring on the Remote315 Server. By setting the handler, it's possible to override the default316 exception handling.317 :param lightstreamer_adapter.server.ExceptionHandler handler: the318 handler for error conditions occurring on the Remote Server.319 """320 self._exception_handler = handler321 def _on_init(self, subprotocol, params, config_file, data, adapter,322 invoke_listener=False):323 max_version = '1.8.3'324 parsed_data = subprotocol.read_init(data)325 parsed_data.setdefault(protocol.ARI_VERSION, None)326 parsed_data.setdefault(protocol.KEEPALIVE_HINTS, None)327 proxy_version = parsed_data[protocol.ARI_VERSION]328 keep_alive_hint = parsed_data[protocol.KEEPALIVE_HINTS]329 del parsed_data[protocol.ARI_VERSION]330 del parsed_data[protocol.KEEPALIVE_HINTS]331 try:332 if proxy_version is None:333 proxy_version = '1.8.0'334 self._log.info("Received no Proxy Adapter protocol version "335 "information; assuming 1.8.0: accepted.")336 elif proxy_version == '1.8.0':337 raise Exception("Unsupported protocol version number: {}"338 .format(proxy_version))339 elif proxy_version == '1.8.1':340 raise Exception("Unsupported reserved protocol version "341 "number: {}".format(proxy_version))342 elif proxy_version == '1.8.2':343 self._log.info("Received Proxy Adapter protocol version as %s "344 "for %s: accepted downgrade.", proxy_version,345 self.name)346 elif proxy_version == max_version:347 self._log.info("Received Proxy Adapter protocol version as %s "348 "for %s: versions match", proxy_version,349 self.name)350 else:351 proxy_version = max_version352 self._log.info("Received Proxy Adapter protocol version as %s "353 "for %s: requesting %s", proxy_version,354 self.name, max_version)355 if params is not None:356 init_params = params.copy()357 parsed_data.update(init_params)358 adapter.initialize(parsed_data, config_file)359 if invoke_listener is True:360 adapter.set_listener(self)361 except Exception as err:362 res = subprotocol.write_init(exception=err)363 else:364 proxy_parameters = None365 if proxy_version in ('1.8.0', '1.8.2'):366 self._close_expected = False367 if proxy_version != '1.8.0':368 proxy_parameters = {}369 proxy_parameters[protocol.ARI_VERSION] = proxy_version370 res = subprotocol.write_init(proxy_parameters)371 self._use_keep_alive_hint(keep_alive_hint)372 return res373 def _use_keep_alive_hint(self, keepalive_hint=None):374 if keepalive_hint is None:375 # No information provided, we stick to a stricter default376 if self._configured_keep_alive is None:377 self._log.info("Keepalive time for %s finally set to %d "378 "milliseconds to support old Proxy Adapter",379 self.name, self._STRICT_KEEPALIVE)380 self._change_keep_alive(Server._STRICT_KEEPALIVE)381 # else:382 # For backward compatibility we keep the setting; it is383 # possible that the setting is too long and the Proxy Adapter384 # activity check is triggered385 else:386 keepalive_time = float(keepalive_hint)387 if keepalive_time <= 0:388 pass389 # No restrictions, so our default is still meaningful390 elif self._configured_keep_alive is None:391 if keepalive_time < Server._DEFAULT_KEEPALIVE:392 if keepalive_time >= Server._MIN_KEEPALIVE:393 self._log.info("Keepalive time for %s finally set to "394 "%d milliseconds as per Proxy Adapter "395 "suggestion", self.name, keepalive_time)396 self._change_keep_alive(keepalive_time)397 else:398 self._log.warning("Keepalive time for %s finally set "399 "to %d milliseconds, despite a Proxy"400 " Adapter suggestion of %d "401 "milliseconds", self.name,402 Server._MIN_KEEPALIVE,403 keepalive_time)404 self._change_keep_alive(Server._MIN_KEEPALIVE)405 else:406 # The default setting is stricter, so it's ok407 self._log.info("Keepalive time for %s finally confirmed to"408 " %d milliseconds consistently with Proxy "409 "Adapter suggestion", self.name,410 Server._DEFAULT_KEEPALIVE)411 elif self._configured_keep_alive > 0:412 if keepalive_time < self._configured_keep_alive:413 if keepalive_time >= Server._MIN_KEEPALIVE:414 self._log.warning("Keepalive time for %s changed to %d"415 " milliseconds as per Proxy Adapter "416 "suggestion", self.name,417 keepalive_time)418 self._change_keep_alive(keepalive_time)419 else:420 self._log.warning("Keepalive time for %s changed to %d"421 " milliseconds, despite a Proxy "422 "Adapter suggestion of %d "423 "milliseconds", self.name,424 Server._MIN_KEEPALIVE,425 keepalive_time)426 self._change_keep_alive(Server._MIN_KEEPALIVE)427 else:428 # Our setting is stricter, so it's ok429 pass430 else:431 if keepalive_time >= Server._MIN_KEEPALIVE:432 self._log.warning("Keepalives for %s forced with time %d "433 "milliseconds as per Proxy Adapter "434 "suggestion", self.name, keepalive_time)435 self._change_keep_alive(keepalive_time)436 else:437 self._log.warning("Keepalives for %s forced with time %d "438 "milliseconds, despite a Proxy Adapter "439 "suggestion of %d milliseconds",440 self.name, Server._MIN_KEEPALIVE,441 keepalive_time)442 def _change_keep_alive(self, keep_alive_milliseconds):443 keep_alive_seconds = keep_alive_milliseconds / 1000444 self._config['keep_alive'] = keep_alive_seconds445 self._request_receiver.change_keep_alive(keep_alive_seconds)446 @abstractmethod447 def start(self):448 """Starts the Remote Adapter. A connection to the Proxy Adapter is449 performed (as soon as one is available). Then, requests issued by450 the Proxy Adapter are received and forwarded to the Remote Adapter.451 """452 if self.keep_alive > 0:453 self._log.info("Keepalive time for %s set to %d seconds",454 self.name, self.keep_alive)455 else:456 self._log.info("Keepalive for %s disabled", self.name)457 self._server_sock = create_socket_and_connect(self._config['address'],458 self._ssl_context)459 # Creates and starts the Request Receiver.460 self._request_receiver = _RequestReceiver(sock=self._server_sock,461 keepalive=self.keep_alive,462 server=self)463 self._request_receiver.start()464 # Invokes hook to notify subclass that the Request Receiver has been465 # started.466 self._on_request_receiver_started()467 self._send_remote_credentials()468 def close(self):469 """Stops the management of the Remote Adapter and destroys the threads470 used by this Server. This instance can no longer be used.471 Note that this does not stop the supplied Remote Adapter, as no close472 method is available in the Remote Adapter interface. If the process is473 not terminating, then the Remote Adapter cleanup should be performed by474 accessing the supplied Adapter instance directly and calling custom475 methods.476 """477 self._request_receiver.quit()478 self._executor.shutdown()479 self._server_sock.close()480 def on_received_request(self, request):481 """Invoked when the RequestReceiver gets a new request coming from the482 Proxy Adapter.483 This method takes the responsibility to proceed with a first484 coarse-grained parsing, to identify the three main components of the485 packet structure, as follows:486 <ID>|<method>|<data>487 | | |488 | | The arguments to be passed to the method489 | |490 | The method to invoke on the Remote Adapter491 |492 The Request Id493 Once parsed, the request is then dispatched to the subclass for later494 management.495 """496 try:497 parsed_request = protocol.parse_request(request)498 if parsed_request is None:499 self._log.warning("Discarding malformed request: %s", request)500 return501 request_id = parsed_request["id"]502 method_name = parsed_request["method"]503 data = parsed_request["data"]504 self._handle_received_request(request_id, data, method_name)505 except RemotingException as err:506 self.on_exception(err)507 def _handle_received_request(self, request_id, data, method_name):508 close_request = method_name == str(protocol.Method.CLOSE)509 if close_request and self._close_expected:510 if request_id != '0':511 raise RemotingException("Unexpected id found while parsing a {}"512 "request".format(method_name))513 close_data = protocol.read_close(data)514 close_data.setdefault(protocol.KEY_CLOSE_REASON, None)515 closed_reason = close_data[protocol.KEY_CLOSE_REASON]516 if closed_reason is not None:517 self._log.info("Close requested by the counterpart with "518 "reason: %s", closed_reason)519 self.close()520 return521 self._handle_request(request_id, data, method_name)522 def _send_reply(self, request_id, response):523 self._log.debug("Sending reply for request: %s", request_id)524 self._request_receiver.send_reply(request_id, response)525 def on_ioexception(self, ioexception):526 """Called by the Remote Server upon a read or write operation failure.527 See documentation from the ExceptionHandler.handle_io exception method528 for further details.529 """530 if self._exception_handler is not None:531 self._log.info("Caught exception: %s, notifying the "532 "application...", str(ioexception))533 # Enable default handling in case the exception handler534 # returns False.535 if not self._exception_handler.handle_ioexception(ioexception):536 return537 self._handle_ioexception(ioexception)538 def on_exception(self, exception):539 """Called by the Remote Server upon an unexpected error.540 See documentation from the ExceptionHandler.handle_exception method for541 further details.542 """543 if self._exception_handler is not None:544 self._log.info("Caught exception: %s, notifying the "545 "application...", str(exception))546 # Enable default handling in case the exception handler547 # returns False.548 if not self._exception_handler.handle_exception(exception):549 return550 self._handle_exception(exception)551 @abstractmethod552 def _handle_ioexception(self, ioexception):553 os._exit(1)554 return False555 def _handle_exception(self, exception):556 pass557 @abstractmethod558 def _on_request_receiver_started(self):559 """Hook method to notify the subclass that the Request Receiver has560 been started.561 This method is intended to be overridden by subclasses.562 """563 @abstractmethod564 def _handle_request(self, request_id, data, method_name):565 """Intended to be overridden by subclasses, invoked for handling the566 received request, already splitted into the supplied parameters.567 """568 @abstractmethod569 def _send_remote_credentials(self):570 """Intended to be overridden by subclasses, invoked for sending the571 remote credentials to the Proxy Adapter.572 """573class MetadataProviderServer(Server):574 """A Remote Server object which can run a Remote Metadata Adapter and575 connect it to the Proxy Adapter running on Lightstreamer Server.576 The object should be provided with a MetadataProvider instance and with577 suitable initialization parameters and established connections,578 then activated through :meth:`MetadataProviderServer.start` and finally579 disposed through :meth:`Server.close`.580 Further reuse of the same instance is not supported.581 By default, the invocations to the Metadata Adapter methods will be done in582 a limited thread pool with a size determined by the number of detected cpu583 cores. The size can be specified through the provided ``thread_pool_size``584 parameter. A size of 1 enforces strictly sequential invocations and can be585 used if parallelization of the calls is not supported by the Metadata586 Adapter. A value of 0, negative or ``None`` also implies the default587 behaviour as stated above.588 Note that requests with an implicit ordering, like589 :meth:`lightstreamer_adapter.interfaces.metadata.MetadataProvider.notify_new_session`590 and591 :meth:`lightstreamer_adapter.interfaces.metadata.MetadataProvider.notify_session_close`592 for the same session, are always guaranteed to be and sequentialized in the593 right way, although they may not occur in the same thread.594 """595 def __init__(self, adapter, address, name=None, keep_alive=None,596 thread_pool_size=0, ssl_context=None):597 """Creates a server with the supplied configuration parameters. The598 :meth:`lightstreamer_adapter.interfaces.metadata.MetadataProvider.initialize`599 method will be invoked only upon a Proxy Adapter request.600 :param lightstreamer_adapter.interfaces.metadata.MetadataProvider \601 adapter: the Remote Metadata Adapter instance to be run.602 :param tuple address: the address of the Proxy Adapter supplied as a603 2-tuple ``(host, request_reply_port)`` where:604 * host: a string representing the hostname or the IP address605 * request_reply_port: an int representing the request/reply port606 :param str name: the name associated to the Server instance.607 :param float keep_alive: the keepalive interval expressed in seconds608 (or fractions)609 :param int thread_pool_size: the thread pool size610 :param SSLContext ssl_context: the SSL context to be used in the case611 of encrypted communications with the Proxy Adapter612 :raises TypeError: if the supplied Remote Adapter is not an instance of613 a subclass of614 :class:`lightstreamer_adapter.interfaces.metadata.MetadataProvider`.615 """616 super(MetadataProviderServer, self).__init__(address, name, keep_alive,617 thread_pool_size,618 ssl_context)619 if not isinstance(adapter, MetadataProvider):620 raise TypeError("The provided adapter is not a subclass of "621 "lightstreamer_adapter.interfaces."622 "MetadataProvider")623 self._config_file = None624 self._params = None625 self._adapter = adapter626 self.init_expected = True627 def _send_remote_credentials(self):628 """Invoked for sending the remote credentials to the Proxy Metadata629 Adapter.630 """631 unsolicited_message = protocol.write_credentials(self.remote_user,632 self.remote_password)633 self._send_reply("1", unsolicited_message)634 def _on_request_receiver_started(self):635 """Invoked to notify this subclass the the Request Receiver has been636 started.637 This class has a void implementation.638 """639 def _handle_request(self, request_id, data, method_name):640 init_request = method_name == str(meta_protocol.Method.MPI)641 if init_request and not self.init_expected:642 raise RemotingException("Unexpected late {} request"643 .format(str(meta_protocol.Method.MPI)))644 if not init_request and self.init_expected:645 raise RemotingException("Unexpected request {} while waiting for "646 "{} request"647 .format(method_name,648 meta_protocol.Method.MPI))649 if init_request:650 self.init_expected = False651 res = self._on_mpi(data)652 self._send_reply(request_id, res)653 return654 # Builds the name of the method_name do be invoked, starting from the655 # protocol method_name name, and retrieves such method_name.656 on_method_name = "_on_" + method_name.lower()657 try:658 on_method = getattr(self, on_method_name)659 except AttributeError:660 METADATA_LOGGER.warning("Discarding unknown request: %s",661 method_name)662 return663 # Invokes the retrieved method, which in turn returns an asynchronous664 # function to be executed through the executor.665 async_func = on_method(data)666 # Define a task function to wrap the execution of the returned667 # async_func and the sending of the gotten reply.668 def execute_and_reply():669 try:670 # async_func may raise again a RemotingExcption, which need671 # to be caught and forwarded to the ExceptionHandler.672 reply = async_func()673 self._send_reply(request_id, reply)674 except RemotingException as err:675 self.on_exception(err)676 # Submits the task to the executor for asynchronous execution.677 self._executor.submit(execute_and_reply)678 def _on_mpi(self, data):679 return self._on_init(meta_protocol, self._params, self._config_file,680 data, self._adapter)681 def _on_nus(self, data):682 parsed_data = meta_protocol.read_notify_user(data)683 user = parsed_data["user"]684 password = parsed_data["password"]685 http_headers = parsed_data["httpHeaders"]686 def execute():687 try:688 self._adapter.notify_user(user, password, http_headers)689 max_bandwidth = self._adapter.get_allowed_max_bandwidth(user)690 wants_tb_ntf = self._adapter.wants_tables_notification(user)691 except Exception as err:692 res = meta_protocol.write_notiy_user(meta_protocol.Method.NUS,693 exception=err)694 else:695 res = meta_protocol.write_notiy_user(meta_protocol.Method.NUS,696 max_bandwidth,697 wants_tb_ntf)698 return res699 return execute700 def _on_nua(self, data):701 parsed_data = meta_protocol.read_notify_user_auth(data)702 user = parsed_data["user"]703 http_headers = parsed_data["httpHeaders"]704 password = parsed_data["password"]705 client_principal = parsed_data["clientPrincipal"]706 def execute():707 try:708 self._adapter.notify_user_with_principal(user, password,709 http_headers,710 client_principal)711 max_bandwidth = self._adapter.get_allowed_max_bandwidth(user)712 wants_tb_notify = self._adapter.wants_tables_notification(user)713 except Exception as err:714 res = meta_protocol.write_notiy_user(meta_protocol.Method.NUA,715 exception=err)716 else:717 res = meta_protocol.write_notiy_user(meta_protocol.Method.NUA,718 max_bandwidth,719 wants_tb_notify)720 return res721 return execute722 def _on_nns(self, data):723 parsed_data = meta_protocol.read_notify_new_session(data)724 session_id = parsed_data["session_id"]725 user = parsed_data["user"]726 client_context = parsed_data["clientContext"]727 def execute():728 try:729 self._adapter.notify_new_session(user,730 session_id,731 client_context)732 res = meta_protocol.write_notify_new_session()733 except Exception as err:734 res = meta_protocol.write_notify_new_session(err)735 return res736 return execute737 def _on_nsc(self, data):738 session_id = meta_protocol.read_notifiy_session_close(data)739 def execute():740 try:741 self._adapter.notify_session_close(session_id)742 except Exception as err:743 res = meta_protocol.write_notify_session_close(err)744 else:745 res = meta_protocol.write_notify_session_close()746 return res747 return execute748 def _on_gis(self, data):749 parsed_data = meta_protocol.read_get_items(data)750 session_id = parsed_data["session_id"]751 user = parsed_data["user"]752 group = parsed_data["group"]753 def execute():754 try:755 items = self._adapter.get_items(user, session_id, group)756 if not items:757 METADATA_LOGGER.warning("None or empty field list from "758 "get_items for group '%s'", group)759 except Exception as err:760 res = meta_protocol.write_get_items(exception=err)761 else:762 res = meta_protocol.write_get_items(items)763 return res764 return execute765 def _on_gsc(self, data):766 parsed_data = meta_protocol.read_get_schema(data)767 session_id = parsed_data["session_id"]768 group = parsed_data["group"]769 user = parsed_data["user"]770 schema = parsed_data["schema"]771 def execute():772 try:773 fields = self._adapter.get_schema(user, session_id, group,774 schema)775 if not fields:776 METADATA_LOGGER.warning("None or empty field list from "777 "get_schema for schema '%s' in "778 "group '%s'", schema, group)779 except Exception as err:780 res = meta_protocol.write_get_schema(exception=err)781 else:782 res = meta_protocol.write_get_schema(fields)783 return res784 return execute785 def _on_git(self, data):786 parsed_data = meta_protocol.read_get_item_data(data)787 def execute():788 try:789 items = [{"allowedModeList":790 [mode for mode in list(Mode)791 if self._adapter.mode_may_be_allowed(item, mode)],792 "distinctSnapshotLength":793 self._adapter.get_distinct_snapshot_length(item),794 "minSourceFrequency":795 self._adapter.get_min_source_frequency(item)}796 for item in parsed_data]797 except Exception as err:798 res = meta_protocol.write_get_item_data(exception=err)799 else:800 res = meta_protocol.write_get_item_data(items)801 return res802 return execute803 def _on_gui(self, data):804 parsed_data = meta_protocol.read_get_user_item_data(data)805 user = parsed_data["user"]806 user_items = parsed_data["items"]807 def execute():808 try:809 items = [{"allowedModeList":810 [mode for mode in list(Mode)811 if self._adapter.ismode_allowed(user, item, mode)],812 "allowedBufferSize":813 self._adapter.get_allowed_buffer_size(user, item),814 "allowedMaxFrequency":815 self._adapter.get_allowed_max_item_frequency(user,816 item)}817 for item in user_items]818 except Exception as err:819 res = meta_protocol.write_get_user_item_data(exception=err)820 else:821 res = meta_protocol.write_get_user_item_data(items)822 return res823 return execute824 def _on_num(self, data):825 parsed_data = meta_protocol.read_notify_user_message(data)826 session_id = parsed_data["session_id"]827 message = parsed_data["message"]828 user = parsed_data["user"]829 def execute():830 try:831 self._adapter.notify_user_message(user, session_id, message)832 except Exception as err:833 res = meta_protocol.write_notify_user_message(err)834 else:835 res = meta_protocol.write_notify_user_message()836 return res837 return execute838 def _on_nnt(self, data):839 parsed_data = meta_protocol.read_notify_new_tables(data)840 session_id = parsed_data["session_id"]841 user = parsed_data["user"]842 table_infos = parsed_data["tableInfos"]843 def execute():844 try:845 self._adapter.notify_new_tables(user, session_id, table_infos)846 except Exception as err:847 res = meta_protocol.write_notify_new_tables(err)848 else:849 res = meta_protocol.write_notify_new_tables()850 return res851 return execute852 def _on_ntc(self, data):853 data = meta_protocol.read_notify_tables_close(data)854 table_infos = data["tableInfos"]855 session_id = data["session_id"]856 def execute():857 try:858 self._adapter.notify_tables_close(session_id, table_infos)859 except Exception as err:860 res = meta_protocol.write_notify_tables_close(err)861 else:862 res = meta_protocol.write_notify_tables_close()863 return res864 return execute865 def _on_mda(self, data):866 parsed_data = meta_protocol.read_notify_device_access(data)867 mpn_device_info = parsed_data["mpnDeviceInfo"]868 session_id = parsed_data["sessionId"]869 user = parsed_data["user"]870 def execute():871 try:872 self._adapter.notify_mpn_device_access(user, session_id,873 mpn_device_info)874 except Exception as err:875 res = meta_protocol.write_notify_device_acces(err)876 else:877 res = meta_protocol.write_notify_device_acces()878 return res879 return execute880 def _on_msa(self, data):881 parsed_data = meta_protocol.read_subscription_activation(data)882 session_id = parsed_data["session_id"]883 table = parsed_data["table"]884 subscription = parsed_data["subscription"]885 user = parsed_data["user"]886 def execute():887 try:888 self._adapter.notify_mpn_subscription_activation(user,889 session_id,890 table,891 subscription)892 except Exception as err:893 res = meta_protocol.write_subscription_activation(err)894 else:895 res = meta_protocol.write_subscription_activation()896 return res897 return execute898 def _on_mdc(self, data):899 parsed_data = meta_protocol.read_device_token_change(data)900 session_id = parsed_data["sessionId"]901 mpn_device_info = parsed_data["mpnDeviceInfo"]902 user = parsed_data["user"]903 new_device_toksn = parsed_data["newDeviceToken"]904 def execute():905 try:906 self._adapter.notify_mpn_device_token_change(user,907 session_id,908 mpn_device_info,909 new_device_toksn)910 except Exception as err:911 res = meta_protocol.write_device_token_change(err)912 else:913 res = meta_protocol.write_device_token_change()914 return res915 return execute916 def _handle_ioexception(self, ioexception):917 METADATA_LOGGER.fatal("Exception caught while reading/writing from/to"918 " network: <%s>, aborting...", str(ioexception))919 super(MetadataProviderServer, self)._handle_ioexception(ioexception)920 def _handle_exception(self, exception):921 METADATA_LOGGER.error("Caught exception: %s", str(exception))922 return False923 @property924 def adapter_config(self):925 """The pathname of an optional configuration file for the Remote926 Metadata Adapter, to be passed to the927 :meth:`lightstreamer_adapter.interfaces.metadata.MetadataProvider.initialize`928 method.929 :Getter: Returns the pathname of the optional configuration file930 :Setter: Sets the pathname of the optional configuration file931 :type: str932 """933 return self._config_file934 @adapter_config.setter935 def adapter_config(self, value):936 self._config_file = value937 @property938 def adapter_params(self):939 """A dictionary object to be passed to the940 :meth:`lightstreamer_adapter.interfaces.metadata.MetadataProvider.initialize`941 method, to supply optional parameters.942 :Getter: Returns the dictionary object of optional parameters943 :Setter: Sets the dictionary object of optional parameters944 :type: dict945 """946 return self._params947 @adapter_params.setter948 def adapter_params(self, value):949 self._params = value950 def start(self):951 """Starts the Remote Metadata Adapter. A connection to the Proxy952 Adapter is performed (as soon as one is available). Then, requests953 issued by the Proxy Adapter are received and forwarded to the Remote954 Adapter.955 :raises \956 lightstreamer_adapter.interfaces.metadata.MetadataProviderError: If an957 error occurred in the initialization phase. The adapter was not958 started.959 """960 METADATA_LOGGER.info("Managing Metadata Adapter %s with a thread pool"961 " size of %d", self.name, self.thread_pool_size)962 try:963 super(MetadataProviderServer, self).start()964 except (TypeError, OSError) as err:965 raise MetadataProviderError("Caught an error during the "966 "initialization phase") from err967class DataProviderServer(Server):968 """A Remote Server object which can run a Remote Data Adapter and connect969 it to the Proxy Adapter running on Lightstreamer Server.970 The object should be provided with a DataProvider instance and with971 suitable initialization parameters and established connections, then972 activated through :meth:`DataProviderServer.start()` and finally disposed973 through :meth:`Server.close()`.974 Further reuse of the same instance is not supported.975 By default, the invocations to the Data Adapter methods will be done in976 a limited thread pool with a size determined by the number of detected cpu977 cores. The size can be specified through the provided978 ``thread_pool_size`` parameter. A size of 1 enforces strictly sequential979 invocations and can be used if parallelization of the calls is not980 supported by the Metadata Adapter. A value of 0, negative or ``None`` also981 implies the default behaviour as stated above.982 Note that :meth:`.subscribe` and :meth:`.unsubscribe` invocations for the983 same item are always guaranteed to be sequentialized in the right way,984 although they may not occur in the same thread.985 """986 def __init__(self, adapter, address, name=None, keep_alive=None,987 thread_pool_size=0, ssl_context=None):988 """Creates a server with the supplied configuration parameters. The989 initialize method of the Remote Adapter will be invoked only upon a990 Proxy Adapter request.991 :param lightstreamer_adapter.interfaces.data.DataProvider adapter: The992 Remote Adapter instance to be run.993 :param tuple address: the address of the Proxy Adapter supplied as a994 3-tuple ``(host, request_reply_port, notify_port)`` where:995 * host: a string representing the hostname or the IP address996 * request_reply_port: an int representing the request/reply port997 * notify_port: an int representing the notify port998 :param str name: the name associated to the Server instance.999 :param float keep_alive: the keepalive interval expressed in seconds1000 (or fractions)1001 :param int thread_pool_size: the thread pool size1002 :param SSLContext ssl_context: the SSL context to be used in the1003 case of encrypted communications with the Proxy Adapter1004 :raises TypeError: if the supplied Remote Adapter is not an instance of1005 a subclass of1006 :class:`lightstreamer_adapter.interfaces.data.DataProvider`.1007 """1008 super(DataProviderServer, self).__init__((address[0], address[1]),1009 name,1010 keep_alive,1011 thread_pool_size,1012 ssl_context)1013 if not isinstance(adapter, DataProvider):1014 raise TypeError("The provided adapter is not a subclass of "1015 "lightstreamer_adapter.interfaces.DataProvider")1016 self._config_file = None1017 self._params = None1018 self._adapter = adapter1019 self._subscription_mgr = SubscriptionManager(self._executor)1020 self.init_expected = True1021 self._notify_sender = None1022 self._notify_address = (address[0], address[2])1023 self._ntfy_sock = None1024 def _send_remote_credentials(self):1025 """Invoked for sending the remote credentials to the Proxy Metadata1026 Adapter.1027 """1028 unsolicited_message = protocol.write_credentials(self.remote_user,1029 self.remote_password)1030 self._send_reply("1", unsolicited_message)1031 self._send_notify(unsolicited_message)1032 def _handle_request(self, request_id, data, method_name):1033 init_request = method_name == str(data_protocol.Method.DPI)1034 if init_request and not self.init_expected:1035 raise RemotingException("Unexpected late {} request"1036 .format(str(data_protocol.Method.DPI)))1037 if not init_request and self.init_expected:1038 raise RemotingException("Unexpected request {} while waiting for "1039 "{} request"1040 .format(method_name,1041 data_protocol.Method.DPI))1042 if init_request:1043 self.init_expected = False1044 res = self._on_dpi(data)1045 self._send_reply(request_id, res)1046 elif method_name == "SUB":1047 self._on_sub(request_id, data)1048 elif method_name == "USB":1049 self._on_usb(request_id, data)1050 else:1051 DATA_LOGGER.warning("Discarding unknown request: %s", method_name)1052 @property1053 def adapter_config(self):1054 """The pathname of an optional configuration file for the Remote1055 Data Adapter, to be passed to the1056 :meth:`lightstreamer_adapter.interfaces.data.DataProvider.initialize`1057 method.1058 :Getter: Returns the pathname of the optional configuration file1059 :Setter: Sets the pathname of the optional configuration file1060 :type: str1061 """1062 return self._config_file1063 @adapter_config.setter1064 def adapter_config(self, value):1065 self._config_file = value1066 @property1067 def adapter_params(self):1068 """A dictionary object to be passed to the1069 :meth:`lightstreamer_adapter.interfaces.data.DataProvider.initialize`1070 method, to supply optional parameters.1071 :Getter: Returns the dictionary object of optional parameters1072 :Setter: Sets the dictionary object of optional parameters1073 :type: dict1074 """1075 return self._params1076 @adapter_params.setter1077 def adapter_params(self, value):1078 self._params = value1079 def _on_request_receiver_started(self):1080 """Invoked to notify this subclass the the Request Receiver has been1081 started.1082 This class creates an additional socket to enable the communication1083 from the Remote Data Adapter to the Proxy Adapter, in order to send1084 data over the notification channel.1085 """1086 self._ntfy_sock = create_socket_and_connect(self._notify_address,1087 self._ssl_context)1088 notify_sender_log = logging.getLogger("lightstreamer-adapter."1089 "requestreply.notifications."1090 "NotifySender")1091 self._notify_sender = _Sender(sock=self._ntfy_sock, server=self,1092 name=self.name + " (Notify)",1093 keepalive=self.keep_alive,1094 log=notify_sender_log)1095 self._notify_sender.start()1096 def _on_dpi(self, data):1097 return self._on_init(data_protocol, self._params, self._config_file,1098 data, self._adapter, True)1099 def _on_sub(self, request_id, data):1100 item_name = data_protocol.read_sub(data)1101 def do_task():1102 DATA_LOGGER.debug("Processing SUB request: %s", request_id)1103 success = False1104 try:1105 snpt_available = self._adapter.issnapshot_available(item_name)1106 if snpt_available is False:1107 self.end_of_snapshot(item_name)1108 self._adapter.subscribe(item_name)1109 success = True1110 except Exception as err:1111 res = data_protocol.write_sub(err)1112 else:1113 res = data_protocol.write_sub()1114 self._send_reply(request_id, res)1115 return success1116 def do_late_task():1117 DATA_LOGGER.info("Skipping request: %s", request_id)1118 subscribe_err = SubscribeError("Subscribe request come too late")1119 res = data_protocol.write_sub(subscribe_err)1120 return res1121 sub_task = ItemTask(request_id, True, do_task, do_late_task)1122 self._subscription_mgr.do_subscription(item_name, sub_task)1123 def _on_usb(self, request_id, data):1124 item_name = data_protocol.read_usub(data)1125 def do_task():1126 DATA_LOGGER.debug("Processing USB request: %s", request_id)1127 success = False1128 try:1129 self._adapter.unsubscribe(item_name)1130 success = True1131 except Exception as err:1132 res = data_protocol.write_unsub(err)1133 else:1134 res = data_protocol.write_unsub()1135 self._send_reply(request_id, res)1136 return success1137 def do_late_task():1138 DATA_LOGGER.info("Skipping request: %s", request_id)1139 res = data_protocol.write_unsub()1140 self._send_reply(request_id, res)1141 unsub_task = ItemTask(request_id, False, do_task, do_late_task)1142 self._subscription_mgr.do_unsubscription(item_name, unsub_task)1143 @notify1144 def _send_notify(self, ntfy):1145 self._notify_sender.send(ntfy)1146 def update(self, item_name, events_map, issnapshot):1147 request_id = self._subscription_mgr.get_active_item(item_name)1148 if request_id:1149 try:1150 res = data_protocol.write_update_map(item_name, request_id,1151 issnapshot, events_map)1152 self._send_notify(res)1153 except RemotingException as err:1154 self.on_exception(err)1155 else:1156 DATA_LOGGER.warning("Unexpected update for item_name %s",1157 item_name)1158 def end_of_snapshot(self, item_name):1159 request_id = self._subscription_mgr.get_active_item(item_name)1160 if request_id:1161 try:1162 res = data_protocol.write_eos(item_name, request_id)1163 self._send_notify(res)1164 except RemotingException as err:1165 self.on_exception(err)1166 else:1167 DATA_LOGGER.warning("Unexpected end_of_snapshot notify for "1168 "item_name %s", item_name)1169 def clear_snapshot(self, item_name):1170 request_id = self._subscription_mgr.get_active_item(item_name)1171 if request_id:1172 try:1173 res = data_protocol.write_cls(item_name, request_id)1174 self._send_notify(res)1175 except RemotingException as err:1176 self.on_exception(err)1177 else:1178 DATA_LOGGER.warning("Unexpected clear_snapshot for item_name %s",1179 item_name)1180 def failure(self, exception):1181 try:1182 res = data_protocol.write_failure(exception)1183 self._send_notify(res)1184 except RemotingException as err:1185 self.on_exception(err)1186 def _handle_ioexception(self, ioexception):1187 DATA_LOGGER.fatal("Exception caught while reading/writing from/to "1188 "network: <%s>, aborting...", str(ioexception))1189 super(DataProviderServer, self)._handle_ioexception(ioexception)1190 def _handle_exception(self, exception):1191 DATA_LOGGER.error("Caught exception: %s, trying to notify a "1192 "failure...", str(exception))1193 try:1194 res = data_protocol.write_failure(exception)1195 self._send_notify(res)1196 except RemotingException:1197 DATA_LOGGER.exception("Caught second-level exception while trying"1198 " to notify a first-level exception")1199 return False1200 def _change_keep_alive(self, keep_alive_milliseconds):1201 super(DataProviderServer, self)._change_keep_alive(keep_alive_milliseconds)1202 self._notify_sender.change_keep_alive(keep_alive_milliseconds / 1000, True)1203 def start(self):1204 """Starts the Remote Data Adapter. A connection to the Proxy Adapter is1205 performed (as soon as one is available). Then, requests issued by the1206 Proxy Adapter are received and forwarded to the Remote Adapter.1207 :raises lightstreamer_adapter.interfaces.data.DataProviderError: If an1208 error occurred in the initialization phase. The adapter was not1209 started.1210 """1211 DATA_LOGGER.info("Managing Data Adapter %s with a thread pool size of"1212 " %d", self.name, self.thread_pool_size)1213 try:1214 super(DataProviderServer, self).start()1215 except (TypeError, OSError) as err:1216 raise DataProviderError("Caught an error during the "1217 "initialization phase") from err1218 def close(self):1219 """Stops the management of the Remote Data Adapter attached to this1220 Server object.1221 The method first invokes the inherited Server.close() and then closes1222 the Notify Sender object.1223 """1224 super(DataProviderServer, self).close()1225 self._ntfy_sock.close()1226 self._notify_sender.quit()1227class ExceptionHandler(metaclass=ABCMeta):1228 """An abstract class meant to to be implemented in order to provide a1229 Remote Server instance with a custom handler for error conditions occurring1230 on the Remote Server.1231 Note that multiple redundant invocations on the same Remote Server instance1232 are possible.1233 """1234 def __init__(self):1235 pass1236 def handle_ioexception(self, exception):1237 """Called by the Remote Server upon a read or write operation failure.1238 This may mean that the connection to Lightstreamer Server is lost; in1239 any way, after this error, the correct operation of this Remote Server1240 operation is compromised.1241 This can be the signal of a normal termination of Lightstreamer Server.1242 If this is not the case, then this Remote Server should be closed and a1243 new one should be created and initialized. This may mean closing and1244 restarting the process or just creating a new instance, depending on1245 the implementation choice. This will be detected by the Proxy Adapter,1246 which will react accordingly.1247 The default handling just terminates the process.1248 :param Exception exception: An Exception showing the cause of the1249 problem.1250 :return bool: ``True`` to enable the default handling, false to1251 suppress it.1252 """1253 def handle_exception(self, exception):1254 """Called by the Remote Server upon an unexpected error. After this1255 error, the correct operation of this Remote Server instance is1256 compromised.1257 If this is the case, then this Remote Server instance should be closed1258 and a new one should be created and initialized. This may mean closing1259 and restarting the process or just creating a new instance, depending1260 on the implementation choice. This will be detected by the Proxy1261 Adapter, which will react accordingly.1262 The default handling, in case of a Remote Data Adapter, issues an1263 asynchronous failure notification to the Proxy Adapter.1264 In case of a Remote Metadata Adapter, the default handling ignores the1265 notification; however, as a consequence of the Remote Protocol being1266 broken, the Proxy Adapter may return exceptions against one or more1267 specific requests by Lightstreamer Kernel.1268 :param Exception exception: An Exception showing the cause of the1269 problem.1270 :return: ``True`` to enable the default handling, false to1271 suppress it.1272 :rtype: bool...
test_mqtt.py
Source:test_mqtt.py
...114 MQTTClient.subscribe(mock_client, "abc")115 assert mock_client._subscribed[123].topic == "abc"116 assert mock_client._subscribed[123].subscribed == False117 def test_no_subscribe_on_err(self):118 def subscribe_err(topic, *args, **kwargs):119 return (1, 123)120 mock_client = TestSubscription.get_mock_client_with(subscribe_err)121 MQTTClient.subscribe(mock_client, "abc")122 assert mock_client._subscribed == {}123 def test_no_subscribe_on_unrecognised_suback(self):124 def subscribe_success(topic, *args, **kwargs):125 return (0, 123)126 mock_client = TestSubscription.get_mock_client_with(subscribe_success)127 MQTTClient._on_subscribe(mock_client, "abc", {}, 123, 0)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!