Best Python code snippet using molotov_python
selector_events.py
Source:selector_events.py
1"""Event loop using a selector and related classes.2A selector is a "notify-when-ready" multiplexer. For a subclass which3also includes support for signal handling, see the unix_events sub-module.4"""5__all__ = ['BaseSelectorEventLoop']6import collections7import errno8import functools9import socket10import sys11import warnings12try:13 import ssl14except ImportError: # pragma: no cover15 ssl = None16from . import base_events17from . import constants18from . import events19from . import futures20from . import selectors21from . import transports22from . import sslproto23from .coroutines import coroutine24from .log import logger25def _test_selector_event(selector, fd, event):26 # Test if the selector is monitoring 'event' events27 # for the file descriptor 'fd'.28 try:29 key = selector.get_key(fd)30 except KeyError:31 return False32 else:33 return bool(key.events & event)34class BaseSelectorEventLoop(base_events.BaseEventLoop):35 """Selector event loop.36 See events.EventLoop for API specification.37 """38 def __init__(self, selector=None):39 super().__init__()40 if selector is None:41 selector = selectors.DefaultSelector()42 logger.debug('Using selector: %s', selector.__class__.__name__)43 self._selector = selector44 self._make_self_pipe()45 def _make_socket_transport(self, sock, protocol, waiter=None, *,46 extra=None, server=None):47 return _SelectorSocketTransport(self, sock, protocol, waiter,48 extra, server)49 def _make_ssl_transport(self, rawsock, protocol, sslcontext, waiter=None,50 *, server_side=False, server_hostname=None,51 extra=None, server=None):52 if not sslproto._is_sslproto_available():53 return self._make_legacy_ssl_transport(54 rawsock, protocol, sslcontext, waiter,55 server_side=server_side, server_hostname=server_hostname,56 extra=extra, server=server)57 ssl_protocol = sslproto.SSLProtocol(self, protocol, sslcontext, waiter,58 server_side, server_hostname)59 _SelectorSocketTransport(self, rawsock, ssl_protocol,60 extra=extra, server=server)61 return ssl_protocol._app_transport62 def _make_legacy_ssl_transport(self, rawsock, protocol, sslcontext,63 waiter, *,64 server_side=False, server_hostname=None,65 extra=None, server=None):66 # Use the legacy API: SSL_write, SSL_read, etc. The legacy API is used67 # on Python 3.4 and older, when ssl.MemoryBIO is not available.68 return _SelectorSslTransport(69 self, rawsock, protocol, sslcontext, waiter,70 server_side, server_hostname, extra, server)71 def _make_datagram_transport(self, sock, protocol,72 address=None, waiter=None, extra=None):73 return _SelectorDatagramTransport(self, sock, protocol,74 address, waiter, extra)75 def close(self):76 if self.is_running():77 raise RuntimeError("Cannot close a running event loop")78 if self.is_closed():79 return80 self._close_self_pipe()81 super().close()82 if self._selector is not None:83 self._selector.close()84 self._selector = None85 def _socketpair(self):86 raise NotImplementedError87 def _close_self_pipe(self):88 self.remove_reader(self._ssock.fileno())89 self._ssock.close()90 self._ssock = None91 self._csock.close()92 self._csock = None93 self._internal_fds -= 194 def _make_self_pipe(self):95 # A self-socket, really. :-)96 self._ssock, self._csock = self._socketpair()97 self._ssock.setblocking(False)98 self._csock.setblocking(False)99 self._internal_fds += 1100 self.add_reader(self._ssock.fileno(), self._read_from_self)101 def _process_self_data(self, data):102 pass103 def _read_from_self(self):104 while True:105 try:106 data = self._ssock.recv(4096)107 if not data:108 break109 self._process_self_data(data)110 except InterruptedError:111 continue112 except BlockingIOError:113 break114 def _write_to_self(self):115 # This may be called from a different thread, possibly after116 # _close_self_pipe() has been called or even while it is117 # running. Guard for self._csock being None or closed. When118 # a socket is closed, send() raises OSError (with errno set to119 # EBADF, but let's not rely on the exact error code).120 csock = self._csock121 if csock is not None:122 try:123 csock.send(b'\0')124 except OSError:125 if self._debug:126 logger.debug("Fail to write a null byte into the "127 "self-pipe socket",128 exc_info=True)129 def _start_serving(self, protocol_factory, sock,130 sslcontext=None, server=None):131 self.add_reader(sock.fileno(), self._accept_connection,132 protocol_factory, sock, sslcontext, server)133 def _accept_connection(self, protocol_factory, sock,134 sslcontext=None, server=None):135 try:136 conn, addr = sock.accept()137 if self._debug:138 logger.debug("%r got a new connection from %r: %r",139 server, addr, conn)140 conn.setblocking(False)141 except (BlockingIOError, InterruptedError, ConnectionAbortedError):142 pass # False alarm.143 except OSError as exc:144 # There's nowhere to send the error, so just log it.145 if exc.errno in (errno.EMFILE, errno.ENFILE,146 errno.ENOBUFS, errno.ENOMEM):147 # Some platforms (e.g. Linux keep reporting the FD as148 # ready, so we remove the read handler temporarily.149 # We'll try again in a while.150 self.call_exception_handler({151 'message': 'socket.accept() out of system resource',152 'exception': exc,153 'socket': sock,154 })155 self.remove_reader(sock.fileno())156 self.call_later(constants.ACCEPT_RETRY_DELAY,157 self._start_serving,158 protocol_factory, sock, sslcontext, server)159 else:160 raise # The event loop will catch, log and ignore it.161 else:162 extra = {'peername': addr}163 accept = self._accept_connection2(protocol_factory, conn, extra,164 sslcontext, server)165 self.create_task(accept)166 @coroutine167 def _accept_connection2(self, protocol_factory, conn, extra,168 sslcontext=None, server=None):169 protocol = None170 transport = None171 try:172 protocol = protocol_factory()173 waiter = futures.Future(loop=self)174 if sslcontext:175 transport = self._make_ssl_transport(176 conn, protocol, sslcontext, waiter=waiter,177 server_side=True, extra=extra, server=server)178 else:179 transport = self._make_socket_transport(180 conn, protocol, waiter=waiter, extra=extra,181 server=server)182 try:183 yield from waiter184 except:185 transport.close()186 raise187 # It's now up to the protocol to handle the connection.188 except Exception as exc:189 if self._debug:190 context = {191 'message': ('Error on transport creation '192 'for incoming connection'),193 'exception': exc,194 }195 if protocol is not None:196 context['protocol'] = protocol197 if transport is not None:198 context['transport'] = transport199 self.call_exception_handler(context)200 def add_reader(self, fd, callback, *args):201 """Add a reader callback."""202 self._check_closed()203 handle = events.Handle(callback, args, self)204 try:205 key = self._selector.get_key(fd)206 except KeyError:207 self._selector.register(fd, selectors.EVENT_READ,208 (handle, None))209 else:210 mask, (reader, writer) = key.events, key.data211 self._selector.modify(fd, mask | selectors.EVENT_READ,212 (handle, writer))213 if reader is not None:214 reader.cancel()215 def remove_reader(self, fd):216 """Remove a reader callback."""217 if self.is_closed():218 return False219 try:220 key = self._selector.get_key(fd)221 except KeyError:222 return False223 else:224 mask, (reader, writer) = key.events, key.data225 mask &= ~selectors.EVENT_READ226 if not mask:227 self._selector.unregister(fd)228 else:229 self._selector.modify(fd, mask, (None, writer))230 if reader is not None:231 reader.cancel()232 return True233 else:234 return False235 def add_writer(self, fd, callback, *args):236 """Add a writer callback.."""237 self._check_closed()238 handle = events.Handle(callback, args, self)239 try:240 key = self._selector.get_key(fd)241 except KeyError:242 self._selector.register(fd, selectors.EVENT_WRITE,243 (None, handle))244 else:245 mask, (reader, writer) = key.events, key.data246 self._selector.modify(fd, mask | selectors.EVENT_WRITE,247 (reader, handle))248 if writer is not None:249 writer.cancel()250 def remove_writer(self, fd):251 """Remove a writer callback."""252 if self.is_closed():253 return False254 try:255 key = self._selector.get_key(fd)256 except KeyError:257 return False258 else:259 mask, (reader, writer) = key.events, key.data260 # Remove both writer and connector.261 mask &= ~selectors.EVENT_WRITE262 if not mask:263 self._selector.unregister(fd)264 else:265 self._selector.modify(fd, mask, (reader, None))266 if writer is not None:267 writer.cancel()268 return True269 else:270 return False271 def sock_recv(self, sock, n):272 """Receive data from the socket.273 The return value is a bytes object representing the data received.274 The maximum amount of data to be received at once is specified by275 nbytes.276 This method is a coroutine.277 """278 if self._debug and sock.gettimeout() != 0:279 raise ValueError("the socket must be non-blocking")280 fut = futures.Future(loop=self)281 self._sock_recv(fut, False, sock, n)282 return fut283 def _sock_recv(self, fut, registered, sock, n):284 # _sock_recv() can add itself as an I/O callback if the operation can't285 # be done immediately. Don't use it directly, call sock_recv().286 fd = sock.fileno()287 if registered:288 # Remove the callback early. It should be rare that the289 # selector says the fd is ready but the call still returns290 # EAGAIN, and I am willing to take a hit in that case in291 # order to simplify the common case.292 self.remove_reader(fd)293 if fut.cancelled():294 return295 try:296 data = sock.recv(n)297 except (BlockingIOError, InterruptedError):298 self.add_reader(fd, self._sock_recv, fut, True, sock, n)299 except Exception as exc:300 fut.set_exception(exc)301 else:302 fut.set_result(data)303 def sock_sendall(self, sock, data):304 """Send data to the socket.305 The socket must be connected to a remote socket. This method continues306 to send data from data until either all data has been sent or an307 error occurs. None is returned on success. On error, an exception is308 raised, and there is no way to determine how much data, if any, was309 successfully processed by the receiving end of the connection.310 This method is a coroutine.311 """312 if self._debug and sock.gettimeout() != 0:313 raise ValueError("the socket must be non-blocking")314 fut = futures.Future(loop=self)315 if data:316 self._sock_sendall(fut, False, sock, data)317 else:318 fut.set_result(None)319 return fut320 def _sock_sendall(self, fut, registered, sock, data):321 fd = sock.fileno()322 if registered:323 self.remove_writer(fd)324 if fut.cancelled():325 return326 try:327 n = sock.send(data)328 except (BlockingIOError, InterruptedError):329 n = 0330 except Exception as exc:331 fut.set_exception(exc)332 return333 if n == len(data):334 fut.set_result(None)335 else:336 if n:337 data = data[n:]338 self.add_writer(fd, self._sock_sendall, fut, True, sock, data)339 def sock_connect(self, sock, address):340 """Connect to a remote socket at address.341 The address must be already resolved to avoid the trap of hanging the342 entire event loop when the address requires doing a DNS lookup. For343 example, it must be an IP address, not an hostname, for AF_INET and344 AF_INET6 address families. Use getaddrinfo() to resolve the hostname345 asynchronously.346 This method is a coroutine.347 """348 if self._debug and sock.gettimeout() != 0:349 raise ValueError("the socket must be non-blocking")350 fut = futures.Future(loop=self)351 try:352 if self._debug:353 base_events._check_resolved_address(sock, address)354 except ValueError as err:355 fut.set_exception(err)356 else:357 self._sock_connect(fut, sock, address)358 return fut359 def _sock_connect(self, fut, sock, address):360 fd = sock.fileno()361 try:362 while True:363 try:364 sock.connect(address)365 except InterruptedError:366 continue367 else:368 break369 except BlockingIOError:370 fut.add_done_callback(functools.partial(self._sock_connect_done,371 fd))372 self.add_writer(fd, self._sock_connect_cb, fut, sock, address)373 except Exception as exc:374 fut.set_exception(exc)375 else:376 fut.set_result(None)377 def _sock_connect_done(self, fd, fut):378 self.remove_writer(fd)379 def _sock_connect_cb(self, fut, sock, address):380 if fut.cancelled():381 return382 try:383 err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)384 if err != 0:385 # Jump to any except clause below.386 raise OSError(err, 'Connect call failed %s' % (address,))387 except (BlockingIOError, InterruptedError):388 # socket is still registered, the callback will be retried later389 pass390 except Exception as exc:391 fut.set_exception(exc)392 else:393 fut.set_result(None)394 def sock_accept(self, sock):395 """Accept a connection.396 The socket must be bound to an address and listening for connections.397 The return value is a pair (conn, address) where conn is a new socket398 object usable to send and receive data on the connection, and address399 is the address bound to the socket on the other end of the connection.400 This method is a coroutine.401 """402 if self._debug and sock.gettimeout() != 0:403 raise ValueError("the socket must be non-blocking")404 fut = futures.Future(loop=self)405 self._sock_accept(fut, False, sock)406 return fut407 def _sock_accept(self, fut, registered, sock):408 fd = sock.fileno()409 if registered:410 self.remove_reader(fd)411 if fut.cancelled():412 return413 try:414 conn, address = sock.accept()415 conn.setblocking(False)416 except (BlockingIOError, InterruptedError):417 self.add_reader(fd, self._sock_accept, fut, True, sock)418 except Exception as exc:419 fut.set_exception(exc)420 else:421 fut.set_result((conn, address))422 def _process_events(self, event_list):423 for key, mask in event_list:424 fileobj, (reader, writer) = key.fileobj, key.data425 if mask & selectors.EVENT_READ and reader is not None:426 if reader._cancelled:427 self.remove_reader(fileobj)428 else:429 self._add_callback(reader)430 if mask & selectors.EVENT_WRITE and writer is not None:431 if writer._cancelled:432 self.remove_writer(fileobj)433 else:434 self._add_callback(writer)435 def _stop_serving(self, sock):436 self.remove_reader(sock.fileno())437 sock.close()438class _SelectorTransport(transports._FlowControlMixin,439 transports.Transport):440 max_size = 256 * 1024 # Buffer size passed to recv().441 _buffer_factory = bytearray # Constructs initial value for self._buffer.442 # Attribute used in the destructor: it must be set even if the constructor443 # is not called (see _SelectorSslTransport which may start by raising an444 # exception)445 _sock = None446 def __init__(self, loop, sock, protocol, extra=None, server=None):447 super().__init__(extra, loop)448 self._extra['socket'] = sock449 self._extra['sockname'] = sock.getsockname()450 if 'peername' not in self._extra:451 try:452 self._extra['peername'] = sock.getpeername()453 except socket.error:454 self._extra['peername'] = None455 self._sock = sock456 self._sock_fd = sock.fileno()457 self._protocol = protocol458 self._protocol_connected = True459 self._server = server460 self._buffer = self._buffer_factory()461 self._conn_lost = 0 # Set when call to connection_lost scheduled.462 self._closing = False # Set when close() called.463 if self._server is not None:464 self._server._attach()465 def __repr__(self):466 info = [self.__class__.__name__]467 if self._sock is None:468 info.append('closed')469 elif self._closing:470 info.append('closing')471 info.append('fd=%s' % self._sock_fd)472 # test if the transport was closed473 if self._loop is not None:474 polling = _test_selector_event(self._loop._selector,475 self._sock_fd, selectors.EVENT_READ)476 if polling:477 info.append('read=polling')478 else:479 info.append('read=idle')480 polling = _test_selector_event(self._loop._selector,481 self._sock_fd,482 selectors.EVENT_WRITE)483 if polling:484 state = 'polling'485 else:486 state = 'idle'487 bufsize = self.get_write_buffer_size()488 info.append('write=<%s, bufsize=%s>' % (state, bufsize))489 return '<%s>' % ' '.join(info)490 def abort(self):491 self._force_close(None)492 def close(self):493 if self._closing:494 return495 self._closing = True496 self._loop.remove_reader(self._sock_fd)497 if not self._buffer:498 self._conn_lost += 1499 self._loop.call_soon(self._call_connection_lost, None)500 # On Python 3.3 and older, objects with a destructor part of a reference501 # cycle are never destroyed. It's not more the case on Python 3.4 thanks502 # to the PEP 442.503 if sys.version_info >= (3, 4):504 def __del__(self):505 if self._sock is not None:506 warnings.warn("unclosed transport %r" % self, ResourceWarning)507 self._sock.close()508 def _fatal_error(self, exc, message='Fatal error on transport'):509 # Should be called from exception handler only.510 if isinstance(exc, (BrokenPipeError,511 ConnectionResetError, ConnectionAbortedError)):512 if self._loop.get_debug():513 logger.debug("%r: %s", self, message, exc_info=True)514 else:515 self._loop.call_exception_handler({516 'message': message,517 'exception': exc,518 'transport': self,519 'protocol': self._protocol,520 })521 self._force_close(exc)522 def _force_close(self, exc):523 if self._conn_lost:524 return525 if self._buffer:526 self._buffer.clear()527 self._loop.remove_writer(self._sock_fd)528 if not self._closing:529 self._closing = True530 self._loop.remove_reader(self._sock_fd)531 self._conn_lost += 1532 self._loop.call_soon(self._call_connection_lost, exc)533 def _call_connection_lost(self, exc):534 try:535 if self._protocol_connected:536 self._protocol.connection_lost(exc)537 finally:538 self._sock.close()539 self._sock = None540 self._protocol = None541 self._loop = None542 server = self._server543 if server is not None:544 server._detach()545 self._server = None546 def get_write_buffer_size(self):547 return len(self._buffer)548class _SelectorSocketTransport(_SelectorTransport):549 def __init__(self, loop, sock, protocol, waiter=None,550 extra=None, server=None):551 super().__init__(loop, sock, protocol, extra, server)552 self._eof = False553 self._paused = False554 self._loop.call_soon(self._protocol.connection_made, self)555 # only start reading when connection_made() has been called556 self._loop.call_soon(self._loop.add_reader,557 self._sock_fd, self._read_ready)558 if waiter is not None:559 # only wake up the waiter when connection_made() has been called560 self._loop.call_soon(waiter._set_result_unless_cancelled, None)561 def pause_reading(self):562 if self._closing:563 raise RuntimeError('Cannot pause_reading() when closing')564 if self._paused:565 raise RuntimeError('Already paused')566 self._paused = True567 self._loop.remove_reader(self._sock_fd)568 if self._loop.get_debug():569 logger.debug("%r pauses reading", self)570 def resume_reading(self):571 if not self._paused:572 raise RuntimeError('Not paused')573 self._paused = False574 if self._closing:575 return576 self._loop.add_reader(self._sock_fd, self._read_ready)577 if self._loop.get_debug():578 logger.debug("%r resumes reading", self)579 def _read_ready(self):580 try:581 data = self._sock.recv(self.max_size)582 except (BlockingIOError, InterruptedError):583 pass584 except Exception as exc:585 self._fatal_error(exc, 'Fatal read error on socket transport')586 else:587 if data:588 self._protocol.data_received(data)589 else:590 if self._loop.get_debug():591 logger.debug("%r received EOF", self)592 keep_open = self._protocol.eof_received()593 if keep_open:594 # We're keeping the connection open so the595 # protocol can write more, but we still can't596 # receive more, so remove the reader callback.597 self._loop.remove_reader(self._sock_fd)598 else:599 self.close()600 def write(self, data):601 if not isinstance(data, (bytes, bytearray, memoryview)):602 raise TypeError('data argument must be byte-ish (%r)',603 type(data))604 if self._eof:605 raise RuntimeError('Cannot call write() after write_eof()')606 if not data:607 return608 if self._conn_lost:609 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:610 logger.warning('socket.send() raised exception.')611 self._conn_lost += 1612 return613 if not self._buffer:614 # Optimization: try to send now.615 try:616 n = self._sock.send(data)617 except (BlockingIOError, InterruptedError):618 pass619 except Exception as exc:620 self._fatal_error(exc, 'Fatal write error on socket transport')621 return622 else:623 data = data[n:]624 if not data:625 return626 # Not all was written; register write handler.627 self._loop.add_writer(self._sock_fd, self._write_ready)628 # Add it to the buffer.629 self._buffer.extend(data)630 self._maybe_pause_protocol()631 def _write_ready(self):632 assert self._buffer, 'Data should not be empty'633 try:634 n = self._sock.send(self._buffer)635 except (BlockingIOError, InterruptedError):636 pass637 except Exception as exc:638 self._loop.remove_writer(self._sock_fd)639 self._buffer.clear()640 self._fatal_error(exc, 'Fatal write error on socket transport')641 else:642 if n:643 del self._buffer[:n]644 self._maybe_resume_protocol() # May append to buffer.645 if not self._buffer:646 self._loop.remove_writer(self._sock_fd)647 if self._closing:648 self._call_connection_lost(None)649 elif self._eof:650 self._sock.shutdown(socket.SHUT_WR)651 def write_eof(self):652 if self._eof:653 return654 self._eof = True655 if not self._buffer:656 self._sock.shutdown(socket.SHUT_WR)657 def can_write_eof(self):658 return True659class _SelectorSslTransport(_SelectorTransport):660 _buffer_factory = bytearray661 def __init__(self, loop, rawsock, protocol, sslcontext, waiter=None,662 server_side=False, server_hostname=None,663 extra=None, server=None):664 if ssl is None:665 raise RuntimeError('stdlib ssl module not available')666 if not sslcontext:667 sslcontext = sslproto._create_transport_context(server_side, server_hostname)668 wrap_kwargs = {669 'server_side': server_side,670 'do_handshake_on_connect': False,671 }672 if server_hostname and not server_side:673 wrap_kwargs['server_hostname'] = server_hostname674 sslsock = sslcontext.wrap_socket(rawsock, **wrap_kwargs)675 super().__init__(loop, sslsock, protocol, extra, server)676 # the protocol connection is only made after the SSL handshake677 self._protocol_connected = False678 self._server_hostname = server_hostname679 self._waiter = waiter680 self._sslcontext = sslcontext681 self._paused = False682 # SSL-specific extra info. (peercert is set later)683 self._extra.update(sslcontext=sslcontext)684 if self._loop.get_debug():685 logger.debug("%r starts SSL handshake", self)686 start_time = self._loop.time()687 else:688 start_time = None689 self._on_handshake(start_time)690 def _wakeup_waiter(self, exc=None):691 if self._waiter is None:692 return693 if not self._waiter.cancelled():694 if exc is not None:695 self._waiter.set_exception(exc)696 else:697 self._waiter.set_result(None)698 self._waiter = None699 def _on_handshake(self, start_time):700 try:701 self._sock.do_handshake()702 except ssl.SSLWantReadError:703 self._loop.add_reader(self._sock_fd,704 self._on_handshake, start_time)705 return706 except ssl.SSLWantWriteError:707 self._loop.add_writer(self._sock_fd,708 self._on_handshake, start_time)709 return710 except BaseException as exc:711 if self._loop.get_debug():712 logger.warning("%r: SSL handshake failed",713 self, exc_info=True)714 self._loop.remove_reader(self._sock_fd)715 self._loop.remove_writer(self._sock_fd)716 self._sock.close()717 self._wakeup_waiter(exc)718 if isinstance(exc, Exception):719 return720 else:721 raise722 self._loop.remove_reader(self._sock_fd)723 self._loop.remove_writer(self._sock_fd)724 peercert = self._sock.getpeercert()725 if not hasattr(self._sslcontext, 'check_hostname'):726 # Verify hostname if requested, Python 3.4+ uses check_hostname727 # and checks the hostname in do_handshake()728 if (self._server_hostname and729 self._sslcontext.verify_mode != ssl.CERT_NONE):730 try:731 ssl.match_hostname(peercert, self._server_hostname)732 except Exception as exc:733 if self._loop.get_debug():734 logger.warning("%r: SSL handshake failed "735 "on matching the hostname",736 self, exc_info=True)737 self._sock.close()738 self._wakeup_waiter(exc)739 return740 # Add extra info that becomes available after handshake.741 self._extra.update(peercert=peercert,742 cipher=self._sock.cipher(),743 compression=self._sock.compression(),744 )745 self._read_wants_write = False746 self._write_wants_read = False747 self._loop.add_reader(self._sock_fd, self._read_ready)748 self._protocol_connected = True749 self._loop.call_soon(self._protocol.connection_made, self)750 # only wake up the waiter when connection_made() has been called751 self._loop.call_soon(self._wakeup_waiter)752 if self._loop.get_debug():753 dt = self._loop.time() - start_time754 logger.debug("%r: SSL handshake took %.1f ms", self, dt * 1e3)755 def pause_reading(self):756 # XXX This is a bit icky, given the comment at the top of757 # _read_ready(). Is it possible to evoke a deadlock? I don't758 # know, although it doesn't look like it; write() will still759 # accept more data for the buffer and eventually the app will760 # call resume_reading() again, and things will flow again.761 if self._closing:762 raise RuntimeError('Cannot pause_reading() when closing')763 if self._paused:764 raise RuntimeError('Already paused')765 self._paused = True766 self._loop.remove_reader(self._sock_fd)767 if self._loop.get_debug():768 logger.debug("%r pauses reading", self)769 def resume_reading(self):770 if not self._paused:771 raise RuntimeError('Not paused')772 self._paused = False773 if self._closing:774 return775 self._loop.add_reader(self._sock_fd, self._read_ready)776 if self._loop.get_debug():777 logger.debug("%r resumes reading", self)778 def _read_ready(self):779 if self._write_wants_read:780 self._write_wants_read = False781 self._write_ready()782 if self._buffer:783 self._loop.add_writer(self._sock_fd, self._write_ready)784 try:785 data = self._sock.recv(self.max_size)786 except (BlockingIOError, InterruptedError, ssl.SSLWantReadError):787 pass788 except ssl.SSLWantWriteError:789 self._read_wants_write = True790 self._loop.remove_reader(self._sock_fd)791 self._loop.add_writer(self._sock_fd, self._write_ready)792 except Exception as exc:793 self._fatal_error(exc, 'Fatal read error on SSL transport')794 else:795 if data:796 self._protocol.data_received(data)797 else:798 try:799 if self._loop.get_debug():800 logger.debug("%r received EOF", self)801 keep_open = self._protocol.eof_received()802 if keep_open:803 logger.warning('returning true from eof_received() '804 'has no effect when using ssl')805 finally:806 self.close()807 def _write_ready(self):808 if self._read_wants_write:809 self._read_wants_write = False810 self._read_ready()811 if not (self._paused or self._closing):812 self._loop.add_reader(self._sock_fd, self._read_ready)813 if self._buffer:814 try:815 n = self._sock.send(self._buffer)816 except (BlockingIOError, InterruptedError, ssl.SSLWantWriteError):817 n = 0818 except ssl.SSLWantReadError:819 n = 0820 self._loop.remove_writer(self._sock_fd)821 self._write_wants_read = True822 except Exception as exc:823 self._loop.remove_writer(self._sock_fd)824 self._buffer.clear()825 self._fatal_error(exc, 'Fatal write error on SSL transport')826 return827 if n:828 del self._buffer[:n]829 self._maybe_resume_protocol() # May append to buffer.830 if not self._buffer:831 self._loop.remove_writer(self._sock_fd)832 if self._closing:833 self._call_connection_lost(None)834 def write(self, data):835 if not isinstance(data, (bytes, bytearray, memoryview)):836 raise TypeError('data argument must be byte-ish (%r)',837 type(data))838 if not data:839 return840 if self._conn_lost:841 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:842 logger.warning('socket.send() raised exception.')843 self._conn_lost += 1844 return845 if not self._buffer:846 self._loop.add_writer(self._sock_fd, self._write_ready)847 # Add it to the buffer.848 self._buffer.extend(data)849 self._maybe_pause_protocol()850 def can_write_eof(self):851 return False852class _SelectorDatagramTransport(_SelectorTransport):853 _buffer_factory = collections.deque854 def __init__(self, loop, sock, protocol, address=None,855 waiter=None, extra=None):856 super().__init__(loop, sock, protocol, extra)857 self._address = address858 self._loop.call_soon(self._protocol.connection_made, self)859 # only start reading when connection_made() has been called860 self._loop.call_soon(self._loop.add_reader,861 self._sock_fd, self._read_ready)862 if waiter is not None:863 # only wake up the waiter when connection_made() has been called864 self._loop.call_soon(waiter._set_result_unless_cancelled, None)865 def get_write_buffer_size(self):866 return sum(len(data) for data, _ in self._buffer)867 def _read_ready(self):868 try:869 data, addr = self._sock.recvfrom(self.max_size)870 except (BlockingIOError, InterruptedError):871 pass872 except OSError as exc:873 self._protocol.error_received(exc)874 except Exception as exc:875 self._fatal_error(exc, 'Fatal read error on datagram transport')876 else:877 self._protocol.datagram_received(data, addr)878 def sendto(self, data, addr=None):879 if not isinstance(data, (bytes, bytearray, memoryview)):880 raise TypeError('data argument must be byte-ish (%r)',881 type(data))882 if not data:883 return884 if self._address and addr not in (None, self._address):885 raise ValueError('Invalid address: must be None or %s' %886 (self._address,))887 if self._conn_lost and self._address:888 if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:889 logger.warning('socket.send() raised exception.')890 self._conn_lost += 1891 return892 if not self._buffer:893 # Attempt to send it right away first.894 try:895 if self._address:896 self._sock.send(data)897 else:898 self._sock.sendto(data, addr)899 return900 except (BlockingIOError, InterruptedError):901 self._loop.add_writer(self._sock_fd, self._sendto_ready)902 except OSError as exc:903 self._protocol.error_received(exc)904 return905 except Exception as exc:906 self._fatal_error(exc,907 'Fatal write error on datagram transport')908 return909 # Ensure that what we buffer is immutable.910 self._buffer.append((bytes(data), addr))911 self._maybe_pause_protocol()912 def _sendto_ready(self):913 while self._buffer:914 data, addr = self._buffer.popleft()915 try:916 if self._address:917 self._sock.send(data)918 else:919 self._sock.sendto(data, addr)920 except (BlockingIOError, InterruptedError):921 self._buffer.appendleft((data, addr)) # Try again later.922 break923 except OSError as exc:924 self._protocol.error_received(exc)925 return926 except Exception as exc:927 self._fatal_error(exc,928 'Fatal write error on datagram transport')929 return930 self._maybe_resume_protocol() # May append to buffer.931 if not self._buffer:932 self._loop.remove_writer(self._sock_fd)933 if self._closing:...
test_rerun.py
Source:test_rerun.py
1# -*- coding: utf-8 -*-2from bamboo_engine.builder import * # noqa3from bamboo_engine.engine import Engine4from pipeline.eri.runtime import BambooDjangoRuntime5from ..utils import * # noqa6def test_single_node_rerun():7 start = EmptyStartEvent()8 act_1 = ServiceActivity(component_code="debug_node")9 act_2 = ServiceActivity(component_code="loop_count_node")10 eg = ExclusiveGateway(conditions={0: "${a_i} < ${c}", 1: "${a_i} >= ${c}"})11 end = EmptyEndEvent()12 act_2.component.inputs.input_a = Var(type=Var.SPLICE, value="${input_a}")13 start.extend(act_1).extend(act_2).extend(eg).connect(act_1, end)14 pipeline_data = Data()15 pipeline_data.inputs["${a_i}"] = NodeOutput(type=Var.SPLICE, source_act=act_2.id, source_key="_loop", value="")16 pipeline_data.inputs["${input_a}"] = Var(type=Var.SPLICE, value='${l.split(",")[a_i]}')17 pipeline_data.inputs["${l}"] = Var(type=Var.PLAIN, value="a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t")18 pipeline_data.inputs["${c}"] = Var(type=Var.PLAIN, value="4")19 pipeline = build_tree(start, data=pipeline_data)20 runtime = BambooDjangoRuntime()21 engine = Engine(runtime)22 engine.run_pipeline(pipeline=pipeline, root_pipeline_data={}, cycle_tolerate=True)23 assert_all_finish([pipeline["id"]])24 state = runtime.get_state(act_1.id)25 assert state.name == states.FINISHED26 assert state.loop == 427 state = runtime.get_state(eg.id)28 assert state.name == states.FINISHED29 assert state.loop == 430 state = runtime.get_state(act_2.id)31 assert state.name == states.FINISHED32 assert state.loop == 433 assert_exec_data_equal(34 {35 act_1.id: {36 "inputs": {"_loop": 4, "_inner_loop": 4},37 "outputs": {"_loop": 4, "_inner_loop": 4, "_result": True},38 },39 act_2.id: {40 "inputs": {"_loop": 4, "_inner_loop": 4, "input_a": "e"},41 "outputs": {"_loop": 4, "_inner_loop": 4, "loop": 4, "input_a": "e", "_result": True},42 },43 }44 )45 histories = runtime.get_histories(act_1.id)46 assert len(histories) == 347 assert histories[0].inputs == {"_loop": 1, "_inner_loop": 1}48 assert histories[0].outputs == {"_loop": 1, "_inner_loop": 1, "_result": True}49 assert histories[0].loop == 150 assert histories[1].inputs == {"_loop": 2, "_inner_loop": 2}51 assert histories[1].outputs == {"_loop": 2, "_inner_loop": 2, "_result": True}52 assert histories[1].loop == 253 assert histories[2].inputs == {"_loop": 3, "_inner_loop": 3}54 assert histories[2].outputs == {"_loop": 3, "_inner_loop": 3, "_result": True}55 assert histories[2].loop == 356 histories = runtime.get_histories(act_2.id)57 assert len(histories) == 358 assert histories[0].inputs == {"_loop": 1, "_inner_loop": 1, "input_a": "b"}59 assert histories[0].outputs == {"_loop": 1, "_inner_loop": 1, "loop": 1, "input_a": "b", "_result": True}60 assert histories[0].loop == 161 assert histories[1].inputs == {"_loop": 2, "_inner_loop": 2, "input_a": "c"}62 assert histories[1].outputs == {"_loop": 2, "_inner_loop": 2, "loop": 2, "input_a": "c", "_result": True}63 assert histories[1].loop == 264 assert histories[2].inputs == {"_loop": 3, "_inner_loop": 3, "input_a": "d"}65 assert histories[2].outputs == {"_loop": 3, "_inner_loop": 3, "loop": 3, "input_a": "d", "_result": True}66 assert histories[2].loop == 367def test_subprocess_rerun():68 start_sub = EmptyStartEvent()69 act_1_sub = ServiceActivity(component_code="debug_node")70 end_sub = EmptyEndEvent()71 act_1_sub.component.inputs.input_a = Var(type=Var.SPLICE, value="${input_a}")72 start_sub.extend(act_1_sub).extend(end_sub)73 start = EmptyStartEvent()74 act_1 = ServiceActivity(component_code="debug_node")75 act_2 = SubProcess(76 start=start_sub,77 data={78 "inputs": {79 "${input_a}": {"type": "splice", "value": '${l.split(",")[a_i]}'},80 "${a_i}": {"type": "plain", "value": "", "is_param": True},81 "${l}": {"type": "plain", "value": "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t"},82 "${output_a}": {"type": "splice", "source_act": act_1_sub.id, "source_key": "input_a"},83 },84 "outputs": ["${output_a}"],85 },86 params={"${a_i}": {"type": "splice", "value": "${s_i}"}},87 )88 eg = ExclusiveGateway(conditions={0: "${s_i} < 4", 1: "${s_i} >= 4"})89 end = EmptyEndEvent()90 start.extend(act_1).extend(act_2).extend(eg).connect(act_2, end)91 pipeline_data = Data()92 pipeline_data.inputs["${s_i}"] = NodeOutput(type=Var.SPLICE, source_act=act_2.id, source_key="_loop", value="")93 pipeline = build_tree(start, data=pipeline_data)94 runtime = BambooDjangoRuntime()95 engine = Engine(runtime)96 engine.run_pipeline(pipeline=pipeline, root_pipeline_data={}, cycle_tolerate=True)97 assert_all_finish([pipeline["id"]])98 state = runtime.get_state(start_sub.id)99 assert state.name == states.FINISHED100 assert state.loop == 4101 state = runtime.get_state(act_1_sub.id)102 assert state.name == states.FINISHED103 assert state.loop == 4104 state = runtime.get_state(end_sub.id)105 assert state.name == states.FINISHED106 assert state.loop == 4107 state = runtime.get_state(end_sub.id)108 assert state.name == states.FINISHED109 assert state.loop == 4110 state = runtime.get_state(act_2.id)111 assert state.name == states.FINISHED112 assert state.loop == 4113 state = runtime.get_state(eg.id)114 assert state.name == states.FINISHED115 assert state.loop == 4116 assert_exec_data_equal(117 {118 act_1_sub.id: {119 "inputs": {"_loop": 4, "_inner_loop": 1, "input_a": "e"},120 "outputs": {"_loop": 4, "_inner_loop": 1, "input_a": "e", "_result": True},121 },122 act_1.id: {123 "inputs": {"_loop": 1, "_inner_loop": 1},124 "outputs": {"_loop": 1, "_inner_loop": 1, "_result": True},125 },126 act_2.id: {"inputs": {"${a_i}": 4}, "outputs": {"${output_a}": "e", "_loop": 4, "_inner_loop": 4}},127 }128 )129 histories = runtime.get_histories(act_1_sub.id)130 assert len(histories) == 3131 assert histories[0].inputs == {"_loop": 1, "_inner_loop": 1, "input_a": "b"}132 assert histories[0].outputs == {"_loop": 1, "_inner_loop": 1, "input_a": "b", "_result": True}133 assert histories[0].loop == 1134 assert histories[1].inputs == {"_loop": 2, "_inner_loop": 1, "input_a": "c"}135 assert histories[1].outputs == {"_loop": 2, "_inner_loop": 1, "input_a": "c", "_result": True}136 assert histories[1].loop == 2137 assert histories[2].inputs == {"_loop": 3, "_inner_loop": 1, "input_a": "d"}138 assert histories[2].outputs == {"_loop": 3, "_inner_loop": 1, "input_a": "d", "_result": True}139 assert histories[2].loop == 3140 histories = runtime.get_histories(act_2.id)141 assert len(histories) == 3142 assert histories[0].inputs == {"${a_i}": 1}143 assert histories[0].outputs == {"${output_a}": "b", "_loop": 1, "_inner_loop": 1}144 assert histories[0].loop == 1145 assert histories[1].inputs == {"${a_i}": 2}146 assert histories[1].outputs == {"${output_a}": "c", "_loop": 2, "_inner_loop": 2}147 assert histories[1].loop == 2148 assert histories[2].inputs == {"${a_i}": 3}149 assert histories[2].outputs == {"${output_a}": "d", "_loop": 3, "_inner_loop": 3}150 assert histories[2].loop == 3151def test_parallel_gateway_rerun():152 start = EmptyStartEvent()153 act_1 = ServiceActivity(component_code="debug_node")154 pg = ParallelGateway()155 act_2 = ServiceActivity(component_code="loop_count_node")156 act_3 = ServiceActivity(component_code="loop_count_node")157 act_4 = ServiceActivity(component_code="loop_count_s_node")158 cg = ConvergeGateway()159 eg = ExclusiveGateway(160 conditions={161 0: "${a_i} < ${c} and ${b_i} < ${c} and ${c_i} < ${c} and ${d} < ${c}",162 1: "${a_i} >= ${c} and ${b_i} >= ${c} and ${c_i} >= ${c} and ${d} >= ${c}",163 }164 )165 end = EmptyEndEvent()166 act_2.component.inputs.input_a = Var(type=Var.SPLICE, value="${input_a}")167 act_3.component.inputs.input_a = Var(type=Var.SPLICE, value="${input_b}")168 act_4.component.inputs.input_a = Var(type=Var.SPLICE, value="${input_c}")169 start.extend(act_1).extend(pg).connect(act_2, act_3, act_4).to(pg).converge(cg).extend(eg).connect(act_1, end)170 pipeline = build_tree(171 start,172 data={173 "inputs": {174 "${a_i}": {"source_act": act_2.id, "source_key": "_loop", "type": "splice", "value": ""},175 "${b_i}": {"source_act": act_3.id, "source_key": "_loop", "type": "splice", "value": ""},176 "${c_i}": {"source_act": act_4.id, "source_key": "_loop", "type": "splice", "value": ""},177 "${input_a}": {"type": "splice", "value": '${l.split(",")[a_i]}'},178 "${input_b}": {"type": "splice", "value": '${l.split(",")[b_i]}'},179 "${input_c}": {"type": "splice", "value": '${l.split(",")[c_i]}'},180 "${d}": {"type": "splice", "value": "${c_i}"},181 "${l}": {"type": "plain", "value": "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t"},182 "${c}": {"type": "plain", "value": "3"},183 },184 "outputs": [],185 },186 )187 runtime = BambooDjangoRuntime()188 engine = Engine(runtime)189 engine.run_pipeline(pipeline=pipeline, root_pipeline_data={}, cycle_tolerate=True)190 assert_all_finish([pipeline["id"]])191 state = runtime.get_state(act_1.id)192 assert state.name == states.FINISHED193 assert state.loop == 3194 state = runtime.get_state(pg.id)195 assert state.name == states.FINISHED196 assert state.loop == 3197 state = runtime.get_state(act_2.id)198 assert state.name == states.FINISHED199 assert state.loop == 3200 state = runtime.get_state(act_3.id)201 assert state.name == states.FINISHED202 assert state.loop == 3203 state = runtime.get_state(act_4.id)204 assert state.name == states.FINISHED205 assert state.loop == 3206 state = runtime.get_state(eg.id)207 assert state.name == states.FINISHED208 assert state.loop == 3209 assert_exec_data_equal(210 {211 act_1.id: {212 "inputs": {"_loop": 3, "_inner_loop": 3},213 "outputs": {"_loop": 3, "_inner_loop": 3, "_result": True},214 },215 act_2.id: {216 "inputs": {"_loop": 3, "_inner_loop": 3, "input_a": "d"},217 "outputs": {"_loop": 3, "_inner_loop": 3, "loop": 3, "input_a": "d", "_result": True},218 },219 act_3.id: {220 "inputs": {"_loop": 3, "_inner_loop": 3, "input_a": "d"},221 "outputs": {"_loop": 3, "_inner_loop": 3, "loop": 3, "input_a": "d", "_result": True},222 },223 act_4.id: {224 "inputs": {"_loop": 3, "_inner_loop": 3, "input_a": "d"},225 "outputs": {"count": 2, "_loop": 3, "_inner_loop": 3, "loop": 3, "input_a": "d", "_result": True},226 },227 }228 )229 histories = runtime.get_histories(act_1.id)230 assert len(histories) == 2231 assert histories[0].inputs == {"_loop": 1, "_inner_loop": 1}232 assert histories[0].outputs == {"_loop": 1, "_inner_loop": 1, "_result": True}233 assert histories[0].loop == 1234 assert histories[1].inputs == {"_loop": 2, "_inner_loop": 2}235 assert histories[1].outputs == {"_loop": 2, "_inner_loop": 2, "_result": True}236 assert histories[1].loop == 2237 histories = runtime.get_histories(act_2.id)238 assert len(histories) == 2239 assert histories[0].inputs == {"_loop": 1, "_inner_loop": 1, "input_a": "b"}240 assert histories[0].outputs == {"_loop": 1, "_inner_loop": 1, "loop": 1, "input_a": "b", "_result": True}241 assert histories[0].loop == 1242 assert histories[1].inputs == {"_loop": 2, "_inner_loop": 2, "input_a": "c"}243 assert histories[1].outputs == {"_loop": 2, "_inner_loop": 2, "loop": 2, "input_a": "c", "_result": True}244 assert histories[1].loop == 2245 histories = runtime.get_histories(act_3.id)246 assert len(histories) == 2247 assert histories[0].inputs == {"_loop": 1, "_inner_loop": 1, "input_a": "b"}248 assert histories[0].outputs == {"_loop": 1, "_inner_loop": 1, "loop": 1, "input_a": "b", "_result": True}249 assert histories[0].loop == 1250 assert histories[1].inputs == {"_loop": 2, "_inner_loop": 2, "input_a": "c"}251 assert histories[1].outputs == {"_loop": 2, "_inner_loop": 2, "loop": 2, "input_a": "c", "_result": True}252 assert histories[1].loop == 2253 histories = runtime.get_histories(act_4.id)254 assert len(histories) == 2255 assert histories[0].inputs == {"_loop": 1, "_inner_loop": 1, "input_a": "b"}256 assert histories[0].outputs == {257 "count": 2,258 "_loop": 1,259 "_inner_loop": 1,260 "loop": 1,261 "input_a": "b",262 "_result": True,263 }264 assert histories[0].loop == 1265 assert histories[1].inputs == {"_loop": 2, "_inner_loop": 2, "input_a": "c"}266 assert histories[1].outputs == {267 "count": 2,268 "_loop": 2,269 "_inner_loop": 2,270 "loop": 2,271 "input_a": "c",272 "_result": True,273 }274 assert histories[1].loop == 2275def test_rerun_in_branch():276 start = EmptyStartEvent()277 act_1 = ServiceActivity(component_code="debug_node")278 pg = ParallelGateway()279 # branch 1280 act_2 = ServiceActivity(component_code="loop_count_node")281 eg_1 = ExclusiveGateway(conditions={0: "${l_2} < 2", 1: "${l_2} >= 2"})282 # branch 2283 act_3 = ServiceActivity(component_code="loop_count_node")284 act_4 = ServiceActivity(component_code="loop_count_node")285 eg_2 = ExclusiveGateway(conditions={0: "${l_3} < 2", 1: "${l_3} >= 2"})286 # branch 3287 act_5 = ServiceActivity(component_code="loop_count_node")288 cg = ConvergeGateway()289 end = EmptyEndEvent()290 start.extend(act_1).extend(pg).connect(act_2, act_3, act_5)291 act_2.extend(eg_1).connect(act_2, cg)292 act_3.extend(act_4).extend(eg_2).connect(act_3, cg)293 act_5.extend(cg).extend(end)294 pipeline = build_tree(295 start,296 data={297 "inputs": {298 "${l_2}": {"source_act": act_2.id, "source_key": "_loop", "type": "splice", "value": ""},299 "${l_3}": {"source_act": act_3.id, "source_key": "_loop", "type": "splice", "value": ""},300 },301 "outputs": [],302 },303 )304 runtime = BambooDjangoRuntime()305 engine = Engine(runtime)306 engine.run_pipeline(pipeline=pipeline, root_pipeline_data={}, cycle_tolerate=True)307 assert_all_finish([pipeline["id"]])308 state = runtime.get_state(act_2.id)309 assert state.name == states.FINISHED310 assert state.loop == 2311 state = runtime.get_state(act_3.id)312 assert state.name == states.FINISHED313 assert state.loop == 2314 state = runtime.get_state(act_4.id)315 assert state.name == states.FINISHED316 assert state.loop == 2317 state = runtime.get_state(eg_1.id)318 assert state.name == states.FINISHED319 assert state.loop == 2320 state = runtime.get_state(eg_2.id)321 assert state.name == states.FINISHED322 assert state.loop == 2323def test_retry_rerun():324 start = EmptyStartEvent()325 act_1 = ServiceActivity(component_code="fail_at_second_node")326 eg = ExclusiveGateway(conditions={0: "${a_i} < ${c}", 1: "${a_i} >= ${c}"})327 end = EmptyEndEvent()328 act_1.component.inputs.key_1 = Var(type=Var.PLAIN, value="val_1")329 act_1.component.inputs.key_2 = Var(type=Var.PLAIN, value="val_2")330 start.extend(act_1).extend(eg).connect(act_1, end)331 pipeline = build_tree(332 start,333 data={334 "inputs": {335 "${a_i}": {"source_act": act_1.id, "source_key": "_loop", "type": "splice", "value": ""},336 "${c}": {"type": "plain", "value": "4"},337 },338 "outputs": [],339 },340 )341 runtime = BambooDjangoRuntime()342 engine = Engine(runtime)343 engine.run_pipeline(pipeline=pipeline, root_pipeline_data={}, cycle_tolerate=True)344 assert_all_failed([act_1.id])345 engine.retry_node(act_1.id, {})346 assert_all_failed([act_1.id])347 engine.retry_node(act_1.id, {"can_go": True})348 assert_all_finish([pipeline["id"]])349 state = runtime.get_state(act_1.id)350 assert state.name == states.FINISHED351 assert state.loop == 4352 state = runtime.get_state(eg.id)353 assert state.name == states.FINISHED354 assert state.loop == 4355 assert_exec_data_equal(356 {357 act_1.id: {358 "inputs": {"_loop": 4, "_inner_loop": 4, "can_go": True},359 "outputs": {"loop": 4, "_loop": 4, "_inner_loop": 4, "can_go": True, "_result": True},360 }361 }362 )363 histories = runtime.get_histories(act_1.id)364 assert len(histories) == 5365 assert histories[0].inputs == {"_loop": 1, "_inner_loop": 1, "key_1": "val_1", "key_2": "val_2"}366 assert histories[0].outputs == {"_loop": 1, "_inner_loop": 1, "_result": False}367 assert histories[0].retry == 0368 assert histories[0].loop == 1369 assert histories[1].inputs == {"_loop": 1, "_inner_loop": 1}370 assert histories[1].outputs == {"_loop": 1, "_inner_loop": 1, "_result": False}371 assert histories[1].retry == 1372 assert histories[1].loop == 1373 assert histories[2].inputs == {"_loop": 1, "_inner_loop": 1, "can_go": True}374 assert histories[2].outputs == {"loop": 1, "_loop": 1, "_inner_loop": 1, "can_go": True, "_result": True}375 assert histories[2].retry == 2376 assert histories[2].loop == 1377 assert histories[3].inputs == {"_loop": 2, "_inner_loop": 2, "can_go": True}378 assert histories[3].outputs == {"loop": 2, "_loop": 2, "_inner_loop": 2, "can_go": True, "_result": True}379 assert histories[3].loop == 2380 assert histories[4].inputs == {"_loop": 3, "_inner_loop": 3, "can_go": True}381 assert histories[4].outputs == {"loop": 3, "_loop": 3, "_inner_loop": 3, "can_go": True, "_result": True}382 assert histories[4].loop == 3383def test_skip_rerun():384 start = EmptyStartEvent()385 act_1 = ServiceActivity(component_code="fail_at_second_node")386 eg = ExclusiveGateway(conditions={0: "${a_i} < ${c}", 1: "${a_i} >= ${c}"})387 end = EmptyEndEvent()388 act_1.component.inputs.key_1 = Var(type=Var.PLAIN, value="val_1")389 act_1.component.inputs.key_2 = Var(type=Var.PLAIN, value="val_2")390 start.extend(act_1).extend(eg).connect(act_1, end)391 pipeline = build_tree(392 start,393 data={394 "inputs": {395 "${a_i}": {"source_act": act_1.id, "source_key": "_loop", "type": "splice", "value": ""},396 "${c}": {"type": "plain", "value": "4"},397 },398 "outputs": [],399 },400 )401 runtime = BambooDjangoRuntime()402 engine = Engine(runtime)403 engine.run_pipeline(pipeline=pipeline, root_pipeline_data={}, cycle_tolerate=True)404 assert_all_failed([act_1.id])405 engine.skip_node(act_1.id)406 assert_all_finish([pipeline["id"]])407 state = runtime.get_state(act_1.id)408 assert state.name == states.FINISHED409 assert state.loop == 4410 state = runtime.get_state(eg.id)411 assert state.name == states.FINISHED412 assert state.loop == 4413 assert_exec_data_equal(414 {415 act_1.id: {416 "inputs": {"_loop": 4, "_inner_loop": 4, "key_1": "val_1", "key_2": "val_2"},417 "outputs": {418 "loop": 4,419 "_loop": 4,420 "_inner_loop": 4,421 "key_1": "val_1",422 "key_2": "val_2",423 "_result": True,424 },425 }426 }427 )428 histories = runtime.get_histories(act_1.id)429 assert len(histories) == 4430 assert histories[0].inputs == {"_loop": 1, "_inner_loop": 1, "key_1": "val_1", "key_2": "val_2"}431 assert histories[0].skip is False432 assert histories[0].outputs == {"_result": False, "_inner_loop": 1, "_loop": 1}433 assert histories[0].loop == 1434 assert histories[1].inputs == {"_loop": 1, "_inner_loop": 1, "key_1": "val_1", "key_2": "val_2"}435 assert histories[1].skip is True436 assert histories[1].outputs == {"_result": False, "_inner_loop": 1, "_loop": 1}437 assert histories[1].loop == 1438 assert histories[2].inputs == {"_loop": 2, "_inner_loop": 2, "key_1": "val_1", "key_2": "val_2"}439 assert histories[2].outputs == {440 "loop": 2,441 "_loop": 2,442 "_inner_loop": 2,443 "key_1": "val_1",444 "key_2": "val_2",445 "_result": True,446 }447 assert histories[2].skip is False448 assert histories[2].loop == 2449 assert histories[3].inputs == {"_loop": 3, "_inner_loop": 3, "key_1": "val_1", "key_2": "val_2"}450 assert histories[3].outputs == {451 "loop": 3,452 "_loop": 3,453 "_inner_loop": 3,454 "key_1": "val_1",455 "key_2": "val_2",456 "_result": True,457 }458 assert histories[3].skip is False...
test.py
Source:test.py
...7HOST = "192.168.80.43"8PORT = 809class TestLogin(aiounittest.AsyncTestCase):10 def setUp(self):11 self._loop = asyncio.new_event_loop()12 self.addCleanup(self._loop.close)13 self._user = USER14 self._password = PASSWORD15 self._host = HOST16 self._port = PORT17 def tearDown(self):18 self._loop.close()19 def test_succes(self):20 host = Host(21 host = self._host,22 port = self._port,23 username = self._user,24 password = self._password,25 )26 assert self._loop.run_until_complete(host.login())27 assert host.session_active28 self._loop.run_until_complete(host.logout())29 def test_wrong_password(self):30 host = Host(31 host = self._host,32 port = self._port,33 username = self._user,34 password = "wrongpass"35 )36 assert not self._loop.run_until_complete(host.login())37 assert not host.session_active38 assert not self._loop.run_until_complete(host.get_host_data())39 assert not self._loop.run_until_complete(host.get_states())40 assert not self._loop.run_until_complete(host.get_motion_state(0))41 assert not self._loop.run_until_complete(host.get_stream_source(0))42 assert not self._loop.run_until_complete(host.set_ftp(0, False))43 def test_wrong_user(self):44 host = Host(45 host=self._host,46 port=self._port,47 username="wronguser",48 password=self._password,49 )50 assert not self._loop.run_until_complete(host.login())51 assert not host.session_active52 def test_wrong_host(self):53 host = Host(54 host="192.168.1.0",55 port=self._port,56 username=self._user,57 password=self._password,58 )59 assert not self._loop.run_until_complete(host.login())60 assert not host.session_active61#endof class TestLogin62class TestGetData(aiounittest.AsyncTestCase):63 def setUp(self):64 self._loop = asyncio.new_event_loop()65 self.addCleanup(self._loop.close)66 self._user = USER67 self._password = PASSWORD68 self._host = HOST69 self._port = PORT70 self._host_device = Host(71 host = self._host,72 port = self._port,73 username = self._user,74 password = self._password,75 )76 assert self._loop.run_until_complete(self._host_device.login())77 assert self._host_device.session_active78 def test1_settings(self):79 assert self._loop.run_until_complete(self._host_device.get_host_data())80 self._host_device.is_admin81 assert self._host_device.host is not None82 assert self._host_device.port is not None83 assert self._host_device.channels is not None84 assert self._host_device.onvif_port is not None85 assert self._host_device.mac_address is not None86 assert self._host_device.serial is not None87 assert self._host_device.nvr_name is not None88 assert self._host_device.sw_version is not None89 assert self._host_device.model is not None90 assert self._host_device.manufacturer is not None91 assert self._host_device.rtmp_port is not None92 assert self._host_device.rtsp_port is not None93 assert self._host_device.stream is not None94 assert self._host_device.protocol is not None95 assert self._host_device.hdd_info is not None96 assert self._host_device.ptz_supported is not None97 self._host_device._users.append({"level": "guest", "userName": "guest"})98 self._host_device._username = "guest"99 assert not self._host_device.is_admin100 def test2_states(self):101 assert self._loop.run_until_complete(self._host_device.get_states())102 assert self._loop.run_until_complete(self._host_device.get_motion_state(0)) is not None103 self._host_device._ptz_support[0] = True104 self._host_device._ptz_presets[0]["test"] = 123105 assert (106 self._loop.run_until_complete(self._host_device.get_switchable_capabilities(0))107 is not None108 )109 def test3_images(self):110 assert self._loop.run_until_complete(self._host_device.get_snapshot(0)) is not None111 assert self._loop.run_until_complete(self._host_device.get_snapshot(100)) is None112 assert self._loop.run_until_complete(self._host_device.get_snapshot(0)) is not None113 assert self._loop.run_until_complete(self._host_device.get_stream_source(0)) is not None114 def test4_properties(self):115 assert self._loop.run_until_complete(self._host_device.get_states())116 assert self._host_device.motion_detection_state(0) is not None117 assert self._host_device.is_ia_enabled(0) is not None118 assert self._host_device.ftp_enabled(0) is not None119 assert self._host_device.email_enabled(0) is not None120 assert self._host_device.ir_enabled(0) is not None121 assert self._host_device.whiteled_enabled(0) is not None122 assert self._host_device.daynight_state(0) is not None123 assert self._host_device.recording_enabled(0) is not None124 assert self._host_device.audio_alarm_enabled(0) is not None125 assert self._host_device.ptz_presets(0) == {} # Cam has no ptz126 assert self._host_device.sensititivy_presets(0) is not None127 get_ptz_response = [128 {129 "cmd": "GetPtzPreset",130 "code": 0,131 "value": {132 "PtzPreset": [133 {"enable": 0, "name": "Preset_1", "id": 0},134 {"enable": 1, "name": "Preset_2", "id": 1},135 ]136 },137 }138 ]139 self._host_device.map_channel_json_response(get_ptz_response, 0)140 assert self._host_device._ptz_presets[0] is not None141 assert self._host_device._ptz_presets_settings[0] is not None142 assert not self._loop.run_until_complete(143 self._host_device.send_setting([{"cmd": "wrong_command"}])144 )145 for _ in range(1):146 """FTP state."""147 assert self._loop.run_until_complete(self._host_device.set_ftp(0, True))148 assert self._host_device.ftp_enabled(0)149 assert self._loop.run_until_complete(self._host_device.set_ftp(0, False))150 assert not self._host_device.ftp_enabled(0)151 """Email state."""152 assert self._loop.run_until_complete(self._host_device.set_email(0, True))153 assert self._host_device.email_enabled(0)154 assert self._loop.run_until_complete(self._host_device.set_email(0, False))155 assert not self._host_device.email_enabled(0)156 """Audio state."""157 assert self._loop.run_until_complete(self._host_device.set_audio(0, True))158 assert self._host_device.audio_alarm_enabled(0)159 assert self._loop.run_until_complete(self._host_device.set_audio(0, False))160 assert not self._host_device.audio_alarm_enabled(0)161 """ir state."""162 assert self._loop.run_until_complete(self._host_device.set_ir_lights(0, True))163 assert self._host_device.ir_enabled(0)164 assert self._loop.run_until_complete(self._host_device.set_ir_lights(0, False))165 assert not self._host_device.ir_enabled(0)166 """Daynight state."""167 assert self._loop.run_until_complete(self._host_device.set_daynight(0, "Auto"))168 assert self._host_device.daynight_state(0)169 assert self._loop.run_until_complete(self._host_device.set_daynight(0, "Color"))170 assert not self._host_device.daynight_state(0)171 """Recording state."""172 assert self._loop.run_until_complete(self._host_device.set_recording(0, True))173 assert self._host_device.recording_enabled(0)174 assert self._loop.run_until_complete(self._host_device.set_recording(0, False))175 assert not self._host_device.recording_enabled(0)176 """Motion detection state."""177 assert self._loop.run_until_complete(self._host_device.set_motion_detection(0, True))178 assert self._host_device.motion_detection_state(0) is not None # Ignore state179 assert self._loop.run_until_complete(self._host_device.set_motion_detection(0, False))180 assert self._loop.run_until_complete(self._host_device.get_states())181 assert (182 self._loop.run_until_complete(self._host_device.get_stream_source(0)) is not None183 )184 assert (185 self._loop.run_until_complete(186 self._host_device.set_ptz_command(0, "RIGHT", speed=10)187 )188 == False189 )190 assert (191 self._loop.run_until_complete(192 self._host_device.set_ptz_command(0, "GOTO", preset=1)193 )194 == False195 )196 assert (197 self._loop.run_until_complete(self._host_device.set_ptz_command(0, "STOP"))198 == False199 )200 assert self._loop.run_until_complete(self._host_device.set_sensitivity(0, value=10))201 assert self._loop.run_until_complete(202 self._host_device.set_sensitivity(0, value=45, preset=0)203 )204 """ White Led State (Spotlight ) """205 """ required tests """206 """ turn off , night mode off """207 """ turn on, night mode off """208 """ turn off, , night mode on """209 """ turn on, night mode on , auto mode """210 """ turn off, night mode on, scheduled """211 """ turn on, night mode on, scheduled mode """212 """ Turn on, NM on, auto Bright = 0 """213 """ Turn on, NM on, auto Bright = 100 """214 """ incorrect mode not 0,1,3 """215 """ incorrect brightness < 0 """216 """ incorrect brightness > 100 """217 assert self._loop.run_until_complete(self._host_device.set_whiteled(0, False,50,0))218 assert self._loop.run_until_complete(self._host_device.set_whiteled(0, True,50,0))219 assert self._loop.run_until_complete(self._host_device.set_whiteled(0, False,50,1))220 assert self._loop.run_until_complete(self._host_device.set_whiteled(0, True,50,1))221 assert self._loop.run_until_complete(self._host_device.set_whiteled(0, False,50,3))222 assert self._loop.run_until_complete(self._host_device.set_whiteled(0, True,50,3))223 """ so that effect can be seen on spotlight wait 2 seconds between changes """224 time.sleep(2)225 assert self._loop.run_until_complete(self._host_device.set_whiteled(0, True,0,1))226 time.sleep(2)227 assert self._loop.run_until_complete(self._host_device.set_whiteled(0, True,100,1))228 assert self._loop.run_until_complete(self._host_device.set_whiteled(0, True,100,2))229 time.sleep(2)230 """ now turn off light - does not require an assert """231 self.loop.run_until_complete(self._host_device.set_whiteled(0, False,50,0))232 """ with incorrect values the routine should return a False """233 assert not self._loop.run_until_complete(self._host_device.set_whiteled(0, True,-10,1))234 assert not self._loop.run_until_complete(self._host_device.set_whiteled(0, True,1000,1))235 """ now tests for setting the schedule for spotlight when night mode non auto"""236 assert self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 5, 30, 17, 30))237 assert self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 7, 30, 19, 30))238 # invalid parameters239 assert not self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, -1, 0, 18, 0))240 assert not self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 24, 0, 18, 0))241 assert not self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 6, -2, 18, 0))242 assert not self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 6, 60, 18, 0))243 assert not self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 6, 0, -3, 0))244 assert not self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 6, 0, 24, 0))245 assert not self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 6, 0, 18, -4))246 assert not self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 18, 59, 19, 0))247 assert not self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 18, 29, 18, 30))248 # query should end time equals start time be an error249 assert not self._loop.run_until_complete(self._host_device.set_spotlight_lighting_schedule(0, 6, 0, 6, 0))250 #251 # check simplified call252 assert self._loop.run_until_complete(self._host_device.set_spotlight(0, True))253 assert self._loop.run_until_complete(self._host_device.set_spotlight(0, False))254 # test of siren255 assert self._loop.run_until_complete(self._host_device.set_siren(0, True))256 assert self._loop.run_until_complete(self._host_device.set_siren(0, False))257 def tearDown(self):258 self._loop.run_until_complete(self._host_device.logout())259 self._loop.close()260#endof class TestGetData261class TestSubscription(aiounittest.AsyncTestCase):262 def setUp(self):263 self._loop = asyncio.new_event_loop()264 self.addCleanup(self._loop.close)265 self._user = USER266 self._password = PASSWORD267 self._host = HOST268 self._port = PORT269 def tearDown(self):270 self._loop.close()271 def test_succes(self):272 host = Host(273 host=self._host,274 port=self._port,275 username=self._user,276 password=self._password,277 )...
receiver.js
Source:receiver.js
1'use strict';2const { Writable } = require('stream');3const PerMessageDeflate = require('./permessage-deflate');4const {5 BINARY_TYPES,6 EMPTY_BUFFER,7 kStatusCode,8 kWebSocket9} = require('./constants');10const { concat, toArrayBuffer, unmask } = require('./buffer-util');11const { isValidStatusCode, isValidUTF8 } = require('./validation');12const GET_INFO = 0;13const GET_PAYLOAD_LENGTH_16 = 1;14const GET_PAYLOAD_LENGTH_64 = 2;15const GET_MASK = 3;16const GET_DATA = 4;17const INFLATING = 5;18/**19 * HyBi Receiver implementation.20 *21 * @extends stream.Writable22 */23class Receiver extends Writable {24 /**25 * Creates a Receiver instance.26 *27 * @param {String} binaryType The type for binary data28 * @param {Object} extensions An object containing the negotiated extensions29 * @param {Number} maxPayload The maximum allowed message length30 */31 constructor(binaryType, extensions, maxPayload) {32 super();33 this._binaryType = binaryType || BINARY_TYPES[0];34 this[kWebSocket] = undefined;35 this._extensions = extensions || {};36 this._maxPayload = maxPayload | 0;37 this._bufferedBytes = 0;38 this._buffers = [];39 this._compressed = false;40 this._payloadLength = 0;41 this._mask = undefined;42 this._fragmented = 0;43 this._masked = false;44 this._fin = false;45 this._opcode = 0;46 this._totalPayloadLength = 0;47 this._messageLength = 0;48 this._fragments = [];49 this._state = GET_INFO;50 this._loop = false;51 }52 /**53 * Implements `Writable.prototype._write()`.54 *55 * @param {Buffer} chunk The chunk of data to write56 * @param {String} encoding The character encoding of `chunk`57 * @param {Function} cb Callback58 */59 _write(chunk, encoding, cb) {60 if (this._opcode === 0x08 && this._state == GET_INFO) return cb();61 this._bufferedBytes += chunk.length;62 this._buffers.push(chunk);63 this.startLoop(cb);64 }65 /**66 * Consumes `n` bytes from the buffered data.67 *68 * @param {Number} n The number of bytes to consume69 * @return {Buffer} The consumed bytes70 * @private71 */72 consume(n) {73 this._bufferedBytes -= n;74 if (n === this._buffers[0].length) return this._buffers.shift();75 if (n < this._buffers[0].length) {76 const buf = this._buffers[0];77 this._buffers[0] = buf.slice(n);78 return buf.slice(0, n);79 }80 const dst = Buffer.allocUnsafe(n);81 do {82 const buf = this._buffers[0];83 if (n >= buf.length) {84 this._buffers.shift().copy(dst, dst.length - n);85 } else {86 buf.copy(dst, dst.length - n, 0, n);87 this._buffers[0] = buf.slice(n);88 }89 n -= buf.length;90 } while (n > 0);91 return dst;92 }93 /**94 * Starts the parsing loop.95 *96 * @param {Function} cb Callback97 * @private98 */99 startLoop(cb) {100 var err;101 this._loop = true;102 do {103 switch (this._state) {104 case GET_INFO:105 err = this.getInfo();106 break;107 case GET_PAYLOAD_LENGTH_16:108 err = this.getPayloadLength16();109 break;110 case GET_PAYLOAD_LENGTH_64:111 err = this.getPayloadLength64();112 break;113 case GET_MASK:114 this.getMask();115 break;116 case GET_DATA:117 err = this.getData(cb);118 break;119 default:120 // `INFLATING`121 this._loop = false;122 return;123 }124 } while (this._loop);125 cb(err);126 }127 /**128 * Reads the first two bytes of a frame.129 *130 * @return {(RangeError|undefined)} A possible error131 * @private132 */133 getInfo() {134 if (this._bufferedBytes < 2) {135 this._loop = false;136 return;137 }138 const buf = this.consume(2);139 if ((buf[0] & 0x30) !== 0x00) {140 this._loop = false;141 return error(RangeError, 'RSV2 and RSV3 must be clear', true, 1002);142 }143 const compressed = (buf[0] & 0x40) === 0x40;144 if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {145 this._loop = false;146 return error(RangeError, 'RSV1 must be clear', true, 1002);147 }148 this._fin = (buf[0] & 0x80) === 0x80;149 this._opcode = buf[0] & 0x0f;150 this._payloadLength = buf[1] & 0x7f;151 if (this._opcode === 0x00) {152 if (compressed) {153 this._loop = false;154 return error(RangeError, 'RSV1 must be clear', true, 1002);155 }156 if (!this._fragmented) {157 this._loop = false;158 return error(RangeError, 'invalid opcode 0', true, 1002);159 }160 this._opcode = this._fragmented;161 } else if (this._opcode === 0x01 || this._opcode === 0x02) {162 if (this._fragmented) {163 this._loop = false;164 return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);165 }166 this._compressed = compressed;167 } else if (this._opcode > 0x07 && this._opcode < 0x0b) {168 if (!this._fin) {169 this._loop = false;170 return error(RangeError, 'FIN must be set', true, 1002);171 }172 if (compressed) {173 this._loop = false;174 return error(RangeError, 'RSV1 must be clear', true, 1002);175 }176 if (this._payloadLength > 0x7d) {177 this._loop = false;178 return error(179 RangeError,180 `invalid payload length ${this._payloadLength}`,181 true,182 1002183 );184 }185 } else {186 this._loop = false;187 return error(RangeError, `invalid opcode ${this._opcode}`, true, 1002);188 }189 if (!this._fin && !this._fragmented) this._fragmented = this._opcode;190 this._masked = (buf[1] & 0x80) === 0x80;191 if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;192 else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;193 else return this.haveLength();194 }195 /**196 * Gets extended payload length (7+16).197 *198 * @return {(RangeError|undefined)} A possible error199 * @private200 */201 getPayloadLength16() {202 if (this._bufferedBytes < 2) {203 this._loop = false;204 return;205 }206 this._payloadLength = this.consume(2).readUInt16BE(0);207 return this.haveLength();208 }209 /**210 * Gets extended payload length (7+64).211 *212 * @return {(RangeError|undefined)} A possible error213 * @private214 */215 getPayloadLength64() {216 if (this._bufferedBytes < 8) {217 this._loop = false;218 return;219 }220 const buf = this.consume(8);221 const num = buf.readUInt32BE(0);222 //223 // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned224 // if payload length is greater than this number.225 //226 if (num > Math.pow(2, 53 - 32) - 1) {227 this._loop = false;228 return error(229 RangeError,230 'Unsupported WebSocket frame: payload length > 2^53 - 1',231 false,232 1009233 );234 }235 this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);236 return this.haveLength();237 }238 /**239 * Payload length has been read.240 *241 * @return {(RangeError|undefined)} A possible error242 * @private243 */244 haveLength() {245 if (this._payloadLength && this._opcode < 0x08) {246 this._totalPayloadLength += this._payloadLength;247 if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {248 this._loop = false;249 return error(RangeError, 'Max payload size exceeded', false, 1009);250 }251 }252 if (this._masked) this._state = GET_MASK;253 else this._state = GET_DATA;254 }255 /**256 * Reads mask bytes.257 *258 * @private259 */260 getMask() {261 if (this._bufferedBytes < 4) {262 this._loop = false;263 return;264 }265 this._mask = this.consume(4);266 this._state = GET_DATA;267 }268 /**269 * Reads data bytes.270 *271 * @param {Function} cb Callback272 * @return {(Error|RangeError|undefined)} A possible error273 * @private274 */275 getData(cb) {276 var data = EMPTY_BUFFER;277 if (this._payloadLength) {278 if (this._bufferedBytes < this._payloadLength) {279 this._loop = false;280 return;281 }282 data = this.consume(this._payloadLength);283 if (this._masked) unmask(data, this._mask);284 }285 if (this._opcode > 0x07) return this.controlMessage(data);286 if (this._compressed) {287 this._state = INFLATING;288 this.decompress(data, cb);289 return;290 }291 if (data.length) {292 //293 // This message is not compressed so its lenght is the sum of the payload294 // length of all fragments.295 //296 this._messageLength = this._totalPayloadLength;297 this._fragments.push(data);298 }299 return this.dataMessage();300 }301 /**302 * Decompresses data.303 *304 * @param {Buffer} data Compressed data305 * @param {Function} cb Callback306 * @private307 */308 decompress(data, cb) {309 const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];310 perMessageDeflate.decompress(data, this._fin, (err, buf) => {311 if (err) return cb(err);312 if (buf.length) {313 this._messageLength += buf.length;314 if (this._messageLength > this._maxPayload && this._maxPayload > 0) {315 return cb(316 error(RangeError, 'Max payload size exceeded', false, 1009)317 );318 }319 this._fragments.push(buf);320 }321 const er = this.dataMessage();322 if (er) return cb(er);323 this.startLoop(cb);324 });325 }326 /**327 * Handles a data message.328 *329 * @return {(Error|undefined)} A possible error330 * @private331 */332 dataMessage() {333 if (this._fin) {334 const messageLength = this._messageLength;335 const fragments = this._fragments;336 this._totalPayloadLength = 0;337 this._messageLength = 0;338 this._fragmented = 0;339 this._fragments = [];340 if (this._opcode === 2) {341 var data;342 if (this._binaryType === 'nodebuffer') {343 data = concat(fragments, messageLength);344 } else if (this._binaryType === 'arraybuffer') {345 data = toArrayBuffer(concat(fragments, messageLength));346 } else {347 data = fragments;348 }349 this.emit('message', data);350 } else {351 const buf = concat(fragments, messageLength);352 if (!isValidUTF8(buf)) {353 this._loop = false;354 return error(Error, 'invalid UTF-8 sequence', true, 1007);355 }356 this.emit('message', buf.toString());357 }358 }359 this._state = GET_INFO;360 }361 /**362 * Handles a control message.363 *364 * @param {Buffer} data Data to handle365 * @return {(Error|RangeError|undefined)} A possible error366 * @private367 */368 controlMessage(data) {369 if (this._opcode === 0x08) {370 this._loop = false;371 if (data.length === 0) {372 this.emit('conclude', 1005, '');373 this.end();374 } else if (data.length === 1) {375 return error(RangeError, 'invalid payload length 1', true, 1002);376 } else {377 const code = data.readUInt16BE(0);378 if (!isValidStatusCode(code)) {379 return error(RangeError, `invalid status code ${code}`, true, 1002);380 }381 const buf = data.slice(2);382 if (!isValidUTF8(buf)) {383 return error(Error, 'invalid UTF-8 sequence', true, 1007);384 }385 this.emit('conclude', code, buf.toString());386 this.end();387 }388 } else if (this._opcode === 0x09) {389 this.emit('ping', data);390 } else {391 this.emit('pong', data);392 }393 this._state = GET_INFO;394 }395}396module.exports = Receiver;397/**398 * Builds an error object.399 *400 * @param {(Error|RangeError)} ErrorCtor The error constructor401 * @param {String} message The error message402 * @param {Boolean} prefix Specifies whether or not to add a default prefix to403 * `message`404 * @param {Number} statusCode The status code405 * @return {(Error|RangeError)} The error406 * @private407 */408function error(ErrorCtor, message, prefix, statusCode) {409 const err = new ErrorCtor(410 prefix ? `Invalid WebSocket frame: ${message}` : message411 );412 Error.captureStackTrace(err, error);413 err[kStatusCode] = statusCode;414 return err;...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!