Best Python code snippet using localstack_python
tracker_store.py
Source:tracker_store.py
1import logging2import os3import time4import datetime5import hashlib6import json7from rasa_core.tracker_store import InMemoryTrackerStore8from rasa_core.events import ActionExecuted, BotUttered, UserUttered9from elasticsearch import Elasticsearch10try:11 from nltk.corpus import stopwords12except Exception as e:13 import nltk14 nltk.download('stopwords')15 from nltk.corpus import stopwords16 pass17logger = logging.getLogger(__name__)18ENABLE_ANALYTICS = os.getenv('ENABLE_ANALYTICS', 'False').lower() == 'true'19ENVIRONMENT_NAME = os.getenv('ENVIRONMENT_NAME', 'locahost')20BOT_VERSION = os.getenv('BOT_VERSION', 'notdefined')21HASH_GEN = hashlib.md5()22def gen_id(timestamp):23 HASH_GEN.update(str(timestamp).encode('utf-8'))24 _id = HASH_GEN.hexdigest()[10:]25 return _id26class ElasticTrackerStore(InMemoryTrackerStore):27 def __init__(self, domain,28 user=None, password=None, scheme='http', scheme_port=80):29 if user is None:30 self.es = Elasticsearch([domain])31 else:32 self.es = Elasticsearch(33 ['{}://{}:{}@{}:{}'.format(scheme, user, password,34 domain, scheme_port)],35 )36 super(ElasticTrackerStore, self).__init__(domain)37 def save_user_message(self, tracker):38 if not tracker.latest_message.text:39 return40 ts = time.time()41 timestamp = datetime.datetime.strftime(42 datetime.datetime.fromtimestamp(ts),43 '%Y/%m/%d %H:%M:%S'44 )45 #Bag of words46 tags = []47 for word in tracker.latest_message.text.replace('. ',' ').replace(',',' ').replace('"','').replace("'",'').replace('*','').replace('(','').replace(')','').split(' '):48 if word.lower() not in stopwords.words('portuguese') and len(word) > 1:49 tags.append(word)50 message = {51 'environment': ENVIRONMENT_NAME,52 'version': BOT_VERSION,53 'user_id': tracker.sender_id,54 'is_bot': False,55 'timestamp': timestamp,56 'text': tracker.latest_message.text,57 'tags': tags,58 'entities': tracker.latest_message.entities,59 'intent_name': tracker.latest_message.intent['name'],60 'intent_confidence': tracker.latest_message.intent['confidence'],61 'utter_name': '',62 'is_fallback': False,63 }64 self.es.index(index='messages', doc_type='message',65 id='{}_user_{}'.format(ENVIRONMENT_NAME, gen_id(ts)),66 body=json.dumps(message))67 def save_bot_message(self, tracker):68 if not tracker.latest_message.text:69 return70 utters = []71 index = len(tracker.events) - 172 while True:73 evt = tracker.events[index]74 if isinstance(evt, UserUttered):75 break76 elif isinstance(evt, BotUttered):77 while not isinstance(evt, ActionExecuted):78 index -= 179 evt = tracker.events[index]80 utters.append(evt.action_name)81 index -= 182 time_offset = 083 for utter in utters[::-1]:84 time_offset += 10085 ts = (86 datetime.datetime.now() +87 datetime.timedelta(milliseconds=time_offset)88 ).timestamp()89 timestamp = datetime.datetime.strftime(90 datetime.datetime.fromtimestamp(ts),91 '%Y/%m/%d %H:%M:%S'92 )93 user_text = ''94 if utter == 'action_default_fallback':95 index = len(tracker.events) - 196 while index > 0:97 evt = tracker.events[index]98 if isinstance(evt, UserUttered):99 user_text = evt.text100 break101 index -= 1102 else:103 user_text = tracker.latest_message.text104 message = {105 'environment': ENVIRONMENT_NAME,106 'version': BOT_VERSION,107 'user_id': tracker.sender_id,108 'is_bot': True,109 'text': user_text,110 'tags': [],111 'timestamp': timestamp,112 'entities': [],113 'intent_name': '',114 'intent_confidence': '',115 'utter_name': utter,116 'is_fallback': utter == 'action_default_fallback',117 }118 self.es.index(index='messages', doc_type='message',119 id='{}_bot_{}'.format(ENVIRONMENT_NAME, gen_id(ts)),120 body=json.dumps(message))121 def save(self, tracker):122 if ENABLE_ANALYTICS:123 try:124 self.save_user_message(tracker)125 self.save_bot_message(tracker)126 except Exception as ex:127 logger.error('Could not track messages '128 'for user {}'.format(tracker.sender_id))129 logger.error(str(ex))...
run-rocketchat.py
Source:run-rocketchat.py
1import os2import sys3import logging4from rasa_core.utils import configure_colored_logging, AvailableEndpoints5from rasa_core.run import start_server, load_agent6from rasa_core.interpreter import NaturalLanguageInterpreter7from rasa_core.tracker_store import InMemoryTrackerStore8from rasa_core.broker import PikaProducer9from connector import RocketChatInput10logger = logging.getLogger(__name__)11configure_colored_logging(loglevel="DEBUG")12url = os.getenv("BROKER_URL", "")13username = os.getenv("BROKER_USERNAME", "")14password = os.getenv("BROKER_PASSWORD", "")15queue = os.getenv("QUEUE_NAME", "")16ENABLE_ANALYTICS = os.getenv("ENABLE_ANALYTICS", "False").lower() == "true"17# WorkAround to import /policies as a module and be able to run the bot18homedir = os.path.expanduser("/")19sys.path.append(homedir)20def run(core_dir, nlu_dir):21 pika_broker = None22 if ENABLE_ANALYTICS:23 pika_broker = PikaProducer(url, username, password, queue=queue)24 configs = {25 "user": os.getenv("ROCKETCHAT_BOT_USERNAME"),26 "password": os.getenv("ROCKETCHAT_BOT_PASSWORD"),27 "server_url": os.getenv("ROCKETCHAT_URL"),28 }29 input_channel = RocketChatInput(30 user=configs["user"],31 password=configs["password"],32 server_url=configs["server_url"],33 )34 _tracker_store = InMemoryTrackerStore(35 domain=None, event_broker=pika_broker)36 _endpoints = AvailableEndpoints.read_endpoints(None)37 _interpreter = NaturalLanguageInterpreter.create(nlu_dir)38 _agent = load_agent(39 core_dir,40 interpreter=_interpreter,41 tracker_store=_tracker_store,42 endpoints=_endpoints,43 )44 http_server = start_server([input_channel], "", "", 5005, _agent)45 try:46 http_server.serve_forever()47 except Exception as exc:48 logger.exception(exc)49if __name__ == "__main__":...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!