How to use the make_log method in Lemoncheesecake

Best Python code snippet using lemoncheesecake

broker.py

Source:broker.py Github

copy

Full Screen

# ... (excerpt: the beginning of broker.py, including its imports, is not shown)

# NOTE(review): this module is written against the legacy generator-based
# asyncio API (``@asyncio.coroutine`` and explicit ``loop=`` arguments).
# Those were removed in recent Python versions -- confirm the interpreter this
# is expected to run on before modernising anything.

# Event names fired through the plugin manager.
EVENT_BROKER_CLIENT_DISCONNECTED = 'broker_client_disconnected'
EVENT_BROKER_CLIENT_SUBSCRIBED = 'broker_client_subscribed'
EVENT_BROKER_CLIENT_UNSUBSCRIBED = 'broker_client_unsubscribed'
EVENT_BROKER_MESSAGE_RECEIVED = 'broker_message_received'


def make_log(string):
    """Append one timestamped line to ``BrokerStatus.txt``.

    The line has the form ``[YYYY-MM-DD HH:MM:SS] <string>``. The file is
    created if it does not exist and is closed again before returning (the
    previous implementation leaked one open handle per call).

    :param string: text of the log entry (must be a ``str``)
    :raises OSError: if the status file cannot be opened or written
    """
    stamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
    # NOTE: this is blocking file I/O and it is called from coroutines below.
    with open("BrokerStatus.txt", "a") as status_file:
        status_file.write("[" + stamp + "] " + string + '\n')


# NOTE(review): deriving from BaseException means ``except Exception`` blocks
# (such as the one in ``Broker.start``) do not intercept this error. Left
# unchanged because callers may depend on it.
class BrokerException(BaseException):
    pass


class RetainedApplicationMessage:
    """Plain record for a message kept for later delivery."""

    def __init__(self, source_session, topic, data, qos=None):
        self.source_session = source_session
        self.topic = topic
        self.data = data
        self.qos = qos


class Server:
    """Book-keeping wrapper around one listener's server instance.

    Counts open connections and, when ``max_connections`` is positive, caps
    them with a semaphore.
    """

    def __init__(self, listener_name, server_instance, max_connections=-1, loop=None):
        self.logger = logging.getLogger(__name__)
        self.instance = server_instance
        self.conn_count = 0
        self.listener_name = listener_name
        if loop is not None:
            self._loop = loop
        else:
            self._loop = asyncio.get_event_loop()

        self.max_connections = max_connections
        if self.max_connections > 0:
            self.semaphore = asyncio.Semaphore(self.max_connections, loop=self._loop)
        else:
            # No limit configured: connections are only counted.
            self.semaphore = None

    @asyncio.coroutine
    def acquire_connection(self):
        """Wait for a free connection slot (if limited) and count the connection."""
        if self.semaphore:
            yield from self.semaphore.acquire()
        self.conn_count += 1
        if self.max_connections > 0:
            make_log("Listener '%s': %d/%d connections acquired" %
                     (self.listener_name, self.conn_count, self.max_connections))
        else:
            make_log("Listener '%s': %d connections acquired" %
                     (self.listener_name, self.conn_count))

    def release_connection(self):
        """Give back a connection slot taken with :meth:`acquire_connection`."""
        if self.semaphore:
            self.semaphore.release()
        self.conn_count -= 1
        if self.max_connections > 0:
            make_log("Listener '%s': %d/%d connections acquired" %
                     (self.listener_name, self.conn_count, self.max_connections))
        else:
            make_log("Listener '%s': %d connections acquired" %
                     (self.listener_name, self.conn_count))

    @asyncio.coroutine
    def close_instance(self):
        """Close the wrapped server instance and wait until it is closed."""
        if self.instance:
            self.instance.close()
            yield from self.instance.wait_closed()


class BrokerContext(BaseContext):
    """
    BrokerContext is used as the context passed to plugins interacting with the broker.
    It acts as an adapter to broker services from plugins developed for HBMQTT broker
    """

    def __init__(self, broker):
        super().__init__()
        self.config = None
        self._broker_instance = broker

    @asyncio.coroutine
    def broadcast_message(self, topic, data, qos=None):
        yield from self._broker_instance.internal_message_broadcast(topic, data, qos)

    def retain_message(self, topic_name, data, qos=None):
        self._broker_instance.retain_message(None, topic_name, data, qos)

    @property
    def sessions(self):
        """Iterate over the session objects currently known to the broker."""
        # Each stored value is a (session, handler) pair; only the session is exposed.
        for entry in self._broker_instance._sessions.values():
            yield entry[0]

    @property
    def retained_messages(self):
        return self._broker_instance._retained_messages

    @property
    def subscriptions(self):
        return self._broker_instance._subscriptions


class Broker:
    """
    MQTT 3.1.1 compliant broker implementation

    :param config: Example Yaml config
    :param loop: asyncio loop to use. Defaults to ``asyncio.get_event_loop()`` if none is given
    :param plugin_namespace: Plugin namespace to use when loading plugin entry_points. Defaults to ``hbmqtt.broker.plugins``
    """
    states = ['new', 'starting', 'started', 'not_started', 'stopping', 'stopped', 'not_stopped', 'stopped']

    def __init__(self, config=None, loop=None, plugin_namespace=None):
        self.logger = logging.getLogger(__name__)
        # Work on a copy: updating the module-level ``_defaults`` in place would
        # leak one broker's configuration into every broker created afterwards.
        self.config = dict(_defaults)
        if config is not None:
            self.config.update(config)
        self._build_listeners_config(self.config)

        if loop is not None:
            self._loop = loop
        else:
            self._loop = asyncio.get_event_loop()

        self._servers = dict()
        self._init_states()
        self._sessions = dict()
        self._subscriptions = dict()
        self._retained_messages = dict()
        self._broadcast_queue = asyncio.Queue(loop=self._loop)

        self._broadcast_task = None
        self.ClientID = ""

        # Init plugins manager
        context = BrokerContext(self)
        context.config = self.config
        if plugin_namespace:
            namespace = plugin_namespace
        else:
            namespace = 'hbmqtt.broker.plugins'
        self.plugins_manager = PluginManager(namespace, context, self._loop)

    def _build_listeners_config(self, broker_config):
        """Build ``self.listeners_config``: every listener entry merged over the ``default`` entry.

        :raises BrokerException: if ``listeners`` or its ``default`` entry is missing
        """
        self.listeners_config = dict()
        try:
            listeners_config = broker_config['listeners']
            defaults = listeners_config['default']
            for listener in listeners_config:
                config = dict(defaults)
                config.update(listeners_config[listener])
                self.listeners_config[listener] = config
        except KeyError as ke:
            raise BrokerException("Listener config not found invalid: %s" % ke)

    def _init_states(self):
        """Create the broker life-cycle state machine and its transitions."""
        self.transitions = Machine(states=Broker.states, initial='new')
        self.transitions.add_transition(trigger='start', source='new', dest='starting')
        self.transitions.add_transition(trigger='starting_fail', source='starting', dest='not_started')
        self.transitions.add_transition(trigger='starting_success', source='starting', dest='started')
        self.transitions.add_transition(trigger='shutdown', source='started', dest='stopping')
        self.transitions.add_transition(trigger='stopping_success', source='stopping', dest='stopped')
        self.transitions.add_transition(trigger='stopping_failure', source='stopping', dest='not_stopped')
        self.transitions.add_transition(trigger='start', source='stopped', dest='starting')

    @asyncio.coroutine
    def start(self):
        """
        Start the broker to serve with the given configuration

        Start method opens network sockets and will start listening for incoming connections.

        This method is a *coroutine*.

        :raises BrokerException: if the broker is not in a startable state or start-up fails
        """
        try:
            # Validate the transition first: an invalid start() call must not
            # wipe the state of a broker that is already running.
            self.transitions.start()
        except MachineError as me:
            make_log("[WARN-0001] Invalid method call at this moment: %s" % me)
            raise BrokerException("Broker instance can't be started: %s" % me) from me
        self._sessions = dict()
        self._subscriptions = dict()
        self._retained_messages = dict()
        make_log("Broker starting")

        yield from self.plugins_manager.fire_event(EVENT_BROKER_PRE_START)
        try:
            # Start network listeners
            for listener_name in self.listeners_config:
                listener = self.listeners_config[listener_name]

                if 'bind' not in listener:
                    make_log("Listener configuration '%s' is not bound" % listener_name)
                else:
                    # Max connections
                    try:
                        max_connections = listener['max_connections']
                    except KeyError:
                        max_connections = -1

                    # SSL Context
                    sc = None

                    # accept string "on" / "off" or boolean
                    ssl_active = listener.get('ssl', False)
                    if isinstance(ssl_active, str):
                        ssl_active = ssl_active.upper() == 'ON'

                    if ssl_active:
                        try:
                            sc = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
                            sc.load_cert_chain(listener['certfile'], listener['keyfile'])
                            sc.verify_mode = ssl.CERT_OPTIONAL
                        except KeyError as ke:
                            raise BrokerException("'certfile' or 'keyfile' configuration parameter missing: %s" % ke)
                        except FileNotFoundError as fnfe:
                            raise BrokerException("Can't read cert files '%s' or '%s' : %s" %
                                                  (listener['certfile'], listener['keyfile'], fnfe))

                    address, s_port = listener['bind'].split(':')
                    port = 0
                    try:
                        port = int(s_port)
                    except ValueError as ve:
                        raise BrokerException("Invalid port value in bind value: %s" % listener['bind']) from ve

                    if listener['type'] == 'tcp':
                        cb_partial = partial(self.stream_connected, listener_name=listener_name)
                        instance = yield from asyncio.start_server(cb_partial,
                                                                   address,
                                                                   port,
                                                                   ssl=sc,
                                                                   loop=self._loop)
                        self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop)
                    elif listener['type'] == 'ws':
                        cb_partial = partial(self.ws_connected, listener_name=listener_name)
                        instance = yield from websockets.serve(cb_partial, address, port, ssl=sc, loop=self._loop,
                                                               subprotocols=['mqtt'])
                        self._servers[listener_name] = Server(listener_name, instance, max_connections, self._loop)

                    make_log("Listener '%s' bind to %s (max_connections=%d)" %
                             (listener_name, listener['bind'], max_connections))

            self.transitions.starting_success()
            yield from self.plugins_manager.fire_event(EVENT_BROKER_POST_START)

            # Start broadcast loop
            self._broadcast_task = ensure_future(self._broadcast_loop(), loop=self._loop)

            make_log("Broker started")
        except Exception as e:
            make_log("Broker startup failed: %s" % e)
            self.transitions.starting_fail()
            raise BrokerException("Broker instance can't be started: %s" % e) from e

    @asyncio.coroutine
    def shutdown(self):
        """
        Stop broker instance.

        Closes all connected session, stop listening on network socket and free resources.

        :raises BrokerException: if the broker is not in a state that can be shut down
        """
        try:
            # Validate the transition first so that an invalid shutdown() call
            # does not discard the sessions of a broker that keeps running.
            self.transitions.shutdown()
        except MachineError as me:
            make_log("Invalid method call at this moment: %s" % me)
            raise BrokerException("Broker instance can't be stopped: %s" % me) from me
        self._sessions = dict()
        self._subscriptions = dict()
        self._retained_messages = dict()

        # Fire broker_shutdown event to plugins
        yield from self.plugins_manager.fire_event(EVENT_BROKER_PRE_SHUTDOWN)

        # Stop broadcast loop
        if self._broadcast_task:
            self._broadcast_task.cancel()
        if self._broadcast_queue.qsize() > 0:
            make_log("%d messages not broadcasted" % self._broadcast_queue.qsize())

        for listener_name in self._servers:
            server = self._servers[listener_name]
            yield from server.close_instance()
        make_log("Broker closing")
        make_log("Broker closed")
        yield from self.plugins_manager.fire_event(EVENT_BROKER_POST_SHUTDOWN)
        self.transitions.stopping_success()

    @asyncio.coroutine
    def internal_message_broadcast(self, topic, data, qos=None):
        """Queue a broker-originated message (no source session) for broadcast.

        ``qos`` is forwarded as the forced QoS; it used to be silently dropped.
        """
        return (yield from self._broadcast_message(None, topic, data, qos))

    @asyncio.coroutine
    def ws_connected(self, websocket, uri, listener_name):
        yield from self.client_connected(listener_name, WebSocketsReader(websocket), WebSocketsWriter(websocket))

    @asyncio.coroutine
    def stream_connected(self, reader, writer, listener_name):
        yield from self.client_connected(listener_name, StreamReaderAdapter(reader), StreamWriterAdapter(writer))

    @asyncio.coroutine
    def client_connected(self, listener_name, reader: ReaderAdapter, writer: WriterAdapter):
        """Serve one client connection from CONNECT until it disconnects.

        Acquires a connection slot on the listener, performs the CONNECT
        handshake and authentication, then loops over disconnect, subscribe,
        unsubscribe and message-delivery events until the client leaves.

        :raises BrokerException: if ``listener_name`` is not a running listener
        """
        # Wait for connection available on listener
        server = self._servers.get(listener_name, None)
        if not server:
            raise BrokerException("Invalid listener name '%s'" % listener_name)
        yield from server.acquire_connection()

        remote_address, remote_port = writer.get_peer_info()
        make_log("Connection from %s:%d on listener '%s'" % (remote_address, remote_port, listener_name))

        # Wait for first packet and expect a CONNECT
        try:
            handler, client_session = yield from BrokerProtocolHandler.init_from_connect(reader, writer, self.plugins_manager, loop=self._loop)
        except HBMQTTException as exc:
            make_log("[MQTT-3.1.0-1] %s: Can't read first packet an CONNECT: %s" %
                     (format_client_message(address=remote_address, port=remote_port), exc))
            #yield from writer.close()
            make_log("Connection closed")
            # Give the slot back, otherwise each failed handshake permanently
            # consumes one of the listener's max_connections.
            server.release_connection()
            return
        except MQTTException as me:
            make_log('Invalid connection from %s : %s' %
                     (format_client_message(address=remote_address, port=remote_port), me))
            yield from writer.close()
            make_log("Connection closed")
            server.release_connection()
            return

        ############################### Checking Read & Write Permissions ##############################
        #self.ClientID = client_session.client_id
        #if self.ClientID[0]=="P":
            #with open("mqtt/write.txt", "r") as f:
                #arr = f.read().splitlines()
            #if self.ClientID[2:] not in arr:
                #return
        #if self.ClientID[0]=="S":
            #with open("mqtt/read.txt", "r") as f:
                #arr = f.read().splitlines()
            #if self.ClientID[2:] not in arr:
                #return
        ################################################################################################

        if client_session.clean_session:
            # Delete existing session and create a new one
            if client_session.client_id is not None:
                self.delete_session(client_session.client_id)
            else:
                client_session.client_id = gen_client_id()
            client_session.parent = 0
        else:
            # Get session from cache
            if client_session.client_id in self._sessions:
                make_log("Found old session %s" % repr(self._sessions[client_session.client_id]))
                (client_session, h) = self._sessions[client_session.client_id]
                client_session.parent = 1
            else:
                client_session.parent = 0
        if client_session.keep_alive > 0:
            client_session.keep_alive += self.config['timeout-disconnect-delay']
        make_log("Keep-alive timeout=%d" % client_session.keep_alive)

        handler.attach(client_session, reader, writer)
        self._sessions[client_session.client_id] = (client_session, handler)

        authenticated = yield from self.authenticate(client_session, self.listeners_config[listener_name])
        if not authenticated:
            yield from writer.close()
            server.release_connection()
            return

        while True:
            try:
                client_session.transitions.connect()
                break
            except MachineError:
                make_log("Client %s is reconnecting too quickly, make it wait" % client_session.client_id)
                # Wait a bit may be client is reconnecting too fast
                yield from asyncio.sleep(1, loop=self._loop)
        yield from handler.mqtt_connack_authorize(authenticated)

        yield from self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_CONNECTED, client_id=client_session.client_id)

        make_log("%s Start messages handling" % client_session.client_id)
        yield from handler.start()
        make_log("Retained messages queue size: %d" % client_session.retained_messages.qsize())
        yield from self.publish_session_retained_messages(client_session)

        # Init and start loop for handling client messages (publish, subscribe/unsubscribe, disconnect)
        disconnect_waiter = ensure_future(handler.wait_disconnect(), loop=self._loop)
        subscribe_waiter = ensure_future(handler.get_next_pending_subscription(), loop=self._loop)
        unsubscribe_waiter = ensure_future(handler.get_next_pending_unsubscription(), loop=self._loop)
        wait_deliver = ensure_future(handler.mqtt_deliver_next_message(), loop=self._loop)
        connected = True
        while connected:
            try:
                done, pending = yield from asyncio.wait(
                    [disconnect_waiter, subscribe_waiter, unsubscribe_waiter, wait_deliver],
                    return_when=asyncio.FIRST_COMPLETED, loop=self._loop)
                if disconnect_waiter in done:
                    result = disconnect_waiter.result()
                    make_log("%s Result from wait_diconnect: %s" % (client_session.client_id, result))
                    if result is None:
                        make_log("Will flag: %s" % client_session.will_flag)
                        # Connection closed abnormally, send will message
                        if client_session.will_flag:
                            make_log("Client %s disconnected abnormally, sending will message" %
                                     format_client_message(client_session))
                            yield from self._broadcast_message(
                                client_session,
                                client_session.will_topic,
                                client_session.will_message,
                                client_session.will_qos)
                            if client_session.will_retain:
                                self.retain_message(client_session,
                                                    client_session.will_topic,
                                                    client_session.will_message,
                                                    client_session.will_qos)
                    make_log("%s Disconnecting session" % client_session.client_id)
                    yield from self._stop_handler(handler)
                    client_session.transitions.disconnect()
                    yield from self.plugins_manager.fire_event(EVENT_BROKER_CLIENT_DISCONNECTED, client_id=client_session.client_id)
                    connected = False
                if unsubscribe_waiter in done:
                    make_log("%s handling unsubscription" % client_session.client_id)
                    unsubscription = unsubscribe_waiter.result()
                    for topic in unsubscription['topics']:
                        self._del_subscription(topic, client_session)
                        yield from self.plugins_manager.fire_event(
                            EVENT_BROKER_CLIENT_UNSUBSCRIBED,
                            client_id=client_session.client_id,
                            topic=topic)
                    yield from handler.mqtt_acknowledge_unsubscription(unsubscription['packet_id'])
                    unsubscribe_waiter = asyncio.Task(handler.get_next_pending_unsubscription(), loop=self._loop)
                if subscribe_waiter in done:
                    make_log("%s handling subscription" % client_session.client_id)
                    subscriptions = subscribe_waiter.result()
                    return_codes = []
                    for subscription in subscriptions['topics']:
                        return_codes.append(self.add_subscription(subscription, client_session))
                    yield from handler.mqtt_acknowledge_subscription(subscriptions['packet_id'], return_codes)
                    for index, subscription in enumerate(subscriptions['topics']):
                        # 0x80 is the failure code returned by add_subscription
                        if return_codes[index] != 0x80:
                            yield from self.plugins_manager.fire_event(
                                EVENT_BROKER_CLIENT_SUBSCRIBED,
                                client_id=client_session.client_id,
                                topic=subscription[0],
                                qos=subscription[1])
                            yield from self.publish_retained_messages_for_subscription(subscription, client_session)
                    subscribe_waiter = asyncio.Task(handler.get_next_pending_subscription(), loop=self._loop)
                    make_log(repr(self._subscriptions))
                if wait_deliver in done:
                    if self.logger.isEnabledFor(logging.DEBUG):
                        make_log("%s handling message delivery" % client_session.client_id)
                    app_message = wait_deliver.result()
                    if not app_message.topic:
                        self.logger.warning("[MQTT-4.7.3-1] - %s invalid TOPIC sent in PUBLISH message, closing connection" % client_session.client_id)
                        break
                    if "#" in app_message.topic or "+" in app_message.topic:
                        self.logger.warning("[MQTT-3.3.2-2] - %s invalid TOPIC sent in PUBLISH message, closing connection" % client_session.client_id)
                        break
                    yield from self.plugins_manager.fire_event(EVENT_BROKER_MESSAGE_RECEIVED,
                                                               client_id=client_session.client_id,
                                                               message=app_message)
                    yield from self._broadcast_message(client_session, app_message.topic, app_message.data)
                    if app_message.publish_packet.retain_flag:
                        self.retain_message(client_session, app_message.topic, app_message.data, app_message.qos)
                    wait_deliver = asyncio.Task(handler.mqtt_deliver_next_message(), loop=self._loop)
            except asyncio.CancelledError:
                make_log("Client loop cancelled")
                break
        disconnect_waiter.cancel()
        subscribe_waiter.cancel()
        unsubscribe_waiter.cancel()
        wait_deliver.cancel()

        make_log("%s Client disconnected" % client_session.client_id)
        server.release_connection()

    def _init_handler(self, session, reader, writer):
        """
        Create a BrokerProtocolHandler and attach to a session
        :return: the new handler
        """
        handler = BrokerProtocolHandler(self.plugins_manager, self._loop)
        handler.attach(session, reader, writer)
        return handler

    @asyncio.coroutine
    def _stop_handler(self, handler):
        """
        Stop a running handler and detach it from the session.

        Errors raised while stopping are logged and not propagated.
        :param handler:
        :return:
        """
        try:
            yield from handler.stop()
        except Exception as e:
            self.logger.error(e)

    @asyncio.coroutine
    def authenticate(self, session: Session, listener):
        """
        This method calls the authenticate method on registered plugins to test user authentication.
        As implemented, authentication succeeds unless at least one plugin returns ``False``.
        Plugins authenticate() method are supposed to return :
         - True if user authentication succeeds
         - False if user authentication fails
         - None if authentication can't be achieved (the plugin result is then ignored)
        :param session:
        :param listener:
        :return: ``True`` if no plugin rejected the session, else ``False``
        """
        auth_plugins = None
        auth_config = self.config.get('auth', None)
        if auth_config:
            auth_plugins = auth_config.get('plugins', None)
        returns = yield from self.plugins_manager.map_plugin_coro(
            "authenticate",
            session=session,
            filter_plugins=auth_plugins)
        auth_result = True
        if returns:
            for plugin in returns:
                res = returns[plugin]
                if res is False:
                    auth_result = False
                    make_log("Authentication failed due to '%s' plugin result: %s" % (plugin.name, res))
                else:
                    make_log("'%s' plugin result: %s" % (plugin.name, res))
        # If no plugin returned False, authentication is a success
        return auth_result

    def retain_message(self, source_session, topic_name, data, qos=None):
        """Store ``data`` as the retained message of a topic, or clear it when ``data`` is empty."""
        if data is not None and data != b'':
            # If retained flag set, store the message for further subscriptions
            make_log("Retaining message on topic %s" % topic_name)
            retained_message = RetainedApplicationMessage(source_session, topic_name, data, qos)
            self._retained_messages[topic_name] = retained_message
        else:
            # [MQTT-3.3.1-10]
            if topic_name in self._retained_messages:
                make_log("Clear retained messages for topic '%s'" % topic_name)
                del self._retained_messages[topic_name]

    def add_subscription(self, subscription, session):
        """Register ``session`` for a topic filter.

        :param subscription: ``(topic_filter, qos)`` pair
        :param session: subscribing session
        :return: the granted QoS (capped by the ``max-qos`` config entry), or
            ``0x80`` when the filter is rejected
        """
        try:
            a_filter = subscription[0]
            if '#' in a_filter and not a_filter.endswith('#'):
                # [MQTT-4.7.1-2] Wildcard character '#' is only allowed as last character in filter
                return 0x80
            if a_filter != "+":
                if '+' in a_filter:
                    if "/+" not in a_filter and "+/" not in a_filter:
                        # [MQTT-4.7.1-3] + wildcard character must occupy entire level
                        return 0x80

            qos = subscription[1]
            if 'max-qos' in self.config and qos > self.config['max-qos']:
                qos = self.config['max-qos']
            if a_filter not in self._subscriptions:
                self._subscriptions[a_filter] = []
            already_subscribed = next(
                (s for (s, _qos) in self._subscriptions[a_filter] if s.client_id == session.client_id), None)
            if not already_subscribed:
                self._subscriptions[a_filter].append((session, qos))
            else:
                make_log("Client %s has already subscribed to %s" % (format_client_message(session=session), a_filter))
            return qos
        except KeyError:
            return 0x80

    def _del_subscription(self, a_filter, session):
        """
        Delete a session subscription on a given topic
        :param a_filter:
        :param session:
        :return: number of subscriptions removed (0 or 1)
        """
        deleted = 0
        try:
            subscriptions = self._subscriptions[a_filter]
            for index, (sub_session, qos) in enumerate(subscriptions):
                if sub_session.client_id == session.client_id:
                    make_log("Removing subscription on topic '%s' for client %s" %
                             (a_filter, format_client_message(session=session)))
                    subscriptions.pop(index)
                    deleted += 1
                    break
        except KeyError:
            # Unsubscribe topic not found in current subscribed topics
            pass
        # A plain return (rather than ``finally: return``) so that unexpected
        # errors are no longer silently discarded.
        return deleted

    def _del_all_subscriptions(self, session):
        """
        Delete all topic subscriptions for a given session
        :param session:
        :return:
        """
        filter_queue = deque()
        for topic in self._subscriptions:
            if self._del_subscription(topic, session):
                filter_queue.append(topic)
        # Drop filters left without subscribers; done in a second pass so the
        # dict is not modified while it is being iterated.
        for topic in filter_queue:
            if not self._subscriptions[topic]:
                del self._subscriptions[topic]

    def matches(self, topic, a_filter):
        """Tell whether ``topic`` matches the subscription filter ``a_filter``.

        :return: a bool for wildcard-free filters, otherwise the regex match
            object (truthy) or ``None``
        """
        if "#" not in a_filter and "+" not in a_filter:
            # if filter doesn't contain wildcard, return exact match
            return a_filter == topic
        else:
            # else use regex (raw strings: same patterns, no invalid-escape warnings)
            match_pattern = re.compile(
                a_filter.replace('#', '.*').replace('$', r'\$').replace('+', r'[/\$\s\w\d]+'))
            return match_pattern.match(topic)

    @asyncio.coroutine
    def _broadcast_loop(self):
        """Consume the broadcast queue and dispatch each message to matching subscribers.

        Connected sessions get the message published through their handler;
        for other sessions it is queued on the session's retained messages.
        """
        running_tasks = deque()
        try:
            while True:
                # Forget publish tasks that already completed
                while running_tasks and running_tasks[0].done():
                    running_tasks.popleft()
                broadcast = yield from self._broadcast_queue.get()
                if self.logger.isEnabledFor(logging.DEBUG):
                    make_log("broadcasting %r" % broadcast)
                for k_filter in self._subscriptions:
                    if broadcast['topic'].startswith("$") and (k_filter.startswith("+") or k_filter.startswith("#")):
                        make_log("[MQTT-4.7.2-1] - ignoring brodcasting $ topic to subscriptions starting with + or #")
                    elif self.matches(broadcast['topic'], k_filter):
                        subscriptions = self._subscriptions[k_filter]
                        for (target_session, qos) in subscriptions:
                            # A forced QoS on the message overrides the subscription's QoS
                            if 'qos' in broadcast:
                                qos = broadcast['qos']
                            if target_session.transitions.state == 'connected':
                                make_log("broadcasting application message from %s on topic '%s' to %s" %
                                         (format_client_message(session=broadcast['session']),
                                          broadcast['topic'], format_client_message(session=target_session)))
                                handler = self._get_handler(target_session)
                                task = ensure_future(
                                    handler.mqtt_publish(broadcast['topic'], broadcast['data'], qos, retain=False),
                                    loop=self._loop)
                                running_tasks.append(task)
                            else:
                                make_log("retaining application message from %s on topic '%s' to client '%s'" %
                                         (format_client_message(session=broadcast['session']),
                                          broadcast['topic'], format_client_message(session=target_session)))
                                retained_message = RetainedApplicationMessage(
                                    broadcast['session'], broadcast['topic'], broadcast['data'], qos)
                                yield from target_session.retained_messages.put(retained_message)
        except CancelledError:
            # Wait until current broadcasting tasks end
            if running_tasks:
                yield from asyncio.wait(running_tasks, loop=self._loop)

    def _log_topic_payload(self, topic, data):
        """Best-effort: append the UTF-8 decoded payload to ``<topic>.txt``.

        The payload is decoded *before* the file is opened. The previous code
        opened first and, on any failure, reopened the file in ``"w"`` mode,
        which truncated the existing log whenever a payload could not be
        decoded. Failures are reported through the logger and never raised.
        """
        # SECURITY NOTE(review): ``topic`` comes from clients and is used
        # verbatim as a file path. Topics containing "/" or ".." address other
        # directories. Behaviour is kept as-is here; consider sanitising.
        try:
            stamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')
            line = "[" + stamp + "] " + data.decode('utf-8') + '\n'
            with open(topic + ".txt", "a") as topic_file:
                topic_file.write(line)
        except (OSError, ValueError, AttributeError, TypeError) as exc:
            self.logger.warning("Could not log payload for topic %r: %s", topic, exc)

    @asyncio.coroutine
    def _broadcast_message(self, session, topic, data, force_qos=None):
        """Log the payload to the per-topic file and queue the message for broadcast.

        :param session: source session, or ``None`` for broker-originated messages
        :param force_qos: when truthy, QoS that overrides each subscription's QoS
        """
        broadcast = {
            'session': session,
            'topic': topic,
            'data': data
        }
        if force_qos:
            broadcast['qos'] = force_qos
        self._log_topic_payload(topic, data)
        yield from self._broadcast_queue.put(broadcast)

    @asyncio.coroutine
    def publish_session_retained_messages(self, session):
        """Publish every message queued on ``session.retained_messages`` and wait for completion."""
        make_log("Publishing %d messages retained for session %s" %
                 (session.retained_messages.qsize(), format_client_message(session=session))
                 )
        publish_tasks = []
        handler = self._get_handler(session)
        while not session.retained_messages.empty():
            retained = yield from session.retained_messages.get()
            publish_tasks.append(ensure_future(
                handler.mqtt_publish(
                    retained.topic, retained.data, retained.qos, True), loop=self._loop))
        if publish_tasks:
            yield from asyncio.wait(publish_tasks, loop=self._loop)

    @asyncio.coroutine
    def publish_retained_messages_for_subscription(self, subscription, session):
        """Publish to ``session`` every retained message whose topic matches the new subscription.

        :param subscription: ``(topic_filter, qos)`` pair; the QoS is used for publishing
        """
        make_log("Begin broadcasting messages retained due to subscription on '%s' from %s" %
                 (subscription[0], format_client_message(session=session)))
        publish_tasks = []
        handler = self._get_handler(session)
        for d_topic in self._retained_messages:
            make_log("matching : %s %s" % (d_topic, subscription[0]))
            if self.matches(d_topic, subscription[0]):
                make_log("%s and %s match" % (d_topic, subscription[0]))
                retained = self._retained_messages[d_topic]
                publish_tasks.append(asyncio.Task(
                    handler.mqtt_publish(
                        retained.topic, retained.data, subscription[1], True), loop=self._loop))
        if publish_tasks:
            yield from asyncio.wait(publish_tasks, loop=self._loop)
        make_log("End broadcasting messages retained due to subscription on '%s' from %s" %
                 (subscription[0], format_client_message(session=session)))

    def delete_session(self, client_id):
        """
        Delete an existing session data, for example due to clean session set in CONNECT
        :param client_id:
        :return:
        """
        try:
            session = self._sessions[client_id][0]
        except KeyError:
            session = None
        if session is None:
            make_log("Delete session : session %s doesn't exist" % client_id)
            return

        # Delete subscriptions
        make_log("deleting session %s subscriptions" % repr(session))
        self._del_all_subscriptions(session)

        make_log("deleting existing session %s" % repr(self._sessions[client_id]))
        del self._sessions[client_id]

    def _get_handler(self, session):
        """Look up the protocol handler stored for ``session`` (by client id)."""
        client_id = session.client_id
        if client_id:
            try:
                return self._sessions[client_id][1]
            except KeyError:
                pass
# ... (excerpt truncated here; the remainder of broker.py is not shown)

Full Screen

Full Screen

table_test.py

Source:table_test.py Github

copy

Full Screen

...31 '%(levelname)s'32 '\x1b[0m'33 ' '34 '%(message)s', _TIMESTAMP_FORMAT)35def make_log(**kwargs):36 """Create a LogLine instance."""37 # Construct a LogRecord38 attributes = dict(name='testlogger',39 levelno=logging.INFO,40 levelname='INF',41 msg='[%s] %.3f %s',42 args=('MOD1', 3.14159, 'Real message here'),43 created=_TIMESTAMP_SAMPLE.timestamp(),44 filename='test.py',45 lineno=42,46 pathname='/home/user/test.py')47 # Override any above attrs that are passed in.48 attributes.update(kwargs)49 # Create the record50 record = logging.makeLogRecord(dict(attributes))51 # Format52 formatted_message = formatter.format(record)53 log_line = LogLine(record=record,54 formatted_log=formatted_message,55 ansi_stripped_log='')56 log_line.update_metadata()57 return log_line58class TestTableView(unittest.TestCase):59 """Tests for rendering log lines into tables."""60 def setUp(self):61 # Show large diffs62 self.maxDiff = None # pylint: disable=invalid-name63 @parameterized.expand([64 (65 'Correct column widths with all fields set',66 [67 make_log(68 args=('M1', 1.2345, 'Something happened'),69 extra_metadata_fields=dict(module='M1', time=12)),70 make_log(71 args=('MD2', 567.5, 'Another cool event'),72 extra_metadata_fields=dict(module='MD2', time=123)),73 ],74 dict(module=len('MD2'), time=len('123')),75 ),76 (77 'Missing metadata fields on some rows',78 [79 make_log(80 args=('M1', 54321.2, 'Something happened'),81 extra_metadata_fields=dict(module='M1', time=54321.2)),82 make_log(83 args=('MOD2', 567.5, 'Another cool event'),84 extra_metadata_fields=dict(module='MOD2')),85 ],86 dict(module=len('MOD2'), time=len('54321.200')),87 ),88 ]) # yapf: disable89 def test_column_widths(self, _name, logs, expected_widths) -> None:90 """Test colum widths calculation."""91 table = TableView()92 for log in logs:93 table.update_metadata_column_widths(log)94 # update_metadata_column_widths shoulp populate self.metadata.fields95 self.assertEqual(log.metadata.fields,96 
log.record.extra_metadata_fields)97 # Check expected column widths98 self.assertEqual(dict(table.column_widths), expected_widths)99 @parameterized.expand([100 (101 'Build header adding fields incrementally',102 [103 make_log(104 args=('MODULE2', 567.5, 'Another cool event'),105 extra_metadata_fields=dict(106 # timestamp missing107 module='MODULE2')),108 make_log(109 args=('MODULE1', 54321.2, 'Something happened'),110 extra_metadata_fields=dict(111 # timestamp added in112 module='MODULE1', timestamp=54321.2)),113 ],114 [115 [('bold', 'Time '), _TABLE_PADDING,116 ('bold', 'Lvl'), _TABLE_PADDING,117 ('bold', 'Module '),118 ('bold', 'Message')],119 [('bold', 'Time '), _TABLE_PADDING,120 ('bold', 'Lvl'), _TABLE_PADDING,121 # timestamp added in122 ('bold', 'Timestamp '),123 ('bold', 'Module '),124 ('bold', 'Message')],125 ],126 ),127 ]) # yapf: disable128 def test_formatted_header(self, _name, logs, expected_headers) -> None:129 """Test colum widths calculation."""130 table = TableView()131 for log, header in zip(logs, expected_headers):132 table.update_metadata_column_widths(log)133 self.assertEqual(table.formatted_header(), header)134 @parameterized.expand([135 (136 'Build rows adding fields incrementally',137 [138 make_log(139 args=('MODULE2', 567.5, 'Another cool event'),140 extra_metadata_fields=dict(141 # timestamp missing142 module='MODULE2',143 msg='Another cool event')),144 make_log(145 args=('MODULE2', 567.5, 'Another cool event'),146 extra_metadata_fields=dict(147 # timestamp and msg missing148 module='MODULE2')),149 make_log(150 args=('MODULE1', 54321.2, 'Something happened'),151 extra_metadata_fields=dict(152 # timestamp added in153 module='MODULE1', timestamp=54321.2,154 msg='Something happened')),155 ],156 [157 [158 ('class:log-time', _TIMESTAMP_SAMPLE_STRING),159 _TABLE_PADDING,160 ('class:log-level-20', 'INF'),161 _TABLE_PADDING,162 ('class:log-table-column-3', 'MODULE2 '),163 ('', 'Another cool event'),...

Full Screen

Full Screen

kadai02_URL_mynav.py

Source:kadai02_URL_mynav.py Github

copy

Full Screen

# ... (the listing starts inside set_driver; its visible tail is preserved
# here as a comment because the beginning of the function is not shown)
#     else:
#         driver = webdriver.Firefox(executable_path=GeckoDriverManager().install())
#     return driver


def make_log(txt):
    """Append a timestamped log line to the log file and echo it to stdout.

    Args:
        txt: Message text to record.
    """
    now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    logStr = '[%s:%s] %s' % ('log', now, txt)
    # Write to the log file (path comes from the module-level log_file_path).
    with open(log_file_path, 'a', encoding='utf-8_sig') as f:
        f.write(logStr + '\n')
    print(logStr)


def make_paramaters(words):
    """Build the search-keyword segment that is appended to the URL.

    Each whitespace-separated word is prefixed with ``kw`` and the results
    are joined with underscores, e.g. ``'a b'`` -> ``'kwa_kwb'``.

    Args:
        words: Search keywords separated by whitespace.

    Returns:
        The joined keyword string; empty when ``words`` contains no words.
    """
    return '_'.join(f'kw{word}' for word in words.split())


def find_table_target_word(table_ele, word):
    """Return the text of the ``td`` paired with the ``th`` whose text is word.

    The ``th`` and ``td`` elements found under ``table_ele`` are paired in
    document order.

    Args:
        table_ele: Element to search for ``th``/``td`` children.
        word: Header text to look for.

    Returns:
        The paired cell's text, or ``''`` when no header matches.
    """
    th_list = table_ele.find_elements_by_css_selector("th")
    td_list = table_ele.find_elements_by_css_selector("td")
    for th, td in zip(th_list, td_list):
        if word == th.text:
            return td.text
    return ''


def main():
    """Search the target site by keyword and save the results as a CSV file.

    Prompts for keywords, opens the search-result URL built from them, walks
    every result page collecting one row per listing, and writes the rows to
    ``./results/``. Progress and per-listing failures are recorded through
    ``make_log``.
    """
    import os  # local import: only needed to create the output directory

    # Start the browser driver.
    driver = set_driver(BROWSER, False)
    make_log('ブラウザ起動')
    key_word_search = input('検索したいキーワードを入力してください。>>')

    # Open the web site.
    driver.get(TARGET_SITE)
    time.sleep(4)
    # Close the pop-up.
    driver.execute_script('document.querySelector(".karte-close").click()')
    time.sleep(4)
    # Close the pop-up (second one).
    driver.execute_script('document.querySelector(".karte-close").click()')

    # Build the search URL with the keywords as a path parameter and open it.
    paramaters_search = make_paramaters(key_word_search)
    driver.get(TARGET_SITE + 'list/' + paramaters_search)
    make_log(f'検索キーワード({key_word_search})を付加しURLを生成')

    # Read and log the total number of hits.
    total_names = driver.find_element_by_css_selector(
        "span.js__searchRecruit--count").text
    make_log(f'検索結果取得成功:キーワードに該当する企業数={total_names}社')

    page = 1
    counts_companies = 1
    # Rows are collected in a list and turned into a DataFrame once at the
    # end: DataFrame.append was removed in pandas 2.0 and copied the whole
    # frame on every call.
    rows = []
    while True:
        contents = driver.find_elements_by_css_selector('.cassetteRecruit')
        make_log(f'{page}ページ目の企業数={len(contents)}')
        for content in contents:
            try:
                # The heading holds "company name|catch copy"; the catch
                # copy part is optional.
                name_catch = content.find_element_by_css_selector("h3").text
                name_catch = name_catch.strip().split('|')
                name = name_catch[0]
                company_catch = name_catch[1] if len(name_catch) > 1 else ''

                title = content.find_element_by_css_selector(
                    ".cassetteRecruit__copy > a").text
                link = content.find_element_by_css_selector(
                    ".cassetteRecruit__copy > a").get_attribute("href")
                update_date = content.find_element_by_css_selector(
                    ".cassetteRecruit__updateDate > span").text
                table = content.find_element_by_css_selector(".tableCondition")
                salary1styear = find_table_target_word(table, "初年度年収")
                rows.append({
                    'No.': counts_companies,
                    '企業名': name,
                    '募集内容': title,
                    '情報更新日': update_date,
                    '募集内容詳細': link,
                    '企業紹介': company_catch,
                    '初年度年収': salary1styear,
                })
                make_log(f'{counts_companies}社目 情報取得成功')
            except Exception:
                # Best effort: log the failure and continue with the next
                # listing.
                make_log(f'{counts_companies}社目 情報取得失敗')
                make_log(traceback.format_exc())
            finally:
                counts_companies += 1

        # Move on to the next page, if there is one.
        element_click = driver.find_elements_by_css_selector(
            ".iconFont--arrowLeft")
        if element_click:
            page += 1
            driver.execute_script(
                "arguments[0].scrollIntoView(true);", element_click[0])
            element_click[0].click()
            make_log(f'{page}ページ目へ遷移')
        else:
            print("検索結果を全て取得しました。")
            driver.close()
            break

    # Save the results to a CSV file.
    today = datetime.datetime.now().strftime('%Y-%m-%d')
    file_name = f'検索結果(キーワード={key_word_search})_{today}' + '.csv'
    # Create the output directory so the save cannot fail after a full scrape.
    os.makedirs('./results', exist_ok=True)
    file_name_path = './results/' + file_name
    pd.DataFrame(rows).to_csv(file_name_path, encoding="utf-8_sig", index=False)
    # Log the name the file was actually saved under (including the date).
    make_log(f'ファイルに保存_ファイル名「{file_name}」')


# Run main() only when executed directly, not when imported as a module.
if __name__ == "__main__":
    ...  # the listing is truncated here

Full Screen

Full Screen

kadai02_mynav.py

Source:kadai02_mynav.py Github

copy

Full Screen

# ... (the listing starts inside set_driver; its visible tail is preserved
# here as a comment because the beginning of the function is not shown)
#     else:
#         driver = webdriver.Firefox(executable_path=GeckoDriverManager().install())
#     return driver


def make_log(txt):
    """Append a timestamped log line to the log file and echo it to stdout.

    Args:
        txt: Message text to record.
    """
    now = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
    logStr = '[%s:%s] %s' % ('log', now, txt)
    # Write to the log file (path comes from the module-level log_file_path).
    with open(log_file_path, 'a', encoding='utf-8_sig') as f:
        f.write(logStr + '\n')
    print(logStr)


def main():
    """Search the target site by keyword and save the results as a CSV file.

    Prompts for keywords, types them into the site's search box, walks every
    result page collecting one row per listing, and writes the rows to
    ``./results/``. Progress and per-listing failures are recorded through
    ``make_log``.
    """
    import os  # local import: only needed to create the output directory

    # Start the browser driver.
    driver = set_driver(BROWSER, False)
    make_log('ブラウザ起動')
    key_word_search = input('検索したいキーワードを入力してください。>>')

    # Open the web site.
    driver.get(TARGET_SITE)
    time.sleep(4)
    # Close the pop-up.
    driver.execute_script('document.querySelector(".karte-close").click()')
    time.sleep(4)
    # Close the pop-up (second one).
    driver.execute_script('document.querySelector(".karte-close").click()')

    # Type the keywords into the search box.
    driver.find_element_by_css_selector(
        "input.topSearch__text").send_keys(key_word_search)
    make_log(f'キーワード({key_word_search})を検索窓に入力')
    # Click the search button.
    driver.find_element_by_class_name("topSearch__button").click()

    # Read, print and log the total number of hits.
    total_names = driver.find_element_by_css_selector(
        "span.js__searchRecruit--count").text
    print(f'キーワードに該当する企業数={total_names}社')
    make_log(f'検索結果取得成功:キーワードに該当する企業数={total_names}社')

    page = 1
    counts_companies = 1
    # Rows are collected in a list and turned into a DataFrame once at the
    # end: DataFrame.append was removed in pandas 2.0 and copied the whole
    # frame on every call.
    rows = []
    while True:
        contents = driver.find_elements_by_css_selector('.cassetteRecruit')
        print(f'{page}ページ目の企業数={len(contents)}')
        make_log(f'{page}ページ目の企業数={len(contents)}')
        for content in contents:
            try:
                # The heading holds "company name|catch copy"; the catch
                # copy part is optional.
                name_catch = content.find_element_by_css_selector("h3").text
                name_catch = name_catch.strip().split('|')
                name = name_catch[0]
                company_catch = name_catch[1] if len(name_catch) > 1 else ''

                # Job title, detail URL and last-updated date.
                title = content.find_element_by_css_selector(
                    ".cassetteRecruit__copy > a").text
                link = content.find_element_by_css_selector(
                    ".cassetteRecruit__copy > a").get_attribute("href")
                update_date = content.find_element_by_css_selector(
                    ".cassetteRecruit__updateDate > span").text

                rows.append({
                    'No.': counts_companies,
                    '企業名': name,
                    '募集内容': title,
                    '情報更新日': update_date,
                    '募集内容詳細': link,
                    '企業紹介': company_catch,
                })
                make_log(f'{counts_companies}社目 情報取得成功')
            except Exception:
                # Best effort: log the failure and continue with the next
                # listing.
                make_log(f'{counts_companies}社目 情報取得失敗')
                make_log(traceback.format_exc())
            finally:
                counts_companies += 1

        # Move on to the next page, if there is one.
        element_click = driver.find_elements_by_css_selector(
            ".iconFont--arrowLeft")
        if element_click:
            page += 1
            driver.execute_script(
                "arguments[0].scrollIntoView(true);", element_click[0])
            element_click[0].click()
            make_log(f'{page}ページ目へ遷移')
        else:
            print("検索結果を全て取得しました。")
            driver.close()
            break

    # Save the results to a CSV file.
    today = datetime.datetime.now().strftime('%Y-%m-%d')
    file_name = f'検索結果(キーワード={key_word_search})_{today}' + '.csv'
    # Create the output directory so the save cannot fail after a full scrape.
    os.makedirs('./results', exist_ok=True)
    file_name_path = './results/' + file_name
    pd.DataFrame(rows).to_csv(file_name_path, encoding="utf-8_sig", index=False)
    # Log the name the file was actually saved under (including the date).
    make_log(f'ファイルに保存_ファイル名「{file_name}」')


# Run main() only when executed directly, not when imported as a module.
if __name__ == "__main__":
    ...  # the listing is truncated here

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Lemoncheesecake automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful