Best Python code snippet using Contexts
test_harness_class.py
Source: test_harness_class.py
from collections import defaultdict
from datetime import datetime
import os
import json
import time
import pandas as pd
import matplotlib.pyplot as plt
from six import string_types
from statistics import mean
import joblib
from copy import copy, deepcopy
from harness.run_classes import _BaseRun
from harness.test_harness_models_abstract_classes import ClassificationModel, RegressionModel
from harness.unique_id import get_id
from harness.utils.names import Names
from harness.utils.object_type_modifiers_and_checkers import is_list_of_strings, make_list_if_not_list

plt.switch_backend('agg')
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 10000)
pd.set_option('display.max_colwidth', -1)

# CSS classes applied to the Pandas Dataframes when written as HTML
css_classes = ["table-bordered", "table-striped", "table-compact"]
PWD = os.getcwd()
HERE = os.path.realpath(__file__)
PARENT = os.path.dirname(HERE)
DEFAULT_DATA_PATH = os.path.join(PWD, 'versioned_data/asap/')
OUTPUT = Names.NORMAL_OUTPUT

'''
NOTE: If a class variable is going to be modified (e.g. feature_cols_to_use is modified by sparse col functionality),
then you must make sure that a COPY of the variable is passed in! Otherwise the original variable will be modified too, leading to issues.
'''


# TODO: add ran-by (user) column to leaderboards
# TODO: add md5hashes of data to leaderboard as sorting tool
# TODO: add cross validation
# TODO: if test set doesn't include col_to_predict, carry out prediction instead?
# TODO: add more checks for correct inputs using assert
# TODO: add filelock or writing-scheduler so leaderboards are not overwritten at the same time. Might need to use SQL
# TODO: separate data description from split description


class TestHarness:
    def __init__(self, output_location=os.path.dirname(os.path.realpath(__file__)), output_csvs_of_leaderboards=False):
        # Note: loo stands for leave-one-out
        self.output_path = output_location
        self.output_csvs_of_leaderboards = output_csvs_of_leaderboards
        self.results_folder_path = os.path.join(self.output_path, 'test_harness_results')
        self.runs_folder_path = os.path.join(self.results_folder_path, 'runs')
        if not os.path.exists(self.results_folder_path):
            os.makedirs(self.results_folder_path, exist_ok=True)
        if not os.path.exists(self.runs_folder_path):
            os.makedirs(self.runs_folder_path, exist_ok=True)
        # add metrics here:
        self.classification_metrics = [Names.NUM_CLASSES, Names.ACCURACY, Names.BALANCED_ACCURACY, Names.AUC_SCORE,
                                       Names.AVERAGE_PRECISION, Names.F1_SCORE, Names.PRECISION, Names.RECALL]
        self.mean_classification_metrics = ["Mean " + cm for cm in self.classification_metrics]
        self.regression_metrics = [Names.R_SQUARED, Names.RMSE]
        self.mean_regression_metrics = ["Mean " + rm for rm in self.regression_metrics]
        self.metric_to_sort_classification_results_by = Names.AVERAGE_PRECISION
        self.metric_to_sort_regression_results_by = Names.R_SQUARED
        custom_cols_1 = [Names.RUN_ID, Names.DATE, Names.TIME, Names.MODEL_NAME, Names.MODEL_AUTHOR]
        custom_cols_2 = [Names.SAMPLES_IN_TRAIN, Names.SAMPLES_IN_TEST, Names.MODEL_DESCRIPTION, Names.COLUMN_PREDICTED,
                         Names.NUM_FEATURES_USED, Names.DATA_AND_SPLIT_DESCRIPTION, Names.NORMALIZED, Names.NUM_FEATURES_NORMALIZED,
                         Names.FEATURE_EXTRACTION, Names.WAS_UNTESTED_PREDICTED]
        self.custom_classification_leaderboard_cols = custom_cols_1 + self.classification_metrics + custom_cols_2
        self.custom_regression_leaderboard_cols = custom_cols_1 + self.regression_metrics + custom_cols_2
        loo_cols_1 = [Names.LOO_ID] + custom_cols_1
        loo_cols_2 = custom_cols_2[:]
        loo_cols_2.remove(Names.WAS_UNTESTED_PREDICTED)
        loo_cols_2.insert(5, Names.TEST_GROUP)
        self.loo_full_classification_leaderboard_cols = loo_cols_1 + self.classification_metrics + loo_cols_2
        self.loo_full_regression_leaderboard_cols = loo_cols_1 + self.regression_metrics + loo_cols_2
        summarized_cols_1 = loo_cols_1[:]
        summarized_cols_1.remove(Names.RUN_ID)
        summarized_cols_2 = [Names.MODEL_DESCRIPTION, Names.COLUMN_PREDICTED, Names.NUM_FEATURES_USED, Names.DATA_DESCRIPTION,
                             Names.GROUPING_DESCRIPTION, Names.NORMALIZED, Names.NUM_FEATURES_NORMALIZED, Names.FEATURE_EXTRACTION]
        self.loo_summarized_classification_leaderboard_cols = summarized_cols_1 + self.mean_classification_metrics + summarized_cols_2
        self.loo_summarized_regression_leaderboard_cols = summarized_cols_1 + self.mean_regression_metrics + summarized_cols_2
        self.leaderboard_names_dict = {Names.CUSTOM_CLASS_LBOARD: self.custom_classification_leaderboard_cols,
                                       Names.CUSTOM_REG_LBOARD: self.custom_regression_leaderboard_cols,
                                       Names.LOO_SUMM_CLASS_LBOARD: self.loo_summarized_classification_leaderboard_cols,
                                       Names.LOO_SUMM_REG_LBOARD: self.loo_summarized_regression_leaderboard_cols,
                                       Names.LOO_FULL_CLASS_LBOARD: self.loo_full_classification_leaderboard_cols,
                                       Names.LOO_FULL_REG_LBOARD: self.loo_full_regression_leaderboard_cols}
        self.valid_feature_extraction_methods = [Names.ELI5_PERMUTATION,
                                                 Names.RFPIMP_PERMUTATION,
                                                 Names.BBA_AUDIT,
                                                 Names.SHAP_AUDIT]
        self.list_of_this_instance_run_ids = []
        self.dict_of_instance_run_loo_ids = defaultdict(list)
        print()

    # TODO: add more normalization options: http://benalexkeen.com/feature-scaling-with-scikit-learn/
    def run_custom(self, function_that_returns_TH_model, dict_of_function_parameters, training_data, testing_data,
                   data_and_split_description, cols_to_predict, feature_cols_to_use, index_cols=("dataset", "name"), normalize=False,
                   feature_cols_to_normalize=None, feature_extraction=False, predict_untested_data=False, sparse_cols_to_use=None,
                   interpret_complex_model=False, custom_metric=False):
        """
        Instantiates and runs a model on a custom train/test split.
        If you pass in a list of columns to predict, a separate run will occur for each string in the list.
        :param custom_metric: dict with string keys whose values are functions that take two arguments. Not tested with LOO runs.
        """
        cols_to_predict = make_list_if_not_list(cols_to_predict)
        assert is_list_of_strings(cols_to_predict), "cols_to_predict must be a string or a list of strings"
        feature_cols_to_use = make_list_if_not_list(feature_cols_to_use)
        if feature_cols_to_normalize:
            feature_cols_to_normalize = make_list_if_not_list(feature_cols_to_normalize)
        if sparse_cols_to_use:
            sparse_cols_to_use = make_list_if_not_list(sparse_cols_to_use)
        if custom_metric:
            assert isinstance(custom_metric, dict), "custom_metric must be a dict whose key is a string and value is a function"
            self.regression_metrics.extend(list(custom_metric.keys()))
            self.custom_regression_leaderboard_cols.extend(list(custom_metric.keys()))
        for col in cols_to_predict:
            self._execute_run(function_that_returns_TH_model, dict_of_function_parameters, training_data, testing_data,
                              data_and_split_description, col, feature_cols_to_use, index_cols, normalize, feature_cols_to_normalize,
                              feature_extraction, predict_untested_data, sparse_cols_to_use, loo_dict=False,
                              interpret_complex_model=interpret_complex_model, custom_metric=custom_metric)

    def make_grouping_df(self, grouping, data):
        # if grouping is a string, turn it into a list containing that one string
        if isinstance(grouping, string_types):
            grouping = make_list_if_not_list(grouping)
        # if grouping is a list of strings:
        #   1. check if those strings exist as column names in the data Dataframe
        #   2. then create a grouping Dataframe based on the unique values in those columns
        data_cols = data.columns.values.tolist()
        if is_list_of_strings(grouping):
            # this for loop check is similar to the one for the grouping_df, but I like to have this one too for a clearer error message
            for col_name in grouping:
                assert (col_name in data_cols), \
                    "{} does not exist as a column in the data Dataframe. " \
                    "If you pass in a list of strings to the 'grouping' argument, " \
                    "then all of those strings must exist as columns in the data Dataframe.".format(col_name)
            grouping_df = data.groupby(by=grouping, as_index=False).first()[grouping]
            grouping_df[Names.GROUP_INDEX] = grouping_df.index
        elif isinstance(grouping, pd.DataFrame):
            grouping_df = grouping.copy()
        else:
            raise ValueError("grouping must be a list of column names in the data Dataframe, "
                             "or a Pandas Dataframe that defines custom groupings (see the Test Harness README for an example).")
        # TODO: add example grouping_df to README
        # grouping_df checks:
        #   1. "group_index" must exist as a column in grouping_df
        #   2. every other column in grouping_df must also be a column in the data Dataframe
        grouping_df_cols = grouping_df.columns.values.tolist()
        assert (Names.GROUP_INDEX in grouping_df_cols), "grouping_df must have a '{}' column.".format(Names.GROUP_INDEX)
        cols_to_group_on = [col for col in grouping_df_cols if col != Names.GROUP_INDEX]
        for col_name in cols_to_group_on:
            assert col_name in data_cols, \
                "{} is a column in grouping_df but does not exist as a column in the data Dataframe. " \
                "Every column in grouping_df (other than '{}') must also be a column in the data Dataframe.".format(
                    col_name, Names.GROUP_INDEX)
        return grouping_df, data_cols, cols_to_group_on

    # TODO: add sparse cols to leave one out
    def run_leave_one_out(self, function_that_returns_TH_model, dict_of_function_parameters, data, data_description, grouping,
                          grouping_description, cols_to_predict, feature_cols_to_use, index_cols=("dataset", "name"), normalize=False,
                          feature_cols_to_normalize=None, feature_extraction=False, sparse_cols_to_use=None):
        """
        Splits the data into appropriate train/test splits according to the grouping dataframe, and then runs a separate instantiation of
        the passed-in model on each split.
        """
        date_loo_ran = datetime.now().strftime("%Y-%m-%d")
        time_loo_ran = datetime.now().strftime("%H:%M:%S")
        cols_to_predict = make_list_if_not_list(cols_to_predict)
        feature_cols_to_use = make_list_if_not_list(feature_cols_to_use)
        if feature_cols_to_normalize:
            feature_cols_to_normalize = make_list_if_not_list(feature_cols_to_normalize)
            num_features_normalized = len(feature_cols_to_normalize)
        else:
            num_features_normalized = 0
        assert isinstance(data, pd.DataFrame), "data must be a Pandas Dataframe"
        assert isinstance(data_description, string_types), "data_description must be a string"
        assert isinstance(grouping_description, string_types), "grouping_description must be a string"
        assert is_list_of_strings(cols_to_predict), "cols_to_predict must be a string or a list of strings"
        grouping_df, data_cols, cols_to_group_on = self.make_grouping_df(grouping, data)
        # Append a "group_index" column to the all_data Dataframe. This column contains the group number of each row.
        # The values of the "group_index" column are determined from the grouping Dataframe (grouping_df)
        all_data = data.copy()
        all_data = pd.merge(left=all_data, right=grouping_df, how="left", on=cols_to_group_on)
        for col in cols_to_predict:
            loo_id = get_id()
            loo_folder_path = os.path.join(self.runs_folder_path, '{}_{}'.format("loo", loo_id))
            os.makedirs(loo_folder_path, exist_ok=False)
            data.to_csv(os.path.join(loo_folder_path, "data.csv"), index=False)
            grouping_df.to_csv(os.path.join(loo_folder_path, "grouping_df.csv"), index=False)
            dummy_th_model = function_that_returns_TH_model(**dict_of_function_parameters)
            if isinstance(dummy_th_model, ClassificationModel):
                task_type = "Classification"
            elif isinstance(dummy_th_model, RegressionModel):
                task_type = "Regression"
            else:
                raise ValueError("function_that_returns_TH_model must return a ClassificationModel or a RegressionModel.")
            # iterate through the groups (determined by "group_index" column) in the all_data Dataframe:
            for i, group_index in enumerate(list(set(all_data[Names.GROUP_INDEX]))):
                data_and_split_description = "{}".format(data_description)
                group_rows = grouping_df.loc[grouping_df[Names.GROUP_INDEX] == group_index]
                group_info = group_rows.to_dict(orient='list')
                print("Creating test split based on {} {}".format(Names.GROUP_INDEX, group_index))
                print("Example grouping_df row for the loo group: {}".format(group_rows.iloc[0]))
                if OUTPUT == Names.VERBOSE_OUTPUT:
                    print("Defined by: {}".format(group_info))
                train_split = all_data.copy()
                test_split = all_data.copy()
                train_split = train_split.loc[train_split[Names.GROUP_INDEX] != group_index]
                test_split = test_split.loc[test_split[Names.GROUP_INDEX] == group_index]
                print("Number of samples in train split:", train_split.shape)
                print("Number of samples in test split:", test_split.shape)
                loo_dict = {"loo_id": loo_id, "task_type": task_type, "data_description": data_description,
                            "grouping_description": grouping_description, "group_info": group_info}
                self._execute_run(function_that_returns_TH_model=function_that_returns_TH_model,
                                  dict_of_function_parameters=dict_of_function_parameters,
                                  training_data=train_split,
                                  testing_data=test_split,
                                  data_and_split_description=data_and_split_description,
                                  col_to_predict=col,
                                  feature_cols_to_use=feature_cols_to_use,
                                  index_cols=index_cols,
                                  normalize=normalize,
                                  feature_cols_to_normalize=feature_cols_to_normalize,
                                  feature_extraction=feature_extraction,
                                  predict_untested_data=False,
                                  sparse_cols_to_use=sparse_cols_to_use,
                                  loo_dict=loo_dict,
                                  interpret_complex_model=False)
            # summary results are calculated here, and summary leaderboards are updated
            summary_values = {Names.LOO_ID: loo_id, Names.DATE: date_loo_ran, Names.TIME: time_loo_ran,
                              Names.MODEL_NAME: dummy_th_model.model_name, Names.MODEL_AUTHOR: dummy_th_model.model_author,
                              Names.MODEL_DESCRIPTION: dummy_th_model.model_description, Names.COLUMN_PREDICTED: col,
                              Names.NUM_FEATURES_USED: len(feature_cols_to_use), Names.DATA_DESCRIPTION: data_description,
                              Names.GROUPING_DESCRIPTION: grouping_description, Names.NORMALIZED: normalize,
                              Names.NUM_FEATURES_NORMALIZED: num_features_normalized, Names.FEATURE_EXTRACTION: feature_extraction}
            if task_type == "Classification":
                self.output_classification_leaderboard_to_csv(summary_values, loo_id)
            elif task_type == "Regression":
                self.output_regression_leaderboard_to_csv(summary_values, loo_id)
            else:
                raise TypeError("task_type must be 'Classification' or 'Regression'.")

    def output_classification_leaderboard_to_csv(self, summary_values, loo_id):
        detailed_leaderboard_name = Names.LOO_FULL_CLASS_LBOARD
        detailed_leaderboard_path = os.path.join(self.results_folder_path, "{}.html".format(detailed_leaderboard_name))
        detailed_leaderboard = pd.read_html(detailed_leaderboard_path)[0]
        this_loo_results = detailed_leaderboard.loc[detailed_leaderboard[Names.LOO_ID] == loo_id]
        summary_metrics = {}
        for metric, mean_metric in zip(self.classification_metrics, self.mean_classification_metrics):
            summary_metrics[mean_metric] = mean(this_loo_results[metric])
            # TODO: add standard deviation with pstdev
        summary_values.update(summary_metrics)
        # Update summary leaderboard
        summary_leaderboard_name = Names.LOO_SUMM_CLASS_LBOARD
        summary_leaderboard_cols = self.loo_summarized_classification_leaderboard_cols
        # first check if leaderboard exists and create empty leaderboard if it doesn't
        html_path = os.path.join(self.results_folder_path, "{}.html".format(summary_leaderboard_name))
        try:
            summary_leaderboard = pd.read_html(html_path)[0]
        except (IOError, ValueError):
            summary_leaderboard = pd.DataFrame(columns=summary_leaderboard_cols)
        # update leaderboard with new entry (row_of_results) and sort it based on run type
        summary_leaderboard = summary_leaderboard.append(summary_values, ignore_index=True, sort=False)
        sort_metric = "Mean " + self.metric_to_sort_classification_results_by
        summary_leaderboard.sort_values(sort_metric, inplace=True, ascending=False)
        summary_leaderboard.reset_index(inplace=True, drop=True)
        # overwrite old leaderboard with updated leaderboard
        summary_leaderboard.to_html(html_path, index=False, classes=summary_leaderboard_name)
        if self.output_csvs_of_leaderboards is True:
            csv_path = os.path.join(self.results_folder_path, "{}.csv".format(summary_leaderboard_name))
            summary_leaderboard.to_csv(csv_path, index=False)

    def output_regression_leaderboard_to_csv(self, summary_values, loo_id):
        detailed_leaderboard_name = Names.LOO_FULL_REG_LBOARD
        detailed_leaderboard_path = os.path.join(self.results_folder_path, "{}.html".format(detailed_leaderboard_name))
        detailed_leaderboard = pd.read_html(detailed_leaderboard_path)[0]
        this_loo_results = detailed_leaderboard.loc[detailed_leaderboard[Names.LOO_ID] == loo_id]
        summary_metrics = {}
        for metric, mean_metric in zip(self.regression_metrics, self.mean_regression_metrics):
            summary_metrics[mean_metric] = mean(this_loo_results[metric])
            # TODO: add standard deviation with pstdev
        summary_values.update(summary_metrics)
        # Update summary leaderboard
        summary_leaderboard_name = Names.LOO_SUMM_REG_LBOARD
        summary_leaderboard_cols = self.loo_summarized_regression_leaderboard_cols
        # first check if leaderboard exists and create empty leaderboard if it doesn't
        html_path = os.path.join(self.results_folder_path, "{}.html".format(summary_leaderboard_name))
        try:
            summary_leaderboard = pd.read_html(html_path)[0]
        except (IOError, ValueError):
            summary_leaderboard = pd.DataFrame(columns=summary_leaderboard_cols)
        # update leaderboard with new entry (row_of_results) and sort it based on run type
        summary_leaderboard = summary_leaderboard.append(summary_values, ignore_index=True, sort=False)
        sort_metric = "Mean " + self.metric_to_sort_regression_results_by
        print("Leave-One-Out Summary Leaderboard:\n")
        print(summary_leaderboard)
        summary_leaderboard.sort_values(sort_metric, inplace=True, ascending=False)
        summary_leaderboard.reset_index(inplace=True, drop=True)
        # overwrite old leaderboard with updated leaderboard
        summary_leaderboard.to_html(html_path, index=False, classes=summary_leaderboard_name)
        if self.output_csvs_of_leaderboards is True:
            csv_path = os.path.join(self.results_folder_path, "{}.csv".format(summary_leaderboard_name))
            summary_leaderboard.to_csv(csv_path, index=False)

    def validate_execute_run_inputs(self, function_that_returns_TH_model, dict_of_function_parameters, training_data, testing_data,
                                    data_and_split_description, col_to_predict, feature_cols_to_use, index_cols, normalize,
                                    feature_cols_to_normalize, feature_extraction, predict_untested_data, sparse_cols_to_use,
                                    custom_metric):
        # Single strings are included in the assert error messages because the make_list_if_not_list function was used
        assert callable(function_that_returns_TH_model), \
            "function_that_returns_TH_model must be a function that returns a TestHarnessModel object"
        assert isinstance(dict_of_function_parameters, dict), \
            "dict_of_function_parameters must be a dictionary of parameters for the function_that_returns_TH_model function."
        assert isinstance(training_data, pd.DataFrame), "training_data must be a Pandas Dataframe"
        assert isinstance(testing_data, pd.DataFrame), "testing_data must be a Pandas Dataframe"
        assert isinstance(data_and_split_description, string_types), "data_and_split_description must be a string"
        assert isinstance(col_to_predict, string_types), "col_to_predict must be a string"
        assert is_list_of_strings(feature_cols_to_use), "feature_cols_to_use must be a string or a list of strings"
        assert isinstance(normalize, bool), "normalize must be True or False"
        assert (feature_cols_to_normalize is None) or is_list_of_strings(feature_cols_to_normalize), \
            "feature_cols_to_normalize must be None, a string, or a list of strings"
        assert isinstance(feature_extraction, bool) or (feature_extraction in self.valid_feature_extraction_methods), \
            "feature_extraction must be a bool or one of the following strings: {}".format(self.valid_feature_extraction_methods)
        assert (predict_untested_data is False) or (isinstance(predict_untested_data, pd.DataFrame)), \
            "predict_untested_data must be False or a Pandas Dataframe"
        assert (sparse_cols_to_use is None) or is_list_of_strings(sparse_cols_to_use), \
            "sparse_cols_to_use must be None, a string, or a list of strings"
        assert (index_cols is None) or (isinstance(index_cols, list)) or (isinstance(index_cols, tuple)), \
            "index_cols must be None or a list (or tuple) of index column names in the passed-in training, testing, and prediction data."
        if isinstance(index_cols, tuple):
            index_cols = list(index_cols)
        if isinstance(index_cols, list):
            assert is_list_of_strings(index_cols), "if index_cols is a tuple or list, it must contain only strings."
        if custom_metric:
            assert type(custom_metric) is dict, \
                'custom_metric must be of type dict. Each key should be a string, and each value should be a function that takes in two arguments.'
        # check if index_cols exist in training, testing, and prediction dataframes:
        assert (set(index_cols).issubset(training_data.columns.tolist())), \
            "the strings in index_cols are not valid columns in training_data."
        assert (set(index_cols).issubset(testing_data.columns.tolist())), \
            "the strings in index_cols are not valid columns in testing_data."
        if isinstance(predict_untested_data, pd.DataFrame):
            assert (set(index_cols).issubset(predict_untested_data.columns.tolist())), \
                "the strings in index_cols are not valid columns in predict_untested_data."

    # TODO: replace loo_dict with type_dict --> first entry is run type --> this will allow for more types in the future
    def _execute_run(self, function_that_returns_TH_model, dict_of_function_parameters, training_data, testing_data,
                     data_and_split_description, col_to_predict, feature_cols_to_use, index_cols=("dataset", "name"), normalize=False,
                     feature_cols_to_normalize=None, feature_extraction=False, predict_untested_data=False, sparse_cols_to_use=None,
                     loo_dict=False, interpret_complex_model=False, custom_metric=False):
        """
        1. Instantiates the TestHarnessModel object
        2. Creates a _BaseRun object and calls its train_and_test_model and calculate_metrics methods
        3. Calls _output_results(Run Object)
        """
        # TODO: add checks to ensure index_cols represent unique values in training, testing, and prediction dataframes
        self.validate_execute_run_inputs(function_that_returns_TH_model, dict_of_function_parameters, training_data, testing_data,
                                         data_and_split_description, col_to_predict, feature_cols_to_use, index_cols, normalize,
                                         feature_cols_to_normalize, feature_extraction, predict_untested_data, sparse_cols_to_use,
                                         custom_metric)
        train_df, test_df = training_data.copy(), testing_data.copy()
        if isinstance(predict_untested_data, pd.DataFrame):
            pred_df = predict_untested_data.copy()
        else:
            pred_df = False
        # for each col in index_cols, create a copy with an "unchanged_" prefix added, because later we want to
        # output the original column that hasn't been changed by operations such as normalization
        for col in index_cols:
            train_df["unchanged_{}".format(col)] = train_df[col]
            test_df["unchanged_{}".format(col)] = test_df[col]
            if isinstance(pred_df, pd.DataFrame):
                pred_df["unchanged_{}".format(col)] = pred_df[col]
        test_harness_model = function_that_returns_TH_model(**dict_of_function_parameters)
        # This is the one and only time _BaseRun is invoked
        run_object = _BaseRun(test_harness_model, train_df, test_df, data_and_split_description, col_to_predict,
                              copy(feature_cols_to_use), copy(index_cols), normalize, copy(feature_cols_to_normalize), feature_extraction,
                              pred_df, copy(sparse_cols_to_use), loo_dict, interpret_complex_model, custom_metric)
        # tracking the run_ids of all the runs that were kicked off in this TestHarness instance
        loo_id = None
        if loo_dict:
            loo_id = run_object.loo_dict.get('loo_id')
        if loo_id is not None:
            self.dict_of_instance_run_loo_ids[loo_id].append(run_object.run_id)
        else:
            self.list_of_this_instance_run_ids.append(run_object.run_id)
        # call run object methods
        start = time.time()
        # this adds a line of dashes to signify the beginning of the model run
        print('-' * 100)
        print('Starting run of model {} at time {}'.format(function_that_returns_TH_model.__name__, datetime.now().strftime("%H:%M:%S")))
        run_object.train_and_test_model()
        run_object.calculate_metrics()
        if run_object.feature_extraction is not False:
            from harness.feature_extraction import FeatureExtractor
            feature_extractor = FeatureExtractor(base_run_instance=run_object)
            feature_extractor.feature_extraction_method(method=run_object.feature_extraction)
        else:
            feature_extractor = None
        # ----------------------------------
        # model on model
        if interpret_complex_model:
            run_object.interpret_model(
                complex_model=run_object.test_harness_model.model,
                training_df=run_object.training_data,
                feature_col=run_object.feature_cols_to_use,
                predict_col=run_object.col_to_predict,
                simple_model=None)
        # ----------------------------------
        # output results of run object by updating the appropriate leaderboard(s) and writing files to disk
        # Pandas append docs: "Columns not in this frame are added as new columns" --> don't worry about adding new leaderboard cols
        self._update_leaderboard(run_object)
        if run_object.loo_dict is False:
            run_id_folder_path = os.path.join(self.runs_folder_path, '{}_{}'.format("run", run_object.run_id))
            os.makedirs(run_id_folder_path)
            self._output_run_files(run_object, run_id_folder_path, True, feature_extractor)
        else:
            loo_id = run_object.loo_dict['loo_id']
            loo_path = os.path.join(self.runs_folder_path, '{}_{}'.format("loo", loo_id))
            os.makedirs(loo_path, exist_ok=True)
            run_id_folder_path = os.path.join(loo_path, '{}_{}'.format("run", run_object.run_id))
            os.makedirs(run_id_folder_path)
            self._output_run_files(run_object, run_id_folder_path, True, feature_extractor)
        end = time.time()
        print('Run finished at {}.'.format(datetime.now().strftime("%H:%M:%S")), 'Total run time = {0:.2f} seconds'.format(end - start))
        # this adds a line of ^ to signify the end of the model run
        print('^' * 100)
        print("\n\n\n")

    def _update_leaderboard(self, run_object):
        # find appropriate leaderboard to update based on run_object characteristics
        if run_object.loo_dict is False:
            if run_object.run_type == Names.CLASSIFICATION:
                leaderboard_name = Names.CUSTOM_CLASS_LBOARD
            elif run_object.run_type == Names.REGRESSION:
                leaderboard_name = Names.CUSTOM_REG_LBOARD
            else:
                raise TypeError("run_object.run_type must equal '{}' or '{}'".format(Names.CLASSIFICATION, Names.REGRESSION))
        else:
            if run_object.run_type == Names.CLASSIFICATION:
                leaderboard_name = Names.LOO_FULL_CLASS_LBOARD
            elif run_object.run_type == Names.REGRESSION:
                leaderboard_name = Names.LOO_FULL_REG_LBOARD
            else:
                raise TypeError("run_object.run_type must equal '{}' or '{}'".format(Names.CLASSIFICATION, Names.REGRESSION))
        assert leaderboard_name in self.leaderboard_names_dict.keys(), "passed-in leaderboard_name is not valid."
        leaderboard_cols = self.leaderboard_names_dict[leaderboard_name]
        # first check if leaderboard exists and create empty leaderboard if it doesn't
        html_path = os.path.join(self.results_folder_path, "{}.html".format(leaderboard_name))
        try:
            leaderboard = pd.read_html(html_path)[0]
        except (IOError, ValueError):
            leaderboard = pd.DataFrame(columns=leaderboard_cols)
        # create leaderboard entry for this run and add two LOO-specific columns if loo_dict exists
        row_of_results = self._create_row_entry(run_object)
        if run_object.loo_dict is not False:
            row_of_results[Names.LOO_ID] = run_object.loo_dict["loo_id"]
            row_of_results[Names.TEST_GROUP] = str(run_object.loo_dict["group_info"])
        if OUTPUT == Names.VERBOSE_OUTPUT:
            print()
            print(row_of_results)
            print()
        # update leaderboard with new entry (row_of_results) and sort it based on run type
        leaderboard = leaderboard.append(row_of_results, ignore_index=True, sort=False)  # sort=False prevents columns from reordering
        # If the custom metric is changed or removed,
        # then make sure you put NaN in the slot that you had before so that you don't lose that column
        if len(set(leaderboard.columns).symmetric_difference(row_of_results.columns)) > 0:
            cols = set(leaderboard.columns).symmetric_difference(row_of_results.columns)
            for col in cols:
                row_of_results[col] = 'NaN'
            leaderboard = leaderboard.reindex(row_of_results.columns, axis=1)  # reindex will correct col order in case a new col is added
        if run_object.run_type == Names.CLASSIFICATION:
            leaderboard.sort_values(self.metric_to_sort_classification_results_by, inplace=True, ascending=False)
        elif run_object.run_type == Names.REGRESSION:
            # print(leaderboard[self.metric_to_sort_regression_results_by].value_counts(dropna=False))
            leaderboard.sort_values(self.metric_to_sort_regression_results_by, inplace=True, ascending=False)
        else:
            raise TypeError("run_object.run_type must equal '{}' or '{}'".format(Names.CLASSIFICATION, Names.REGRESSION))
        leaderboard.reset_index(inplace=True, drop=True)
        # overwrite old leaderboard with updated leaderboard
        leaderboard.to_html(html_path, index=False, classes=leaderboard_name)
        if self.output_csvs_of_leaderboards is True:
            csv_path = os.path.join(self.results_folder_path, "{}.csv".format(leaderboard_name))
            leaderboard.to_csv(csv_path, index=False)

    def _create_row_entry(self, run_object):
        row_values = {Names.RUN_ID: run_object.run_id, Names.DATE: run_object.date_ran, Names.TIME: run_object.time_ran,
                      Names.SAMPLES_IN_TRAIN: run_object.metrics_dict[Names.SAMPLES_IN_TRAIN],
                      Names.SAMPLES_IN_TEST: run_object.metrics_dict[Names.SAMPLES_IN_TEST],
                      Names.MODEL_NAME: run_object.model_name, Names.MODEL_AUTHOR: run_object.model_author,
                      Names.MODEL_DESCRIPTION: run_object.model_description, Names.COLUMN_PREDICTED: run_object.col_to_predict,
                      Names.NUM_FEATURES_USED: run_object.metrics_dict[Names.NUM_FEATURES_USED],
                      Names.DATA_AND_SPLIT_DESCRIPTION: run_object.data_and_split_description, Names.NORMALIZED: run_object.normalize,
                      Names.NUM_FEATURES_NORMALIZED: run_object.metrics_dict[Names.NUM_FEATURES_NORMALIZED],
                      Names.FEATURE_EXTRACTION: run_object.feature_extraction,
                      Names.WAS_UNTESTED_PREDICTED: run_object.was_untested_data_predicted}
        if run_object.run_type == Names.CLASSIFICATION:
            # extract relevant metrics from run_object.metrics_dict and round to 3rd decimal place:
            metric_results = {metric: round(run_object.metrics_dict[metric], 3) for metric in self.classification_metrics}
            row_values.update(metric_results)
            row_of_results = pd.DataFrame(columns=self.custom_classification_leaderboard_cols)
            row_of_results = row_of_results.append(row_values, ignore_index=True, sort=False)
        elif run_object.run_type == Names.REGRESSION:
            # extract relevant metrics from run_object.metrics_dict and round to 3rd decimal place:
            metric_results = {metric: round(run_object.metrics_dict[metric], 3) for metric in self.regression_metrics}
            row_values.update(metric_results)
            row_of_results = pd.DataFrame(columns=self.custom_regression_leaderboard_cols)
            row_of_results = row_of_results.append(row_values, ignore_index=True, sort=False)
        else:
            raise ValueError("run_object.run_type must be {} or {}".format(Names.REGRESSION, Names.CLASSIFICATION))
        return row_of_results

    def _output_run_files(self, run_object, output_path, output_data_csvs=True, feature_extractor=None):
        if output_data_csvs:
            # using index_cols and prediction/ranking cols to only output a subset of the dataframe.
            # using unchanged_index_cols to get names of columns that were created in _execute_run for later output.
            # thus what is output are the original input columns and not transformed input columns (e.g. if normalization is used)
            unchanged_index_cols = ["unchanged_{}".format(x) for x in run_object.index_cols]
            # creating list of cols to output for train, test, and pred outputs
            train_cols_to_output = unchanged_index_cols + [run_object.col_to_predict]
            if run_object.run_type == Names.CLASSIFICATION:
                test_cols_to_output = train_cols_to_output + [run_object.predictions_col, run_object.prob_predictions_col]
                pred_cols_to_output = unchanged_index_cols + [run_object.predictions_col, run_object.prob_predictions_col,
                                                              run_object.rankings_col]
            elif run_object.run_type == Names.REGRESSION:
                test_cols_to_output = unchanged_index_cols + [run_object.predictions_col, run_object.residuals_col]
                pred_cols_to_output = unchanged_index_cols + [run_object.predictions_col, run_object.rankings_col]
            else:
                raise ValueError("run_object.run_type must be {} or {}".format(Names.REGRESSION, Names.CLASSIFICATION))
            train_df_to_output = run_object.training_data[train_cols_to_output].copy()
            for col in unchanged_index_cols:
                train_df_to_output.rename(columns={col: col.rsplit("unchanged_")[1]}, inplace=True)
            train_df_to_output.to_csv('{}/{}'.format(output_path, 'training_data.csv'), index=False)
            test_df_to_output = run_object.testing_data_predictions[test_cols_to_output].copy()
            for col in unchanged_index_cols:
                test_df_to_output.rename(columns={col: col.rsplit("unchanged_")[1]}, inplace=True)
            test_df_to_output.to_csv('{}/{}'.format(output_path, 'testing_data.csv'), index=False)
            if run_object.was_untested_data_predicted is not False:
                prediction_data_to_output = run_object.untested_data_predictions[pred_cols_to_output].copy()
                for col in unchanged_index_cols:
                    prediction_data_to_output.rename(columns={col: col.rsplit("unchanged_")[1]}, inplace=True)
                prediction_data_to_output.to_csv('{}/{}'.format(output_path, 'predicted_data.csv'), index=False)
        if run_object.feature_extraction is not False:
            from harness.feature_extraction import FeatureExtractor
            assert isinstance(feature_extractor, FeatureExtractor), \
                "feature_extractor must be a FeatureExtractor object when run_object.feature_extraction is not False."
            feature_extractor.feature_importances.to_csv('{}/{}'.format(output_path, 'feature_importances.csv'), index=False)
            if run_object.feature_extraction == Names.SHAP_AUDIT:
                shap_path = os.path.join(output_path, 'SHAP')
                if not os.path.exists(shap_path):
                    os.makedirs(shap_path)
                dependence_path = os.path.join(shap_path, 'feature_dependence_plots')
                if not os.path.exists(dependence_path):
                    os.makedirs(dependence_path)
                # feature_extractor.shap_values.to_csv('{}/{}'.format(shap_path, 'shap_values.csv'), index=False)
                for name, plot in feature_extractor.shap_plots_dict.items():
                    if "dependence_plot" in name:
                        plot.savefig(os.path.join(dependence_path, name), bbox_inches="tight")
                    else:
                        plot.savefig(os.path.join(shap_path, name), bbox_inches="tight")
            if run_object.feature_extraction == Names.BBA_AUDIT:
                bba_path = os.path.join(output_path, 'BBA')
                if not os.path.exists(bba_path):
                    os.makedirs(bba_path)
                for name, plot in feature_extractor.bba_plots_dict.items():
                    plot.savefig(os.path.join(bba_path, name), bbox_inches="tight")
        # model on model
        if run_object.interpret_complex_model is True:
            import pydotplus
            img_string_path = os.path.join(output_path, 'Complex_Model_Interpretation')
            if not os.path.exists(img_string_path):
                os.makedirs(img_string_path)
            img_string = run_object.model_interpretation_img.getvalue()
            with open(os.path.join(img_string_path, 'model_interpretation_string.txt'), 'w') as f:
                f.write(img_string)
            image_path = os.path.join(output_path, 'Complex_Model_Interpretation')
            if not os.path.exists(image_path):
                os.makedirs(image_path)
            img = pydotplus.graph_from_dot_data(run_object.model_interpretation_img.getvalue())
            img.write_png(os.path.join(image_path, 'model_interpretation.png'))
        test_file_name = os.path.join(output_path, 'model_information.txt')
        with open(test_file_name, "w") as f:
            f.write("%s\n" % run_object.model_name)
            f.write("Feature columns used by model: \n")
            json.dump(run_object.feature_cols_to_use, f)
            f.write("\n\n\n")
            f.write("Model Instantiation Trace:\n")
            for i, t in enumerate(run_object.model_stack_trace):
                f.write(" Level {}\n".format(i))
                path, line, func = t[1:4]
                f.write(' - Path: ' + path + '\n')
                f.write(' - Line: ' + str(line) + ', Function: ' + str(func) + '\n')
                f.write("\n")
        if run_object.normalization_scaler_object is not None:
            joblib.dump(run_object.normalization_scaler_object, os.path.join(output_path, "normalization_scaler_object.pkl"))

    def print_leaderboards(self):
        ...
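The snippet above only defines the harness, so here is a minimal sketch of a run_custom call site, assuming a hypothetical model factory (build_rf_regression_model), hypothetical train_df/test_df dataframes containing "dataset" and "name" index columns, and scikit-learn available for the custom metric. Only the keyword names come from the signature above; everything else is illustrative.

from sklearn.metrics import mean_absolute_error

def mae(y_true, y_pred):
    # custom_metric values must be callables of two arguments (see the run_custom docstring);
    # the keys become extra regression leaderboard columns
    return mean_absolute_error(y_true, y_pred)

th = TestHarness(output_location="/tmp/th_demo", output_csvs_of_leaderboards=True)
th.run_custom(function_that_returns_TH_model=build_rf_regression_model,  # hypothetical factory returning a RegressionModel
              dict_of_function_parameters={"n_estimators": 100},
              training_data=train_df,
              testing_data=test_df,
              data_and_split_description="80/20 random split of demo data",
              cols_to_predict="yield",
              feature_cols_to_use=["temp", "ph", "od600"],
              index_cols=("dataset", "name"),
              custom_metric={"MAE": mae})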
test_runs.py
Source: test_runs.py
import json
from copy import deepcopy
from tests.base import BaseTestCase

sample_run_object = {
    "data": {
        "type": "run",
        "attributes": {
            "start_time": "2020-01-20T16:34:34.838199",
            "end_time": "2020-01-20T16:54:45.838199",
            "start_lat": "12.8947909",
            "start_lng": "77.6427151",
            "end_lat": "12.8986343",
            "end_lng": "77.656089",
            "distance": "3100"
        },
        "relationships": {
            "user": {
                "data": {
                    "type": "user",
                    "id": "user1"
                }
            }
        }
    }
}


class TestRunsEndpoint(BaseTestCase):
    def test_create_new_run(self):
        user_id = "user1"
        self.create_user(user_id)
        run_object = deepcopy(sample_run_object)
        user_token = self.get_login_token(user_id)
        # Without user token
        response = self.make_post_request("/runs", run_object)
        self.assert_content_type_and_status(response, 401)
        message = response.get_json()['message']
        self.assertEqual(message, "Missing Authorization Header")
        response = self.make_post_request("/runs", run_object, user_token)
        self.assert_content_type_and_status(response, 201)
        json_response = response.get_json()
        # Check if weather info is present
        data = json_response['data']
        self.assertIsNotNone(json.loads(data['attributes'].get('weather_info')))
        # Check if relationships info is present
        self.assertEqual('/users/user1', data['relationships']['user']['links']['related'])
        # Without relationships
        del run_object['data']['relationships']
        response = self.make_post_request("/runs", run_object, user_token)
        self.assert_content_type_and_status(response, 403)
        self.assertIn(b'Please provide a User relationship for the Run', response.data)

    def test_create_new_run_user_mismatch(self):
        user_id = "user1"
        self.create_user(user_id)
        self.create_user("user2")
        run_object = deepcopy(sample_run_object)
        user_1_token = self.get_login_token(user_id)
        run_object['data']['relationships']['user']['data']['id'] = 'user2'
        response = self.make_post_request("/runs", run_object, user_1_token)
        self.assert_content_type_and_status(response, 403)
        self.assertIn(b"User doesn't have permission to create Run for another user", response.data)

    def test_create_new_run_by_admin(self):
        user_id = "user1"
        self.create_user(user_id)
        run_object = deepcopy(sample_run_object)
        admin_token = self.get_login_token("admin")
        response = self.make_post_request("/runs", run_object, admin_token)
        self.assert_content_type_and_status(response, 201)
        # Admin creating Run for a user that's non-existent
        run_object['data']['relationships']['user']['data']['id'] = 'user2'
        response = self.make_post_request("/runs", run_object, admin_token)
        self.assert_content_type_and_status(response, 404)
        self.assertIn(b"user2 not found", response.data)

    def test_list_runs(self):
        user1_id = "user1"
        user2_id = "user2"
        self.create_user(user1_id)
        self.create_user(user2_id)
        run_object = deepcopy(sample_run_object)
        user1_token = self.get_login_token(user1_id)
        # Run 1 for user1
        response = self.make_post_request("/runs", run_object, user1_token)
        self.assert_content_type_and_status(response, 201)
        # Run 2 for user1
        response = self.make_post_request("/runs", run_object, user1_token)
        self.assert_content_type_and_status(response, 201)
        run_object['data']['relationships']['user']['data']['id'] = user2_id
        user2_token = self.get_login_token(user2_id)
        # Run 1 for user2
        response = self.make_post_request("/runs", run_object, user2_token)
        # Without user token
        response = self.make_get_request("/runs")
        self.assert_content_type_and_status(response, 401)
        # List for user 1
        response = self.make_get_request("/runs", user1_token)
        self.assert_content_type_and_status(response, 200)
        json_response = response.get_json()
        self.assertEqual(2, json_response['meta']['count'])
        # List for user 2
        response = self.make_get_request("/runs", user2_token)
        self.assert_content_type_and_status(response, 200)
        json_response = response.get_json()
        self.assertEqual(1, json_response['meta']['count'])
        # List for admin
        admin_token = self.get_login_token("admin")
        response = self.make_get_request("/runs", admin_token)
        self.assert_content_type_and_status(response, 200)
        json_response = response.get_json()
        self.assertEqual(3, json_response['meta']['count'])
        # List for usermanager
        self.create_user("usermanager", admin_token, roles=["usermanager"])
        um_token = self.get_login_token("usermanager")
        response = self.make_get_request("/runs", um_token)
        self.assert_content_type_and_status(response, 200)
        json_response = response.get_json()
        self.assertEqual(0, json_response['meta']['count'])

    def create_user_with_run(self, user_id):
        self.create_user(user_id)
        run_object = deepcopy(sample_run_object)
        run_object['data']['relationships']['user']['data']['id'] = user_id
        user_token = self.get_login_token(user_id)
        response = self.make_post_request("/runs", run_object, user_token)
        self.assert_content_type_and_status(response, 201)
        return user_token

    def test_update_runs(self):
        user = "user1"
        user_token = self.create_user_with_run(user)
        patch_data = {
            "data": {
                "type": "run",
                "id": 1,
                "attributes": {
                    "distance": "4000"
                }
            }
        }
        response = self.make_patch_request('/runs/1', patch_data, user_token)
        self.assert_content_type_and_status(response, 200)
        # user1 trying to update user2's run
        self.create_user_with_run("user2")
        patch_data['data']['id'] = 2
        response = self.make_patch_request('/runs/2', patch_data, user_token)
        self.assert_content_type_and_status(response, 403)

    def test_update_runs_admin(self):
        user = "user1"
        self.create_user_with_run(user)
        patch_data = {
            "data": {
                "type": "run",
                "id": 1,
                "attributes": {
                    "distance": "4000"
                }
            }
        }
        admin_token = self.get_login_token("admin")
        response = self.make_patch_request('/runs/1', patch_data, admin_token)
        self.assert_content_type_and_status(response, 200)
        # admin updating user2's run
        self.create_user_with_run("user2")
        patch_data['data']['id'] = 2
        response = self.make_patch_request('/runs/2', patch_data, admin_token)
        self.assert_content_type_and_status(response, 200)

    def test_delete_runs(self):
        user = "user1"
        user_token = self.create_user_with_run(user)
        response = self.make_delete_request('/runs/1', user_token)
        self.assert_content_type_and_status(response, 200)
        # user1 trying to delete user2's run
        self.create_user_with_run("user2")
        response = self.make_delete_request('/runs/2', user_token)
        self.assert_content_type_and_status(response, 403)

    def test_delete_runs_admin(self):
        user = "user1"
        self.create_user_with_run(user)
        admin_token = self.get_login_token("admin")
        response = self.make_delete_request('/runs/1', admin_token)
        self.assert_content_type_and_status(response, 200)
        # admin deleting user2's run
        self.create_user_with_run("user2")
        response = self.make_delete_request('/runs/2', admin_token)
        ...
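BaseTestCase is imported from tests.base but not shown here. Below is a guess at what its request helpers could look like, assuming a Flask application factory named create_app (hypothetical), JSON:API payloads ("application/vnd.api+json"), and JWT Bearer tokens (suggested by the "Missing Authorization Header" assertion above); the real project may differ.

import json
import unittest
from app import create_app  # hypothetical application factory

class BaseTestCase(unittest.TestCase):
    def setUp(self):
        self.app = create_app("testing")
        self.client = self.app.test_client()

    def _headers(self, token=None):
        headers = {"Content-Type": "application/vnd.api+json"}
        if token:
            headers["Authorization"] = "Bearer {}".format(token)
        return headers

    def make_post_request(self, url, payload, token=None):
        return self.client.post(url, data=json.dumps(payload), headers=self._headers(token))

    def make_get_request(self, url, token=None):
        return self.client.get(url, headers=self._headers(token))

    def assert_content_type_and_status(self, response, status_code):
        self.assertEqual(response.content_type, "application/vnd.api+json")
        self.assertEqual(response.status_code, status_code)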
experiment.py
Source: experiment.py
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import copy
import logging
import six
import types

from ray.tune.result import DEFAULT_RESULTS_DIR
from ray.tune.error import TuneError
from ray.tune.registry import register_trainable

logger = logging.getLogger(__name__)


class Experiment(object):
    """Tracks experiment specifications.

    Parameters:
        name (str): Name of experiment.
        run (function|class|str): The algorithm or model to train.
            This may refer to the name of a built-in algorithm
            (e.g. RLlib's DQN or PPO), a user-defined trainable
            function or class, or the string identifier of a
            trainable function or class registered in the tune registry.
        stop (dict): The stopping criteria. The keys may be any field in
            the return result of 'train()', whichever is reached first.
            Defaults to empty dict.
        config (dict): Algorithm-specific configuration for Tune variant
            generation (e.g. env, hyperparams). Defaults to empty dict.
            Custom search algorithms may ignore this.
        trial_resources (dict): Machine resources to allocate per trial,
            e.g. ``{"cpu": 64, "gpu": 8}``. Note that GPUs will not be
            assigned unless you specify them here. Defaults to 1 CPU and 0
            GPUs in ``Trainable.default_resource_request()``.
        repeat (int): Deprecated and will be removed in future versions of
            Ray. Use `num_samples` instead.
        num_samples (int): Number of times to sample from the
            hyperparameter space. Defaults to 1. If `grid_search` is
            provided as an argument, the grid will be repeated
            `num_samples` of times.
        local_dir (str): Local dir to save training results to.
            Defaults to ``~/ray_results``.
        upload_dir (str): Optional URI to sync training results
            to (e.g. ``s3://bucket``).
        checkpoint_freq (int): How many training iterations between
            checkpoints. A value of 0 (default) disables checkpointing.
        checkpoint_at_end (bool): Whether to checkpoint at the end of the
            experiment regardless of the checkpoint_freq. Default is False.
        max_failures (int): Try to recover a trial from its last
            checkpoint at least this many times. Only applies if
            checkpointing is enabled. Defaults to 3.
        restore (str): Path to checkpoint. Only makes sense to set if
            running 1 trial. Defaults to None.

    Examples:
        >>> experiment_spec = Experiment(
        >>>     "my_experiment_name",
        >>>     my_func,
        >>>     stop={"mean_accuracy": 100},
        >>>     config={
        >>>         "alpha": tune.grid_search([0.2, 0.4, 0.6]),
        >>>         "beta": tune.grid_search([1, 2]),
        >>>     },
        >>>     trial_resources={
        >>>         "cpu": 1,
        >>>         "gpu": 0
        >>>     },
        >>>     num_samples=10,
        >>>     local_dir="~/ray_results",
        >>>     upload_dir="s3://your_bucket/path",
        >>>     checkpoint_freq=10,
        >>>     max_failures=2)
    """

    def __init__(self,
                 name,
                 run,
                 stop=None,
                 config=None,
                 trial_resources=None,
                 repeat=1,
                 num_samples=1,
                 local_dir=None,
                 upload_dir="",
                 checkpoint_freq=0,
                 checkpoint_at_end=False,
                 max_failures=3,
                 restore=None):
        spec = {
            "run": self._register_if_needed(run),
            "stop": stop or {},
            "config": config or {},
            "trial_resources": trial_resources,
            "num_samples": num_samples,
            "local_dir": local_dir or DEFAULT_RESULTS_DIR,
            "upload_dir": upload_dir,
            "checkpoint_freq": checkpoint_freq,
            "checkpoint_at_end": checkpoint_at_end,
            "max_failures": max_failures,
            "restore": restore
        }
        self.name = name
        self.spec = spec

    @classmethod
    def from_json(cls, name, spec):
        """Generates an Experiment object from JSON.

        Args:
            name (str): Name of Experiment.
            spec (dict): JSON configuration of experiment.
        """
        if "run" not in spec:
            raise TuneError("No trainable specified!")
        if "repeat" in spec:
            raise DeprecationWarning("The parameter `repeat` is deprecated; \
                converting to `num_samples`. `repeat` will be removed in \
                future versions of Ray.")
            spec["num_samples"] = spec["repeat"]
            del spec["repeat"]
        # Special case the `env` param for RLlib by automatically
        # moving it into the `config` section.
        if "env" in spec:
            spec["config"] = spec.get("config", {})
            spec["config"]["env"] = spec["env"]
            del spec["env"]
        spec = copy.deepcopy(spec)
        run_value = spec.pop("run")
        try:
            exp = cls(name, run_value, **spec)
        except TypeError:
            raise TuneError("Improper argument from JSON: {}.".format(spec))
        return exp

    def _register_if_needed(self, run_object):
        """Registers Trainable or Function at runtime.

        Assumes already registered if run_object is a string. Does not
        register lambdas because they could be part of variant generation.
        Also, does not inspect interface of given run_object.

        Arguments:
            run_object (str|function|class): Trainable to run. If string,
                assumes it is an ID and does not modify it. Otherwise,
                returns a string corresponding to the run_object name.

        Returns:
            A string representing the trainable identifier.
        """
        if isinstance(run_object, six.string_types):
            return run_object
        elif isinstance(run_object, types.FunctionType):
            if run_object.__name__ == "<lambda>":
                logger.warning(
                    "Not auto-registering lambdas - resolving as variant.")
                return run_object
            else:
                name = run_object.__name__
                register_trainable(name, run_object)
                return name
        elif isinstance(run_object, type):
            name = run_object.__name__
            register_trainable(name, run_object)
            return name
        else:
            raise TuneError("Improper 'run' - not string nor trainable.")


def convert_to_experiment_list(experiments):
    """Produces a list of Experiment objects.

    Converts input from dict, single experiment, or list of
    experiments to list of experiments. If input is None,
    will return an empty list.

    Arguments:
        experiments (Experiment | list | dict): Experiments to run.

    Returns:
        List of experiments.
    """
    exp_list = experiments
    # Transform list if necessary
    if experiments is None:
        exp_list = []
    elif isinstance(experiments, Experiment):
        exp_list = [experiments]
    elif type(experiments) is dict:
        exp_list = [
            Experiment.from_json(name, spec)
            for name, spec in experiments.items()
        ]
    # Validate exp_list
    if (type(exp_list) is list
            and all(isinstance(exp, Experiment) for exp in exp_list)):
        if len(exp_list) > 1:
            logger.warning("All experiments will be "
                           "using the same SearchAlgorithm.")
    else:
        raise TuneError("Invalid argument: {}".format(experiments))
    ...
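For context, a short usage sketch of convert_to_experiment_list with a dict spec. The spec keys mirror the constructor arguments documented above; my_func is a stand-in trainable written against the function-based Tune API of this Ray version, not part of the original file.

def my_func(config, reporter):
    # stand-in trainable: report a single metric derived from the config
    reporter(mean_accuracy=config["alpha"] * 100)

experiments = {
    "my_experiment_name": {
        "run": my_func,
        "stop": {"mean_accuracy": 100},
        "config": {"alpha": 0.2},
        "num_samples": 2,
    }
}
# each (name, spec) pair is routed through Experiment.from_json
exp_list = convert_to_experiment_list(experiments)
assert all(isinstance(exp, Experiment) for exp in exp_list)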
cassandra_runs_insert_commands_builder.py
Source: cassandra_runs_insert_commands_builder.py
import logging
from datetime import datetime
import uuid

from ..utils import Utils

logger = logging.getLogger('repositories')


class CassandraRunsInsertCommandsBuilder:
    def __init__(self):
        pass

    # TODO: create stored procedures as far as Cassandra supports
    def get_command_to_insert_into_all_tables(self, spot_object, run_object):
        run_id = uuid.uuid1()
        c0 = self.get_command_to_insert_into_runs_by_user_spot_date(spot_object, run_object, run_id)
        c1 = self.get_command_to_insert_into_runs_by_user_date(spot_object, run_object, run_id)
        c2 = self.get_command_to_insert_into_runs_by_user_segment_date(spot_object, run_object, run_id)
        c3 = self.get_command_to_insert_into_runs_by_spot_user_date(spot_object, run_object, run_id)
        c4 = self.get_command_to_insert_into_runs_by_segment_date_time(spot_object, run_object, run_id)
        c5 = self.get_command_to_insert_into_runs_by_segment_user_date(spot_object, run_object, run_id)
        c6 = self.get_command_to_insert_into_runs_by_segment_time(spot_object, run_object, run_id)
        c7 = self.get_command_to_insert_into_runs_by_id(spot_object, run_object, run_id)
        return run_id, [c0, c1, c2, c3, c4, c5, c6, c7]

    def get_command_to_insert_into_runs_by_user_spot_date(self, spot_object, run_object, run_id):
        run_object_def = self.get_run_object_def_query_part(spot_object, run_object, run_id)
        return ('INSERT INTO runs_by_user_spot_date(id, user_id, spot_id, time_start, run_info) ' +
                'VALUES (' + str(run_id) + ', ' +
                str(run_object.user_id) + ', ' +
                str(spot_object.id) + ', ' +
                Utils.str_to_cassandra_time(str(run_object.time_start)) + ', ' +
                run_object_def +
                ');')

    def get_command_to_insert_into_runs_by_user_date(self, spot_object, run_object, run_id):
        run_object_def = self.get_run_object_def_query_part(spot_object, run_object, run_id)
        return ('INSERT INTO runs_by_user_date(id, user_id, time_start, run_info) ' +
                'VALUES (' + str(run_id) + ', ' +
                str(run_object.user_id) + ', ' +
                Utils.str_to_cassandra_time(str(run_object.time_start)) + ', ' +
                run_object_def +
                ');')

    def get_command_to_insert_into_runs_by_user_segment_date(self, spot_object, run_object, run_id):
        run_object_def = self.get_run_object_def_query_part(spot_object, run_object, run_id)
        return ('INSERT INTO runs_by_user_segment_date(id, user_id, segment_id, time_start, run_info) ' +
                'VALUES (' + str(run_id) + ', ' +
                str(run_object.user_id) + ', ' +
                str(run_object.segment.id) + ', ' +
                Utils.str_to_cassandra_time(str(run_object.time_start)) + ', ' +
                run_object_def +
                ');')

    def get_command_to_insert_into_runs_by_spot_user_date(self, spot_object, run_object, run_id):
        run_object_def = self.get_run_object_def_query_part(spot_object, run_object, run_id)
        return ('INSERT INTO runs_by_spot_user_date(id, spot_id, user_id, time_start, run_info) ' +
                'VALUES (' + str(run_id) + ', ' +
                str(spot_object.id) + ', ' +
                str(run_object.user_id) + ', ' +
                Utils.str_to_cassandra_time(str(run_object.time_start)) + ', ' +
                run_object_def +
                ');')

    def get_command_to_insert_into_runs_by_segment_date_time(self, spot_object, run_object, run_id):
        run_object_def = self.get_run_object_def_query_part(spot_object, run_object, run_id)
        return ('INSERT INTO runs_by_segment_date_time(id, segment_id, time_start, time_span_ms, run_info) ' +
                'VALUES (' + str(run_id) + ', ' +
                str(run_object.segment.id) + ', ' +
                Utils.str_to_cassandra_time(str(run_object.time_start)) + ', ' +
                str(run_object.time_span_ms) + ', ' +
                run_object_def +
                ');')

    def get_command_to_insert_into_runs_by_segment_user_date(self, spot_object, run_object, run_id):
        run_object_def = self.get_run_object_def_query_part(spot_object, run_object, run_id)
        return ('INSERT INTO runs_by_segment_user_date(id, segment_id, user_id, time_start, run_info) ' +
                'VALUES (' + str(run_id) + ', ' +
                str(run_object.segment.id) + ', ' +
                str(run_object.user_id) + ', ' +
                Utils.str_to_cassandra_time(str(run_object.time_start)) + ', ' +
                run_object_def +
                ');')

    def get_command_to_insert_into_runs_by_segment_time(self, spot_object, run_object, run_id):
        run_object_def = self.get_run_object_def_query_part(spot_object, run_object, run_id)
        return ('INSERT INTO runs_by_segment_time(id, segment_id, time_span_ms, time_start, run_info) ' +
                'VALUES (' + str(run_id) + ', ' +
                str(run_object.segment.id) + ', ' +
                str(run_object.time_span_ms) + ', ' +
                Utils.str_to_cassandra_time(str(run_object.time_start)) + ', ' +
                run_object_def +
                ');')

    def get_command_to_insert_into_runs_by_id(self, spot_object, run_object, run_id):
        run_object_def = self.get_run_object_def_query_part(spot_object, run_object, run_id)
        return ('INSERT INTO runs_by_id(id, run_info) ' +
                'VALUES (' + str(run_id) + ', ' +
                run_object_def +
                ');')

    def get_run_object_def_query_part(self, spot_object, run_object, run_id):
        return ('{ ' +
                ' id: ' + str(run_id) + ',' +
                ' user_id: ' + str(run_object.user_id) + ', ' +
                ' user_bt_name: \'' + run_object.user_bt_name + '\', ' +
                ' segment: { ' +
                ' id: ' + str(run_object.segment.id) + ', ' +
                ' name: \'' + run_object.segment.name + '\', '
                ' location_start: ' +
                ' { ' +
                ' id: ' + str(run_object.segment.location_start.id) + ', ' +
                ' location: { la: ' + str(run_object.segment.location_start.location.la) + ', lo: ' + str(run_object.segment.location_start.location.lo) + ' } ' +
                ' }, ' +
                ' location_stop: ' +
                ' { ' +
                ' id: ' + str(run_object.segment.location_stop.id) + ', ' +
                ' location: { la: ' + str(run_object.segment.location_stop.location.la) + ', lo: ' + str(run_object.segment.location_stop.location.lo) + ' } ' +
                ' }, ' +
                ' valid_time_start: ' + Utils.str_to_cassandra_time(str(run_object.segment.valid_time_start)) + ', ' +
                ' valid_time_stop: 0 ' +
                ' }, ' +
                ' time_start: ' + Utils.str_to_cassandra_time(str(run_object.time_start)) + ', ' +
                ' time_stop: ' + str(Utils.str_to_cassandra_time(str(run_object.time_stop))) + ', ' +
                ' time_span_ms: ' + str(run_object.time_span_ms) +
                ...
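The spot/run/segment models this builder expects are not part of the snippet. The sketch below builds throwaway stand-ins with SimpleNamespace (attribute names inferred from get_run_object_def_query_part) and prints the generated CQL instead of executing it; it assumes the project's Utils.str_to_cassandra_time helper accepts timestamp strings like these.

from types import SimpleNamespace

def make_location(loc_id, la, lo):
    # minimal stand-in for a location model with an id and a la/lo pair
    return SimpleNamespace(id=loc_id, location=SimpleNamespace(la=la, lo=lo))

segment = SimpleNamespace(id=7, name='hill sprint',
                          location_start=make_location(1, 12.8948, 77.6427),
                          location_stop=make_location(2, 12.8986, 77.6561),
                          valid_time_start='2020-01-20 16:00:00')
run = SimpleNamespace(user_id=42, user_bt_name='runner42', segment=segment,
                      time_start='2020-01-20 16:34:34', time_stop='2020-01-20 16:54:45',
                      time_span_ms=1211000)
spot = SimpleNamespace(id=3)

builder = CassandraRunsInsertCommandsBuilder()
run_id, commands = builder.get_command_to_insert_into_all_tables(spot, run)
for command in commands:
    print(command)  # in the real service these statements would be executed against a Cassandra session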