How to use monitor_instances method in localstack

Best Python code snippet using localstack_python

os_custom_ml.py

Source:os_custom_ml.py Github

copy

Full Screen

1#!/usr/bin/env python2# -*- coding: utf-8 -*-3"""4ibm-wos-utils==2.1.15Python 3.76"""7import warnings8warnings.filterwarnings('ignore')9import random10import time11import uuid12import matplotlib.pyplot as plt13import pandas as pd14import requests15from ibm_cloud_sdk_core.authenticators import CloudPakForDataAuthenticator16from ibm_watson_openscale import APIClient17from ibm_watson_openscale.base_classes.watson_open_scale_v2 import *18from ibm_watson_openscale.supporting_classes.enums import *19from ibm_watson_openscale.supporting_classes.payload_record import PayloadRecord20from ibm_wos_utils.drift.drift_trainer import DriftTrainer21def get_scoring_payload(df, cols_to_remove, no_of_records_to_score=1):22 for col in cols_to_remove:23 if col in df.columns:24 del df[col]25 fields = df.columns.tolist()26 values = df[fields].values.tolist()27 payload_scoring = {"fields": fields, "values": values[:no_of_records_to_score]}28 return payload_scoring29def custom_ml_scoring(scoring_url, payload_scoring):30 header = {"Content-Type": "application/json", "x": "y"}31 scoring_response = requests.post(scoring_url, json=payload_scoring, headers=header, verify=False)32 jsonify_scoring_response = scoring_response.json()33 return jsonify_scoring_response34def payload_logging(payload_scoring, scoring_response, wos_client, payload_data_set_id):35 scoring_id = str(uuid.uuid4())36 records_list = []37 # manual PL logging for custom ml provider38 pl_record = PayloadRecord(scoring_id=scoring_id, request=payload_scoring, response=scoring_response,39 response_time=int(460))40 records_list.append(pl_record)41 wos_client.data_sets.store_records(data_set_id=payload_data_set_id, request_body=records_list)42 time.sleep(5)43 pl_records_count = wos_client.data_sets.get_records_count(payload_data_set_id)44 print("Number of records in the payload logging table: {}".format(pl_records_count))45 return scoring_id46def auth_cpd(WOS_CREDENTIALS):47 authenticator = CloudPakForDataAuthenticator(48 url=WOS_CREDENTIALS['url'],49 username=WOS_CREDENTIALS['username'],50 password=WOS_CREDENTIALS['password'],51 disable_ssl_verification=True52 )53 wos_client = APIClient(service_url=WOS_CREDENTIALS['url'], authenticator=authenticator)54 print(wos_client.version)55 print(wos_client.data_marts.show())56 return wos_client57def remove_existing_service_provider(wos_client, SERVICE_PROVIDER_NAME):58 service_providers = wos_client.service_providers.list().result.service_providers59 for service_provider in service_providers:60 service_instance_name = service_provider.entity.name61 if service_instance_name == SERVICE_PROVIDER_NAME:62 service_provider_id = service_provider.metadata.id63 wos_client.service_providers.delete(service_provider_id)64 print("Deleted existing service_provider for WML instance: {}".format(service_provider_id))65def add_service_provider(SERVICE_PROVIDER_NAME, SERVICE_PROVIDER_DESCRIPTION, ):66 request_headers = {"Content-Type": "application/json", "Custom_header_X": "Custom_header_X_value_Y"}67 MLCredentials = {}68 added_service_provider_result = wos_client.service_providers.add(69 name=SERVICE_PROVIDER_NAME,70 description=SERVICE_PROVIDER_DESCRIPTION,71 service_type=ServiceTypes.CUSTOM_MACHINE_LEARNING,72 request_headers=request_headers,73 operational_space_id="production",74 credentials=MLCredentials,75 background_mode=False76 ).result77 service_provider_id = added_service_provider_result.metadata.id78 print(wos_client.service_providers.get(service_provider_id).result)79 print('Service Provider ID : ' + service_provider_id)80 return service_provider_id81def remove_existing_subscription(wos_client, SUBSCRIPTION_NAME):82 subscriptions = wos_client.subscriptions.list().result.subscriptions83 for subscription in subscriptions:84 if subscription.entity.asset.name == "[asset] " + SUBSCRIPTION_NAME:85 sub_model_id = subscription.metadata.id86 wos_client.subscriptions.delete(subscription.metadata.id)87 print('Deleted existing subscription for model', sub_model_id)88def create_monitor(data_mart_id, target, parameters, thresholds, type, wos_client):89 type_dict = {'fairness': wos_client.monitor_definitions.MONITORS.FAIRNESS.ID,90 'quality': wos_client.monitor_definitions.MONITORS.QUALITY.ID,91 'drift': wos_client.monitor_definitions.MONITORS.DRIFT.ID}92 monitor_details = wos_client.monitor_instances.create(93 data_mart_id=data_mart_id,94 background_mode=False,95 monitor_definition_id=type_dict[type],96 target=target,97 parameters=parameters,98 thresholds=thresholds).result99 monitor_instance_id = monitor_details.metadata.id100 return monitor_instance_id101def finish_explanation_tasks(wos_client, explanation_task_ids, sample_size):102 finished_explanations = []103 finished_explanation_task_ids = []104 # Check for the explanation task status for finished status.105 # If it is in-progress state, then sleep for some time and check again.106 # Perform the same for couple of times, so that all tasks get into finished state.107 for i in range(0, 5):108 # for each explanation109 print('iteration ' + str(i))110 # check status for all explanation tasks111 for explanation_task_id in explanation_task_ids:112 if explanation_task_id not in finished_explanation_task_ids:113 result = wos_client.monitor_instances.get_explanation_tasks(114 explanation_task_id=explanation_task_id).result115 print(result)116 print(explanation_task_id + ' : ' + result.entity.status.state)117 if (118 result.entity.status.state == 'finished' or result.entity.status.state == 'error') and explanation_task_id not in finished_explanation_task_ids:119 finished_explanation_task_ids.append(explanation_task_id)120 finished_explanations.append(result)121 # if there is altest one explanation task that is not yet completed, then sleep for sometime,122 # and check for all those tasks, for which explanation is not yet completeed.123 if len(finished_explanation_task_ids) != sample_size:124 print('sleeping for some time..')125 time.sleep(10)126 else:127 break128 return finished_explanations129def construct_explanation_features_map(feature_name, feature_weight, explanation_features_map):130 if feature_name in explanation_features_map:131 explanation_features_map[feature_name].append(feature_weight)132 else:133 explanation_features_map[feature_name] = [feature_weight]134def score(training_data_frame):135 # The data type of the label column and prediction column should be same .136 # User needs to make sure that label column and prediction column array should have the same unique class labels137 prediction_column_name = "prediction"138 probability_column_name = "probability"139 feature_columns = list(training_data_frame.columns)140 training_data_rows = training_data_frame[feature_columns].values.tolist()141 payload_scoring_records = {142 "fields": feature_columns,143 "values": [x for x in training_data_rows]144 }145 header = {"Content-Type": "application/json", "x": "y"}146 scoring_response_raw = requests.post(scoring_url, json=payload_scoring_records, headers=header, verify=False)147 scoring_response = scoring_response_raw.json()148 prob_col_index = list(scoring_response.get('fields')).index(probability_column_name)149 predict_col_index = list(scoring_response.get('fields')).index(prediction_column_name)150 if prob_col_index < 0 or predict_col_index < 0:151 raise Exception("Missing prediction/probability column in the scoring response")152 import numpy as np153 probability_array = np.array([value[prob_col_index] for value in scoring_response.get('values')])154 prediction_vector = np.array([value[predict_col_index] for value in scoring_response.get('values')])155 return probability_array, prediction_vector156def generating_drift_model(training_df, drift_detection_input, scoring_method):157 drift_trainer = DriftTrainer(training_df, drift_detection_input)158 if model_type != "regression":159 drift_trainer.generate_drift_detection_model(scoring_method, batch_size=training_df.shape[0])160 # Note: Two column constraints are not computed beyond two_column_learner_limit(default set to 200)161 drift_trainer.learn_constraints(two_column_learner_limit=200)162 drift_trainer.create_archive()163def remove_drift_monitor_for_subscription(wos_client, subscription_id):164 monitor_instances = wos_client.monitor_instances.list().result.monitor_instances165 for monitor_instance in monitor_instances:166 monitor_def_id = monitor_instance.entity.monitor_definition_id167 if monitor_def_id == "drift" and monitor_instance.entity.target.target_id == subscription_id:168 wos_client.monitor_instances.delete(monitor_instance.metadata.id)169 print('Deleted existing drift monitor instance with id: ', monitor_instance.metadata.id)170def fairness(wos_client, data_mart_id, subscription_id):171 # ===========fairness min100/hourly172 target = Target(173 target_type=TargetTypes.SUBSCRIPTION,174 target_id=subscription_id175 )176 parameters = {177 "features": [178 {"feature": "Sex",179 "majority": ['male'],180 "minority": ['female']181 },182 {"feature": "Age",183 "majority": [[26, 75]],184 "minority": [[18, 25]]185 }186 ],187 "favourable_class": ["No Risk"],188 "unfavourable_class": ["Risk"],189 "min_records": 100190 }191 thresholds = [{192 "metric_id": "fairness_value",193 "specific_values": [{194 "applies_to": [{195 "key": "feature",196 "type": "tag",197 "value": "Age"198 }],199 "value": 95200 },201 {202 "applies_to": [{203 "key": "feature",204 "type": "tag",205 "value": "Sex"206 }],207 "value": 95208 }209 ],210 "type": "lower_limit",211 "value": 80.0212 }]213 fairness_monitor_instance_id = create_monitor(data_mart_id, target, parameters, thresholds, 'fairness',214 wos_client)215 ### Get Fairness Monitor Instance216 wos_client.monitor_instances.show()217 ### Get run details218 runs = wos_client.monitor_instances.list_runs(fairness_monitor_instance_id, limit=1).result.to_dict()219 fairness_monitoring_run_id = runs["runs"][0]["metadata"]["id"]220 run_status = None221 while (run_status not in ["finished", "error"]):222 run_details = wos_client.monitor_instances.get_run_details(fairness_monitor_instance_id,223 fairness_monitoring_run_id).result.to_dict()224 run_status = run_details["entity"]["status"]["state"]225 print('run_status: ', run_status)226 if run_status in ["finished", "error"]:227 break228 time.sleep(10)229 ### Fairness run output230 wos_client.monitor_instances.get_run_details(fairness_monitor_instance_id,231 fairness_monitoring_run_id).result.to_dict()232 wos_client.monitor_instances.show_metrics(monitor_instance_id=fairness_monitor_instance_id)233def explainability(wos_client, data_mart_id, subscription_id, sample_size=2):234 # ====================Explainability235 target = Target(236 target_type=TargetTypes.SUBSCRIPTION,237 target_id=subscription_id238 )239 parameters = {240 "enabled": True241 }242 explain_monitor_details = wos_client.monitor_instances.create(243 data_mart_id=data_mart_id,244 background_mode=False,245 monitor_definition_id=wos_client.monitor_definitions.MONITORS.EXPLAINABILITY.ID,246 target=target,247 parameters=parameters248 ).result249 scoring_ids = []250 for i in range(0, sample_size):251 n = random.randint(1, 100)252 scoring_ids.append(scoring_id + '-' + str(n))253 print("Running explanations on scoring IDs: {}".format(scoring_ids))254 explanation_types = ["lime", "contrastive"]255 result = wos_client.monitor_instances.explanation_tasks(scoring_ids=scoring_ids,256 explanation_types=explanation_types).result257 print(result)258 ### Explanation tasks259 explanation_task_ids = result.metadata.explanation_task_ids260 return explanation_task_ids261def explanation_feature_map_plot(finished_explanations):262 explanation_features_map = {}263 for result in finished_explanations:264 print('\n>>>>>>>>>>>>>>>>>>>>>>\n')265 print(266 'explanation task: ' + str(result.metadata.explanation_task_id) + ', perturbed:' + str(267 result.entity.perturbed))268 if result.entity.explanations is not None:269 explanations = result.entity.explanations270 for explanation in explanations:271 if 'predictions' in explanation:272 predictions = explanation['predictions']273 for prediction in predictions:274 predicted_value = prediction['value']275 probability = prediction['probability']276 print('prediction : ' + str(predicted_value) + ', probability : ' + str(probability))277 if 'explanation_features' in prediction:278 explanation_features = prediction['explanation_features']279 for explanation_feature in explanation_features:280 feature_name = explanation_feature['feature_name']281 feature_weight = explanation_feature['weight']282 if (feature_weight >= 0):283 feature_weight_percent = round(feature_weight * 100, 2)284 print(str(feature_name) + ' : ' + str(feature_weight_percent))285 task_feature_weight_map = {}286 task_feature_weight_map[287 result.metadata.explanation_task_id] = feature_weight_percent288 construct_explanation_features_map(feature_name, feature_weight_percent,289 explanation_features_map)290 print('\n>>>>>>>>>>>>>>>>>>>>>>\n')291 for key in explanation_features_map.keys():292 # plot_graph(key, explanation_features_map[key])293 values = explanation_features_map[key]294 plt.title(key)295 plt.ylabel('Weight')296 plt.bar(range(len(values)), values)297 plt.show()298def quality(wos_client, subscription_id, feedback_file_path):299 # =========================Quality monitoring and feedback logging300 target = Target(301 target_type=TargetTypes.SUBSCRIPTION,302 target_id=subscription_id303 )304 parameters = {305 "min_feedback_data_size": 90306 }307 thresholds = [308 {309 "metric_id": "area_under_roc",310 "type": "lower_limit",311 "value": .80312 }313 ]314 quality_monitor_instance_id = create_monitor(data_mart_id, target, parameters, thresholds, 'quality',315 wos_client)316 ## Feedback logging317 ## Get feedback logging dataset ID318 feedback_dataset_id = None319 feedback_dataset = wos_client.data_sets.list(type=DataSetTypes.FEEDBACK,320 target_target_id=subscription_id,321 target_target_type=TargetTypes.SUBSCRIPTION).result322 feedback_dataset_id = feedback_dataset.data_sets[0].metadata.id323 if feedback_dataset_id is None:324 print("Feedback data set not found. Please check quality monitor status.")325 with open(feedback_file_path) as feedback_file:326 additional_feedback_data = json.load(feedback_file)327 wos_client.data_sets.store_records(feedback_dataset_id, request_body=additional_feedback_data,328 background_mode=False)329 wos_client.data_sets.get_records_count(data_set_id=feedback_dataset_id)330 run_details = wos_client.monitor_instances.run(monitor_instance_id=quality_monitor_instance_id,331 background_mode=False).result332 wos_client.monitor_instances.show_metrics(monitor_instance_id=quality_monitor_instance_id)333def drift(wos_client, data_mart_id, training_file_path, subscription_id, score_method, drift_detection_input):334 # =======================Drift335 # Drift detection model generation336 df = pd.read_csv(training_file_path)337 ### Define the drift detection input338 ### Generate drift detection model339 filename = 'drift_detection_model.tar.gz'340 generating_drift_model(df, drift_detection_input, score_method)341 ### Upload the drift detection model to OpenScale subscription342 wos_client.monitor_instances.upload_drift_model(343 model_path=filename,344 archive_name=filename,345 data_mart_id=data_mart_id,346 subscription_id=subscription_id,347 enable_data_drift=True,348 enable_model_drift=True349 )350 ### Delete the existing drift monitor instance for the subscription351 remove_drift_monitor_for_subscription(wos_client, subscription_id)352 target = Target(353 target_type=TargetTypes.SUBSCRIPTION,354 target_id=subscription_id355 )356 parameters = {357 "min_samples": 100,358 "drift_threshold": 0.1,359 "train_drift_model": False,360 "enable_model_drift": True,361 "enable_data_drift": True362 }363 drift_monitor_instance_id = create_monitor(data_mart_id, target, parameters, {}, 'drift', wos_client)364 ### Drift run365 drift_run_details = wos_client.monitor_instances.run(monitor_instance_id=drift_monitor_instance_id,366 background_mode=False)367 time.sleep(5)368 wos_client.monitor_instances.show_metrics(monitor_instance_id=drift_monitor_instance_id)369if __name__ == '__main__':370 WOS_CPD_TECHZONE = {371 "url": "https://services-uscentral.skytap.com:8586/",372 "username": "admin",373 "password": "password",374 "version": "3.5"375 }376 WOS_CPD_LUBAN = {377 "url": "https://zen-cpd-zen.apps.bj-prod-2.luban.cdl.ibm.com/",378 "username": "admin",379 "password": "password",380 "version": "3.5"381 }382 DB2_TECHZONE = {'table_name': 'GERMAN_CREDIT',383 'schema_name': 'aiopenscale00',384 'hostname': '10.1.1.1',385 'username': 'admin',386 'password': 'password',387 'database_name': 'aiopenscale00'}388 DB2_LUBAN = {'table_name': 'GERMAN_CREDIT',389 'schema_name': 'OPENSCALE',390 'hostname': 'worker19.bj-prod-2.luban.cdl.ibm.com:30157',391 'username': 'admin',392 'password': 'password',393 'database_name': 'aiopenscale00'}394 VM_SCORING_URL = 'http://169.62.165.235:8880/predict/'395 GBS_SCORING_URL = 'https://9.112.255.149/edi/service199/api'396 general_header = {397 "Content-Type": "application/json",398 "Custom_header_X": "Custom_header_X_value_Y"399 }400 GBS_request_header = {401 'Content-Type': 'application/json',402 'apikey': 'eyJhbGciOiJSUzI1NiIsInppcCI6IkdaSVAifQ.H4sIAAAAAAAAAC2MWwrCMBQF93K_cyFN0oZ0A-Iy8rhKtG1KHqKIezcWvw7DGeYNpTmYYU2BlrQXrOlOG15po2xrTBu6F_rFxrUAg1hKd72lH2QqqWVPp5zafg4wD5xBofyIng40hgE9d5iVNpyLaZQjg9aN4-01W_tOclL_71ZjzwclSWjucXBOorpYhXYUGjtpsloMRnj4fAE1qncluQAAAA.LcoU4zK_a1qq8mxZAc1iBD0wQ9XuHkgWvjOy5As4Ob1Zb2_uZ1MHPu_nYnoPnRi07EMC-p-kqP8ahjlATXvIElxhxkUXHJP11TbkAQGnwVgfYX_iH4VN8Pft8Ma8eIjpnZ-QH5XM9iYige9u9dFDR2RHcD94LPVHHwLs2CNMX_U'403 }404 # TECHZONE405 wos_config = WOS_CPD_TECHZONE406 db2_config = DB2_TECHZONE407 scoring_url = VM_SCORING_URL408 scoring_request_headers = general_header409 # # GBS410 # wos_config = WOS_CPD_LUBAN411 # db2_config = DB2_LUBAN412 # scoring_url = GBS_SCORING_URL413 # scoring_request_headers=GBS_request_header414 label_column = "Risk"415 model_type = "binary"416 training_file_path = "../data/german_credit_data_biased_training.csv"417 feedback_file_path = '../data/additional_feedback_data_v2.json'418 df = pd.read_csv(training_file_path)419 cols_to_remove = [label_column]420 payload_scoring = get_scoring_payload(df, cols_to_remove, 1)421 scoring_id = None422 wos_client = auth_cpd(wos_config)423 data_marts = wos_client.data_marts.list().result.data_marts424 if len(data_marts) == 0:425 raise Exception("Missing data mart.")426 data_mart_id = data_marts[0].metadata.id427 print('Using existing datamart {}'.format(data_mart_id))428 data_mart_details = wos_client.data_marts.list().result.data_marts[0]429 data_mart_details.to_dict()430 print(wos_client.service_providers.show())431 SERVICE_PROVIDER_NAME = "Custom ML Provider Demo - All Monitors"432 SERVICE_PROVIDER_DESCRIPTION = "Added by tutorial WOS notebook to showcase monitoring Fairness, Quality, Drift and Explainability against a Custom ML provider."433 remove_existing_service_provider(wos_client, SERVICE_PROVIDER_NAME)434 service_provider_id = add_service_provider(SERVICE_PROVIDER_NAME, SERVICE_PROVIDER_DESCRIPTION, )435 print('Data Mart ID : ' + data_mart_id)436 wos_client.subscriptions.show()437 SUBSCRIPTION_NAME = "Custom ML Subscription - All Monitors"438 remove_existing_subscription(wos_client, SUBSCRIPTION_NAME)439 feature_columns = ["CheckingStatus", "LoanDuration", "CreditHistory", "LoanPurpose", "LoanAmount",440 "ExistingSavings",441 "EmploymentDuration", "InstallmentPercent", "Sex", "OthersOnLoan", "CurrentResidenceDuration",442 "OwnsProperty", "Age", "InstallmentPlans", "Housing", "ExistingCreditsCount", "Job",443 "Dependents",444 "Telephone", "ForeignWorker"]445 cat_features = ["CheckingStatus", "CreditHistory", "LoanPurpose", "ExistingSavings", "EmploymentDuration", "Sex",446 "OthersOnLoan", "OwnsProperty", "InstallmentPlans", "Housing", "Job", "Telephone", "ForeignWorker"]447 asset_id = str(uuid.uuid4())448 asset_name = '[asset] ' + SUBSCRIPTION_NAME449 url = ''450 asset_deployment_id = str(uuid.uuid4())451 asset_deployment_name = asset_name452 asset_deployment_scoring_url = scoring_url453 scoring_endpoint_url = scoring_url454 subscription_details = wos_client.subscriptions.add(455 data_mart_id=data_mart_id,456 service_provider_id=service_provider_id,457 asset=Asset(458 asset_id=asset_id,459 name=asset_name,460 url=url,461 asset_type=AssetTypes.MODEL,462 input_data_type=InputDataType.STRUCTURED,463 problem_type=ProblemType.BINARY_CLASSIFICATION464 ),465 deployment=AssetDeploymentRequest(466 deployment_id=asset_deployment_id,467 name=asset_deployment_name,468 deployment_type=DeploymentTypes.ONLINE,469 scoring_endpoint=ScoringEndpointRequest(470 url=scoring_endpoint_url,471 request_headers=scoring_request_headers472 )473 ),474 asset_properties=AssetPropertiesRequest(475 label_column=label_column,476 probability_fields=["probability"],477 prediction_field="prediction",478 feature_fields=feature_columns,479 categorical_fields=cat_features,480 training_data_reference=TrainingDataReference(type="db2",481 location=DB2TrainingDataReferenceLocation(482 table_name=db2_config['table_name'],483 schema_name=db2_config['schema_name']),484 connection=DB2TrainingDataReferenceConnection.from_dict({485 'hostname': db2_config['hostname'],486 'username': db2_config['username'],487 'password': db2_config['password'],488 'database_name': db2_config['database_name']}))489 )490 ).result491 subscription_id = subscription_details.metadata.id492 print('Subscription ID: ' + subscription_id)493 time.sleep(5)494 payload_data_set_id = None495 payload_data_set_id = wos_client.data_sets.list(type=DataSetTypes.PAYLOAD_LOGGING,496 target_target_id=subscription_id,497 target_target_type=TargetTypes.SUBSCRIPTION).result.data_sets[498 0].metadata.id499 if payload_data_set_id is None:500 print("Payload data set not found. Please check subscription status.")501 else:502 print("Payload data set id:", payload_data_set_id)503 wos_client.subscriptions.get(subscription_id).result.to_dict()504 # send a request to the model before we configure OpenScale.505 # This allows OpenScale to create a payload log in the datamart with the correct schema, so it can capture data coming into and out of the model.506 payload_scoring = get_scoring_payload(df, cols_to_remove, no_of_records_to_score=100)507 scoring_response = custom_ml_scoring(scoring_url, payload_scoring)508 scoring_id = payload_logging(payload_scoring, scoring_response, wos_client, payload_data_set_id)509 print('scoring_id: ' + str(scoring_id))510 fairness(wos_client, data_mart_id, subscription_id)511 sample_size = 2512 # explanation_task_ids = explainability(wos_client, data_mart_id, subscription_id, sample_size=2)513 # finished_explanations = finish_explanation_tasks(wos_client, explanation_task_ids, sample_size=2)514 # explanation_task_ids = ['8e71821a-cb9c-453e-a08f-ab8e3395aa11', '39bc9469-c956-43b8-be83-1a344be39367']515 # finished_explanations = []516 # while len(finished_explanations) < sample_size:517 # finished_explanations = finish_explanation_tasks(wos_client, explanation_task_ids, sample_size=2)518 # explanation_feature_map_plot(finished_explanations)519 quality(wos_client, subscription_id, feedback_file_path)520 drift_detection_input = {521 "feature_columns": feature_columns,522 "categorical_columns": cat_features,523 "label_column": label_column,524 "problem_type": model_type525 }...

Full Screen

Full Screen

monitor.py

Source:monitor.py Github

copy

Full Screen

1import logging2from asyncio import CancelledError3from exceptions import (4 JobAlreadyStarted,5 JobNotStarted,6 InvalidInterval,7 InvalidName,8 InvalidType,9 TooMuchArgument,10)11from mattermost.notify import notify, NOTIFICATION_SUCCESS12from models.monitor import MonitorModel13from monitors.implems import all_monitors14import scheduler15class Monitor:16 monitor_instances = {}17 @classmethod18 async def create(cls, monitor_conf, custom_conf):19 Monitor.validate_monitor_conf(monitor_conf)20 monitor_class = all_monitors[monitor_conf["type"]]21 await monitor_class.validate_custom_conf(custom_conf)22 model = await MonitorModel.create(monitor_conf, custom_conf)23 monitor = monitor_class(model)24 Monitor.monitor_instances[monitor_conf["name"]] = monitor25 return monitor26 @classmethod27 async def load_all(cls):28 for model in await MonitorModel.get_all():29 monitor_class = all_monitors[(await model.monitor_conf())["type"]]30 monitor = monitor_class(model)31 Monitor.monitor_instances[(await model.monitor_conf())["name"]] = monitor32 return Monitor.monitor_instances33 def __init__(self, model):34 self.job = None35 self.model = model36 async def notify(self, data, notification_type=NOTIFICATION_SUCCESS):37 monitor_conf = await self.get_monitor_conf()38 username = monitor_conf["name"]39 channel = monitor_conf["channel"] if "channel" in monitor_conf else None40 await notify(41 data,42 notification_type=notification_type,43 username=username,44 channel=channel,45 )46 async def get_custom_conf(self):47 return await self.model.custom_conf()48 async def set_custom_conf(self, conf):49 await self.validate_custom_conf(conf)50 await self.model.custom_conf(conf)51 async def get_monitor_conf(self):52 return await self.model.monitor_conf()53 async def set_monitor_conf(self, conf):54 return await self.model.monitor_conf(conf)55 async def get_state(self):56 return await self.model.state()57 async def set_state(self, state):58 await self.model.state(state)59 async def set_channel(self, channel=None):60 monitor_conf = await self.get_monitor_conf()61 if channel:62 monitor_conf["channel"] = channel63 elif "channel" in monitor_conf:64 del monitor_conf["channel"]65 await self.set_monitor_conf(monitor_conf)66 @classmethod67 def validate_monitor_conf(cls, conf):68 if (69 "interval" not in conf70 or type(conf["interval"]) is not int71 or conf["interval"] < 072 ):73 raise InvalidInterval74 if (75 "name" not in conf76 or conf["name"] in Monitor.monitor_instances77 or not conf["name"].isalnum()78 ):79 raise InvalidName80 if "type" not in conf or conf["type"] not in all_monitors:81 raise InvalidType82 if len(conf) > 3:83 raise TooMuchArgument84 async def job_start(self):85 if self.job_is_started():86 raise JobAlreadyStarted87 interval = (await self.get_monitor_conf())["interval"]88 self.job = scheduler.scheduler.add_job(89 self.do_job, "interval", seconds=interval90 )91 def job_stop(self):92 if not self.job_is_started():93 raise JobNotStarted94 self.job.remove()95 self.job = None96 def job_is_started(self):97 return self.job is not None98 async def do_job(self):99 # Todo: log it100 # Todo: try catch101 # Todo: Execution time102 name = (await self.get_monitor_conf())["name"]103 logging.warning(f"Executing job {name}")104 try:105 old_state = await self.get_state()106 new_state = await self.refresh()107 await self.compare(old_state, new_state)108 await self.set_state(new_state)109 except CancelledError: # pragma: no cover110 pass111 except Exception as e: # pragma: no cover112 if self.job is not None:113 raise e114 print("Exception on job due to killing it")115 async def remove(self):116 try:117 self.job_stop()118 except JobNotStarted:119 pass120 names = []121 for name in Monitor.monitor_instances:122 if Monitor.monitor_instances[name] is self:123 names.append(name)124 for name in names:125 del Monitor.monitor_instances[name]126 await self.model.remove()127 # To be implemented in implems128 async def refresh(self): # pragma: no cover129 raise NotImplemented130 async def compare(self, old_state, new_state): # pragma: no cover131 raise NotImplemented132 @classmethod133 async def validate_custom_conf(cls, conf): # pragma: no cover134 raise NotImplemented135 @classmethod136 async def test_me(cls, test_case): # pragma: no cover...

Full Screen

Full Screen

tromino.py

Source:tromino.py Github

copy

Full Screen

1import asyncio2from exceptions import JobAlreadyStarted3from mattermost.notify import notify4from monitors.monitor import Monitor5from scheduler import scheduler6from server import run_server7import monitors8async def main():9 await notify("Tromino: Starting...")10 scheduler.start()11 await notify("Tromino: Scheduler up")12 monitor_types = monitors.load_monitors()13 await notify(f"Tromino: Loading plugins - {', '.join([m for m in monitor_types])}")14 # job = scheduler.add_job(lambda: print(1), "interval", seconds=5)15 monitor_instances = await Monitor.load_all()16 await notify(f"Tromino: Loading monitor instances - {len(monitor_instances)}")17 for m in monitor_instances:18 try:19 await monitor_instances[m].job_start()20 await notify(f"Tromino: Starting {m}")21 except JobAlreadyStarted:22 pass23 await run_server()24 await notify("Tromino: HTTP Server stared")25 await notify("Tromino: Hello ! What could I do for you ?")26asyncio.ensure_future(main())...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful