Best Python code snippet using lisa_python
malwareml.py
Source:malwareml.py
1#!/usr/bin/env python32from __future__ import print_function3import json4import os5import sys6from timeit import default_timer as timer7import logging8import numpy as np9import matplotlib.pyplot as plt10from sklearn.model_selection import train_test_split11from sklearn.metrics import classification_report12from sklearn.preprocessing import StandardScaler13from sklearn.preprocessing import Imputer14from sklearn.model_selection import GridSearchCV15from sklearn.svm import SVC, LinearSVC16from sklearn.gaussian_process.kernels import RBF17from sklearn.model_selection import cross_val_score18from sklearn.model_selection import cross_val_predict19from sklearn.pipeline import make_pipeline20from sklearn.externals import joblib21from sklearn.feature_extraction import DictVectorizer22from sklearn.manifold import TSNE23from matplotlib.colors import ListedColormap24logger = logging.getLogger('')25truths = ['true', 't', '1']26CV = os.environ.get('MML_CV', 'True').lower() in truths27SVM_C = float(os.environ.get('MML_SVM_C', 1000))28SVM_GAMMA = float(os.environ.get('MML_SVM_GAMMA', 0.02))29SVM_KERNEL = os.environ.get('MML_SVM_KERNEL', 'linear')30SVM_CACHE = int(os.environ.get('MML_SVM_CACHE_SIZE', '200'))31SVM_PROBABILITY = os.environ.get('MML_SVM_PROBABILITY', 'True').lower() in truths32EXCLUDE_FEATURE = os.environ.get('MML_EXCLUDE_FEATURE', None)33EXCLUDE_FEATURE_FILE = os.environ.get('MML_EXCLUDE_FEATURE_FILE', None)34CATEGORY_BENIGN = 035CATEGORY_MALICIOUS = 136log_dir = 'logs/'37class ML(object):38 def __init__(self):39 self.vec = None40 def build_feature_matrix(self, reports):41 # Pull all features from reports42 report_features = []43 for r in reports:44 # TODO: handle overwrite if report.name is the same45 report_features.append(r.features)46 # Vectorize into array47 self.vec = DictVectorizer()48 arr = self.vec.fit_transform(report_features).toarray()49 # Fill in missing values with avg value for that feature50 # TODO: Look into other(better?) 
ways to fill in values51 self.imp = Imputer(missing_values='NaN', strategy='mean', axis=0)52 arr = self.imp.fit_transform(arr)53 # Standardize data to approx normal (mean 0 and unit variance)54 # self.scaler = StandardScaler()55 # arr = self.scaler.fit_transform(arr)56 return arr57 def get_feature_matrix(self, reports):58 """59 Used for experimenting. Uses existing DictVectorizer to convert reports to a matrix.60 Returns a feature matrix61 """62 for attr in ['vec', 'imp', 'scaler']:63 if getattr(self, attr, None) is None:64 raise Exception('You must call build_feature_matrix before using get_feature_matrix!')65 report_features = [r.features for r in reports]66 arr = self.vec.transform(report_features).toarray()67 arr = self.imp.transform(arr)68 arr = self.scaler.transform(arr)69 return arr70 def get_classifier(self, classifier):71 return self.classifiers[classifier]72 def get_acc_fp_fn(self, predictions_arr, truth_arr):73 # Assuming 0 = neg(benign), 1 = pos(malware)74 if (len(predictions_arr) != len(truth_arr)):75 logger.error("get_overall_acc(): Error predictions_arr and truth_arr are different sizes.")76 return -177 correct_count = 078 false_pos_count = 079 false_neg_count = 080 for i in range(len(predictions_arr)):81 if predictions_arr[i] == truth_arr[i]:82 correct_count += 183 elif (predictions_arr[i] == 1) and (truth_arr[i] == 0):84 false_pos_count += 185 elif (predictions_arr[i] == 0) and (truth_arr[i] == 1):86 false_neg_count += 187 overall_acc = float(correct_count)/float(len(predictions_arr))88 return (overall_acc, false_pos_count, false_neg_count)89 def display_top_feature_weights(self, clf, n=15):90 feature_names = self.vec.get_feature_names()91 coefs_with_fns = sorted(zip(clf.coef_[0], feature_names))92 top = zip(coefs_with_fns[:n], coefs_with_fns[:-(n + 1):-1])93 logger.info("\tTop Benign\t\t\tTop Malware")94 for (coef_1, fn_1), (coef_2, fn_2) in top:95 logger.info("\t%.4f\t%.15s\t\t%.4f\t%.15s" % (coef_1, fn_1, coef_2, fn_2))96 def CV_ml_params(self, 
pipe_clf, X_train, y_train):97 # Cross-validation ML Params98 c_range = np.logspace(-2, 3, 5)99 if (pipe_clf.get_params()['svc__kernel'] == 'rbf'):100 gamma_range = np.logspace(-9, 3, 5)101 param_grid = dict(svc__gamma=gamma_range, svc__C=c_range)102 CV_clf = GridSearchCV(estimator=pipe_clf, param_grid=param_grid, cv=5, n_jobs=4, verbose=20)103 else:104 param_grid = dict(svc__C=c_range)105 CV_clf = GridSearchCV(pipe_clf, param_grid=param_grid, cv=5, n_jobs=4)106 logger.info('GridSearchCV instantiated')107 CV_clf.fit(X_train, y_train)108 logger.info('GridSearchCV fitted')109 # gridCV_scores = CV_clf.cv_results_['mean_test_score']110 logger.info('Done cross validation')111 logger.info('The best parameters are ' + str(CV_clf.best_params_) + ' with a score of ' + str(CV_clf.best_score_))112 return CV_clf.best_params_113class Report(object):114 def __init__(self, category=None):115 self.name = ""116 self.features = {}117 self.total = None118 self.positives = None119 self.scans = None120 self.category = category121 def load_report(self, json_file, file_name="unknown"):122 """Load JSON formatted malware report. 
It can handle both a path to123 JSON file and a dictionary object."""124 if isinstance(json_file, str):125 self.json_path = json_file126 with open(json_file, "r") as malware_report:127 try:128 self.report = json.load(malware_report)129 except ValueError as error:130 logger.error("Could not load file; {} is not a valid JSON file.".format(malware_report))131 logger.error("Exception: %s" % str(error))132 # sys.exit(1)133 return -1134 elif isinstance(json_file, dict):135 self.report = json_file136 else:137 # Unknown binary format138 logger.error("Could not load the data *{}* is of unknown type: {}.".format(json, type(json)))139 return -1140 # Could be extracted as features elsewhere...141 self.name = file_name142 # Get total and positives143 #self.total = self.report.get("virustotal", {}).get("total")144 #self.positives = self.report.get("virustotal", {}).get("positives")145 # Pull all VT normalised results146 #self.scans = self.report.get("virustotal", {}).get("scans")147 # Success148 return 1149 def get_features(self):150 self.features = self.report151 if EXCLUDE_FEATURE is not None and EXCLUDE_FEATURE in self.report:152 del(self.features[EXCLUDE_FEATURE])153 return154class ExcludedReport(Report):155 def __init__(self, *args, **kwargs):156 super().__init__()157 self.excluded_features = kwargs.pop('excluded_features', [])158 if self.excluded_features is None:159 self.excluded_features = []160 def get_features(self):161 super().get_features()162 if EXCLUDE_FEATURE is not None and EXCLUDE_FEATURE in self.report:163 del(self.features[EXCLUDE_FEATURE])164 for feat in self.excluded_features:165 if feat in self.features:166 del(self.features[feat])167def load_reports(directory_benign, directory_malware, excluded_features=None):168 reports = []169 reports_truth = []170 # Load in benign programs171 for file in os.listdir(directory_benign):172 # Ignore large report files...173 max_report_filesize = 20000000174 if (os.stat(os.path.join(directory_benign, file)).st_size > 
max_report_filesize):175 continue176 if excluded_features:177 new_report = ExcludedReport(category=CATEGORY_BENIGN, excluded_features=excluded_features)178 else:179 new_report = Report(category=CATEGORY_BENIGN)180 # Try to load report181 if(new_report.load_report(os.path.join(directory_benign, file), file) == -1):182 continue183 new_report.get_features()184 reports.append(new_report)185 reports_truth.append(CATEGORY_BENIGN)186 # Load in malware programs187 for file in os.listdir(directory_malware):188 # Ignore large report files...189 max_report_filesize = 20000000190 if (os.stat(os.path.join(directory_malware, file)).st_size > max_report_filesize):191 continue192 if excluded_features:193 new_report = ExcludedReport(category=CATEGORY_MALICIOUS, excluded_features=excluded_features)194 else:195 new_report = Report(category=CATEGORY_MALICIOUS)196 # Try to load report197 if(new_report.load_report(os.path.join(directory_malware, file), file) == -1):198 continue199 new_report.get_features()200 reports.append(new_report)201 reports_truth.append(CATEGORY_MALICIOUS)202 return (reports, reports_truth)203def plot_data(data_2d, labels):204 datasets = [(data_2d, labels)]205 names = ["Linear SVM"]206 classifiers = [SVC(kernel="linear", C=1.27)]207 h = .02208 #datasets = [(data_count, (X,y)), ...]209 figure = plt.figure(figsize=(27, 9))210 i = 1211 # iterate over datasets212 for ds_cnt, ds in enumerate(datasets):213 # preprocess dataset, split into training and test part214 X, y = ds215 X = StandardScaler().fit_transform(X)216 X_train, X_test, y_train, y_test = \217 train_test_split(X, y, test_size=.4, random_state=42)218 x_min, x_max = X[:, 0].min() - .5, X[:, 0].max() + .5219 y_min, y_max = X[:, 1].min() - .5, X[:, 1].max() + .5220 xx, yy = np.meshgrid(np.arange(x_min, x_max, h),221 np.arange(y_min, y_max, h))222 # just plot the dataset first223 cm = plt.cm.RdBu224 cm_bright = ListedColormap(['#FF0000', '#0000FF'])225 ax = plt.subplot(len(datasets), len(classifiers) + 1, i)226 
if ds_cnt == 0:227 ax.set_title("Input data")228 # Plot the training points229 ax.scatter(X_train[:, 0], X_train[:, 1], c=y_train, cmap=cm_bright)230 # and testing points231 ax.scatter(X_test[:, 0], X_test[:, 1], c=y_test, cmap=cm_bright, alpha=0.5)232 ax.set_xlim(xx.min(), xx.max())233 ax.set_ylim(yy.min(), yy.max())234 ax.set_xticks(())235 ax.set_yticks(())236 i += 1237 # iterate over classifiers238 for name, clf in zip(names, classifiers):239 ax = plt.subplot(len(datasets), len(classifiers) + 1, i)240 clf.fit(X_train, y_train)241 score = clf.score(X_test, y_test)242 # Plot the decision boundary. For that, we will assign a color to each243 # point in the mesh [x_min, x_max]x[y_min, y_max].244 if hasattr(clf, "decision_function"):245 Z = clf.decision_function(np.c_[xx.ravel(), yy.ravel()])246 else:247 Z = clf.predict_proba(np.c_[xx.ravel(), yy.ravel()])[:, 1]248 # Put the result into a color plot249 Z = Z.reshape(xx.shape)250 ax.contourf(xx, yy, Z, cmap=cm, alpha=.8)251 # Plot also the training points252 ax.scatter(X_train[:, 0], X_train[:, 1], c=y_train, cmap=cm_bright)253 # and testing points254 ax.scatter(X_test[:, 0], X_test[:, 1], c=y_test, cmap=cm_bright,255 alpha=0.6)256 ax.set_xlim(xx.min(), xx.max())257 ax.set_ylim(yy.min(), yy.max())258 ax.set_xticks(())259 ax.set_yticks(())260 if ds_cnt == 0:261 ax.set_title(name)262 ax.text(xx.max() - .3, yy.min() + .3, ('%.2f' % score).lstrip('0'),263 size=15, horizontalalignment='right')264 i += 1265 plt.tight_layout()266 plt.show()267def main():268 logger.info('Starting program')269 directory_benign = sys.argv[1]270 directory_malware = sys.argv[2]271 # Exclude Features from file, if specified272 excluded_features = None273 if EXCLUDE_FEATURE_FILE is not None:274 with open(EXCLUDE_FEATURE_FILE) as f:275 excluded_features = [j for j in [l.split('#', 1)[0].strip() for l in f if l] if j]276 # Load in reports277 (reports, reports_truth) = load_reports(directory_benign, directory_malware, excluded_features)278 
logger.info('Done loading reports')279 # Init ML280 ml = ML()281 # Build numerical array from report features282 feat_matrix = ml.build_feature_matrix(reports)283 logger.info('Built matrix')284 # Visualization285 # model = TSNE(n_components=2, random_state=0)286 # model_out = model.fit_transform(feat_matrix)287 # print model_out288 # plot_data(model_out, reports_truth)289 # Initialize ML classifier290 clf = SVC(kernel=SVM_KERNEL, C=SVM_C, gamma=SVM_GAMMA)291 pipe_clf = make_pipeline(StandardScaler(), clf)292 if CV:293 start = timer()294 # X_train, X_test, y_train, y_test = train_test_split(feat_matrix, reports_truth, test_size=0.25, random_state=42)295 # best_params = ml.CV_ml_params(SVM_KERNEL, X_train, y_train)296 best_params = ml.CV_ml_params(pipe_clf, feat_matrix, reports_truth)297 cv_time = timer() - start298 pipe_clf.set_params(**best_params)299 else:300 cv_time = None301 # Train and predict302 start = timer()303 logger.info('Starting cross-val predict...')304 # cv_score = cross_val_score(pipe_clf, feat_matrix, reports_truth, cv=4)305 # logger.info(cv_score)306 predictions = cross_val_predict(pipe_clf, feat_matrix, reports_truth, cv=4, n_jobs=-1)307 logger.info('Predictions made')308 fit_predict_time = timer() - start309 # Output Information310 logger.info('Done ml')311 num_malware = len(os.listdir(directory_malware))312 num_benign = len(os.listdir(directory_benign))313 num_total = num_benign + num_malware314 logger.info("Total Number of Reports: " + str(num_total))315 logger.info("Number of Reports Included: " + str(len(reports)))316 logger.info("Number Malware: %d - Number Benign: %d" % (num_malware, num_benign))317 (score, f_pos, f_neg) = ml.get_acc_fp_fn(predictions, reports_truth)318 logger.info("Overall Accuracy: %.6f" % (score))319 logger.info("False Neg: %.2f%% - %d/%d, False Pos: %.2f%% - %d/%d"320 % (100 * float(f_neg) / float(num_malware), f_neg, num_malware, 100 * float(f_pos) / float(num_benign), f_pos, num_benign))321 logger.info("Number of 
Features: " + str(len(feat_matrix[0])))322 logger.info('Parameters: ' + str(pipe_clf.get_params()))323 # logger.info("C Value: " + str(pipe_clf.get_params()['svc__C']))324 # if pipe_clf.get_params()['svc__kernel'] == 'rbf':325 # logger.info("Gamma Value: " + str(pipe_clf.get_params()['svc__gamma']))326 # logger.info("Kernel: " + str(pipe_clf.get_params()['svc__kernel']))327 if EXCLUDE_FEATURE:328 logger.info("Excluded feature: {}".format(EXCLUDE_FEATURE))329 if EXCLUDE_FEATURE_FILE:330 logger.info("Excluded feature file: {}".format(EXCLUDE_FEATURE_FILE))331 logger.info('')332 try:333 ml.display_top_feature_weights(clf)334 except Exception as e:335 logger.error('Could not display top features: {}'.format(e))336 logger.info('')337 stats = {338 'accuracy': score,339 'num_features': len(feat_matrix[0]),340 'kernel': clf.get_params()['kernel'],341 'C': clf.get_params()['C'],342 'gamma': clf.get_params().get('gamma', None),343 'exclude_feature': EXCLUDE_FEATURE,344 'exclude_feature_file': EXCLUDE_FEATURE_FILE,345 'exclude_features': excluded_features,346 'cv_time': str(cv_time),347 'fit_predict_time': str(fit_predict_time),348 }349 with open(os.path.join(log_dir, 'stats.json'), 'w') as f:350 json.dump(stats, f, indent=4)351"""352 results = {}353 for report in reports:354 if report in reports_train:355 res = 'train'356 else:357 prediction = predictions[reports_test.index(report)]358 res = '{} {}'.format(359 'true' if prediction == report.category else 'false',360 'positive' if prediction == CATEGORY_MALICIOUS else 'negative'361 )362 results[report.name] = res363 with open(os.path.join(log_dir, 'results.json'), 'w') as f:364 json.dump(results, f, indent=4)365 joblib.dump(clf, os.path.join(log_dir, 'clf.pkl'))366 joblib.dump(ml.vec, os.path.join(log_dir, 'vec.pkl'))367 joblib.dump(ml.imp, os.path.join(log_dir, 'imp.pkl'))368 #joblib.dump(ml.scaler, os.path.join(log_dir, 'scaler.pkl'))369"""370 # Precision = (# correctly predicted pos) / (# predicted pos), recall = (# 
correctly predicted pos) / (# actual positive)371 # logger.info(str(classification_report(y_test, predictions, target_names=['benign', 'malware'])))372if __name__ == '__main__':373 from logging_setup import setup_logging374 log_dir = setup_logging(from_file=__file__)...
construction.py
Source:construction.py
import os
import pandas as pd
from sklearn.preprocessing import MinMaxScaler, StandardScaler, MaxAbsScaler, RobustScaler
from utility.folder_creator import folder_creator


def _scale_columns(df, scaler, excluded_features):
    """Scale every column of ``df`` not in ``excluded_features``, in place.

    The scaler is re-fitted on each column separately, one column at a time.
    """
    for col in df.columns:
        if col in excluded_features:
            continue
        normalized = scaler.fit_transform(df[col].values.reshape(-1, 1))
        # Assign the raw array so the result does not depend on index alignment.
        df[col] = normalized.reshape(-1)
    return df


def _scale_directory(input_path, output_path, make_scaler, excluded_features):
    """Scale every CSV in ``input_path`` and write it under the same name to ``output_path``.

    ``make_scaler`` is called once per file to create a fresh scaler.
    """
    folder_creator(output_path, 1)
    for crypto in os.listdir(input_path):
        df = pd.read_csv(os.path.join(input_path, crypto), delimiter=',', header=0)
        _scale_columns(df, make_scaler(), excluded_features)
        # todo we have to round 8 since the neural network takes floating numbers with this limit (df.round(8))
        df.to_csv(os.path.join(output_path, crypto), sep=",", index=False)


# SCALING
def min_max_scaling(input_path, output_path):
    """Apply ``MinMaxScaler()`` to every column except ``Date`` of each CSV."""
    _scale_directory(input_path, output_path, MinMaxScaler, ['Date'])


# SCALING
def min_max_one_minusone_scaling(input_path, output_path):
    """Apply ``MinMaxScaler(feature_range=(-1, 1))`` to every column except ``Date``."""
    _scale_directory(input_path, output_path,
                     lambda: MinMaxScaler(feature_range=(-1, 1)), ['Date'])


# SCALING
def robust_scaling(input_path, output_path):
    """Apply ``RobustScaler()`` to every column except ``Date`` of each CSV."""
    _scale_directory(input_path, output_path, RobustScaler, ['Date'])


def max_abs_scaling(input_path, output_path):
    """Apply ``MaxAbsScaler()`` to each CSV, leaving its last row unscaled.

    Files are named ``<crypto>_<...>``; the output goes to
    ``output_path/<crypto>/<file>``. The last row (the day to predict) is
    removed before scaling and appended back unchanged afterwards. ``Date``
    and ``trend`` are never scaled.
    """
    folder_creator(output_path, 1)
    excluded_features = ['Date', 'trend']
    for crypto in os.listdir(input_path):
        crypto_name = crypto.split("_")[0]
        folder_creator(os.path.join(output_path, crypto_name), 0)
        df = pd.read_csv(os.path.join(input_path, crypto), delimiter=',', header=0)
        # Keep the last row as a one-row frame so column dtypes survive the
        # round trip through concat.
        day_to_predict = df.iloc[[-1]]
        # Copy: assigning into a slice of `df` would act on a view.
        df = df.iloc[:-1].copy()  # remove the date to predict
        _scale_columns(df, MaxAbsScaler(), excluded_features)
        # DataFrame.append is gone from current pandas; concat is the replacement.
        df = pd.concat([df, day_to_predict], ignore_index=True)
        df.to_csv(os.path.join(output_path, crypto_name, crypto), sep=",", index=False)


def standardization(input_path, output_path):
    """Apply ``StandardScaler()`` to every column except ``Date`` of each CSV."""
    _scale_directory(input_path, output_path, StandardScaler, ['Date'])


def _split_dataset_name(dataset_name):
    """Split ``<crypto>_<date>.csv`` into ``(crypto, date)``."""
    parts = dataset_name.split("_")
    return parts[0], str(parts[1]).replace(".csv", "")


# creates the horizontal dataset
def create_horizontal_dataset(data_path, output_path, test_set):
    """Write one side-by-side ("horizontal") CSV per date in ``test_set``.

    For each date, the files in ``data_path`` named ``<crypto>_<date>.csv`` are
    read; the ``Date`` column of the first one is kept once, and every file's
    remaining columns (plus a ``symbol`` column) are suffixed ``_<k>`` and
    concatenated column-wise. The set and order of cryptos is fixed by the
    first date processed. Returns that list of crypto names.
    """
    dataset_names = os.listdir(data_path)
    cryptos_in_the_cluster = []
    already_created = False
    horizontal_dir = os.path.join(output_path, "horizontal_datasets")
    # Trailing separator kept: folder_creator was originally given ".../".
    folder_creator(os.path.join(horizontal_dir, ""), 0)
    print("Creating horizontal version")
    for date_to_predict in test_set:
        frames_by_crypto = {}
        dataframes = []
        # creates Close_1,Open_1 ecc for each dataframe
        i = 1
        for dataset_name in dataset_names:
            crypto_name, dataset_date = _split_dataset_name(dataset_name)
            if date_to_predict != dataset_date:
                continue
            df = pd.read_csv(os.path.join(data_path, dataset_name), header=0)
            if i == 1:
                # take just the date column one time
                dataframes.append(df['Date'])
            if not already_created:
                cryptos_in_the_cluster.append(crypto_name)
            df = df.drop('Date', axis=1)
            df['symbol'] = crypto_name
            frames_by_crypto[crypto_name] = df.add_suffix('_' + str(i))
            i += 1
        # A crypto with no file for this date contributes no columns.
        dataframes.extend(frames_by_crypto[crypt]
                          for crypt in cryptos_in_the_cluster
                          if crypt in frames_by_crypto)
        already_created = True
        # concat horizontally all the dataframes
        horizontal = pd.concat(dataframes, axis=1)
        # serialization
        horizontal.to_csv(os.path.join(horizontal_dir, "horizontal_" + date_to_predict + ".csv"),
                          sep=",", index=False)
        del horizontal
        del dataframes
        del frames_by_crypto
        print("Horizontal version created for the date: " + str(date_to_predict))
    return list(cryptos_in_the_cluster)


# [close(i+1)-close(i)/close(i)*100]
def add_trend_feature(input_path, output_path, percent):
    """Add ``pct_change`` and ``trend`` columns to every CSV in ``input_path``.

    ``pct_change`` is the percentage change of ``Close`` from the previous
    row; ``trend`` is -1 below ``-percent``, 1 above ``percent``, else 0.
    """
    for crypto in os.listdir(input_path):
        df = pd.read_csv(os.path.join(input_path, crypto), sep=",", header=0)
        df['pct_change'] = df['Close'].pct_change() * 100
        #  0 is stable
        # -1 is down
        #  1 is up
        df['trend'] = 0
        df.loc[df['pct_change'] < -percent, 'trend'] = -1  # down
        df.loc[df['pct_change'] > percent, 'trend'] = 1  # up
        # print(df[['pct_change','trend']])
        df.to_csv(os.path.join(output_path, crypto), sep=",", index=False)


def change_relative_variance(input_path, output_path):
    """Replace every column except ``Date``/``trend`` by its row-to-row percentage change.

    The first row (which has no predecessor) is dropped. Output goes to
    ``output_path/<crypto>/<file>`` where ``<crypto>`` is the file name up to
    the first underscore.
    """
    folder_creator(output_path, 1)
    for crypto in os.listdir(input_path):
        crypto_name = crypto.split("_")[0]
        folder_creator(os.path.join(output_path, crypto_name), 0)
        df = pd.read_csv(os.path.join(input_path, crypto), sep=",", header=0)
        value_cols = df.columns.drop(['Date', 'trend'])
        previous = df[value_cols].shift(1)
        df[value_cols] = ((df[value_cols] - previous) / previous) * 100
        df = df.iloc[1:, :]
        df.to_csv(os.path.join(output_path, crypto_name, crypto), sep=",", index=False)

# NOTE(review): SOURCE continues with a commented-out, row-by-row variant of
# add_trend_feature inside a string literal that is cut off mid-way; it is
# dead code and is not reproduced here.
__init__.py
Source:__init__.py
from typing import Callable, Any
import sys
from collections import ChainMap
from collections.abc import Mapping
from functools import wraps
from pyramid.config import *
from pyramid.config import Configurator
from tet.decorators import deprecated
from tet.i18n import configure_i18n
from tet.util.collections import flatten
from tet.util.path import caller_package


class TetAppFactory(object):
    """
    This class is deprecated in favour of procedural configuration /
    pyramid_zcml with create_configurator. See `application_factory`
    decorator for more details.

    Subclasses override the class attributes and the ``configure_*`` hooks;
    calling the class runs the whole construction and returns the result of
    ``construct_app()`` rather than an instance.
    """
    scan = None
    includes = []
    excludes = []
    i18n = True
    default_i18n_domain = None
    settings = {}
    global_config = None
    # :type config: Configurator
    config = None
    default_includes = [
        'tet.services',
        'tet.renderers.json'
    ]

    @deprecated
    def __new__(cls, global_config, **settings_kw):
        instance = cls.instantiate()
        instance.init_app_factory(global_config, settings_kw)
        return instance.construct_app()

    @classmethod
    def instantiate(cls):
        """Create a bare instance, bypassing the overridden ``__new__``."""
        return super(TetAppFactory, cls).__new__(cls)

    def __init__(self, *args, **kwargs):
        super(TetAppFactory, self).__init__()

    def _dummy(self, config: Configurator):
        """No-op hook used as the default for optional configuration steps."""
        pass

    def init_app_factory(self, global_config, settings):
        """Store the settings, create the configurator and apply default includes."""
        self.settings = settings
        self.global_config = global_config
        self.config = self.make_configurator()
        self.do_default_includes()

    def do_default_includes(self):
        """Include every entry of ``default_includes`` not listed in ``excludes``."""
        excludes = set(self.excludes)

        def conditional_include(item):
            if item not in excludes:
                self.config.include(item)

        for item in self.default_includes:
            conditional_include(item)

    def prepare_i18n(self):
        """Configure i18n on the configurator when ``self.i18n`` is truthy."""
        if self.i18n:
            configure_i18n(self.config, self.default_i18n_domain)

    def make_configurator(self) -> Configurator:
        """Return a new ``Configurator`` built from ``self.settings``."""
        return Configurator(settings=self.settings)

    pre_configure_app = _dummy
    configure_db = _dummy

    def configure_app(self, config: Configurator) -> None:
        """Run the database and route configuration hooks."""
        self.configure_db(config)
        self.configure_routes(config)

    def configure_routes(self, config: Configurator) -> None:
        pass

    def post_configure_app(self, config: Configurator) -> None:
        pass

    def do_scan(self) -> None:
        self.config.scan(self.scan)

    def do_include(self) -> None:
        for i in self.includes:
            self.config.include(i)

    def construct_app(self) -> Any:
        """Run all configuration steps in order and return the wrapped WSGI app."""
        if self.includes:
            self.do_include()
        self.prepare_i18n()
        self.pre_configure_app(self.config)
        self.configure_app(self.config)
        self.post_configure_app(self.config)
        if self.scan:
            self.do_scan()
        return self.wrap_app(self.config.make_wsgi_app())

    def wrap_app(self, app) -> Any:
        """Hook for wrapping the WSGI app; returns ``app`` unchanged by default."""
        return app

    @classmethod
    @deprecated
    def main(cls, global_config, **settings):
        return cls(global_config, **settings)


ALL_FEATURES = [
    'services',
    'i18n',
    'renderers.json',
    'renderers.tonnikala',
    'renderers.tonnikala.i18n',
    'security.authorization',
    'security.csrf'
]

MINIMAL_FEATURES = []


def create_configurator(*,
                        global_config=None,
                        settings=None,
                        merge_global_config=True,
                        configurator_class=Configurator,
                        included_features=(),
                        excluded_features=(),
                        package=None,
                        **kw) -> Configurator:
    """
    Create a configurator and include the requested ``tet.*`` features.

    :param global_config: optional mapping merged beneath ``settings`` when
        ``merge_global_config`` is true.
    :param settings: the application settings; may be ``None``.
    :param merge_global_config: whether to layer ``global_config`` (and the
        computed defaults) under ``settings``.
    :param configurator_class: the class instantiated as the configurator.
    :param included_features: (possibly nested) iterable of feature names;
        each is included as ``'tet.' + name`` in the given order.
    :param excluded_features: (possibly nested) iterable of feature names to
        leave out.
    :param package: passed to the configurator; defaults to the caller's
        package.
    :param kw: passed as-is to ``configurator_class``, except
        ``default_i18n_domain`` which is added to the settings instead.
    :return: the configurator.
    """
    defaults = {}
    if merge_global_config and isinstance(global_config, Mapping):
        # ChainMap cannot look keys up in None, so substitute an empty
        # mapping when no explicit settings were given.
        settings = ChainMap(settings if settings is not None else {},
                            global_config, defaults)
    # NOTE(review): when global_config is not merged, `defaults` never
    # reaches the configurator -- confirm whether that is intended.

    extracted_settings = {}
    if package is None:
        package = caller_package(ignored_modules=[__name__])

    for name in ['default_i18n_domain']:
        if name in kw:
            extracted_settings[name] = kw.pop(name)

    if hasattr(package, '__name__'):
        package_name = package.__name__
    else:
        package_name = package

    # `defaults` is referenced by the ChainMap above, so this late assignment
    # is still visible through `settings`.
    defaults['default_i18n_domain'] = package_name

    config = configurator_class(settings=settings,
                                package=package,
                                **kw)
    config.add_settings(extracted_settings)

    included_features = list(flatten(included_features))
    excluded_features = set(flatten(excluded_features))
    feature_set = set(included_features) - excluded_features
    config.registry.tet_features = feature_set

    for feature_name in included_features:
        if feature_name in feature_set:
            try:
                config.include('tet.' + feature_name)
            except Exception as e:
                print('Unable to include feature {}: {}'.format(
                    feature_name,
                    e
                ), file=sys.stderr)
                raise

    return config


def application_factory(factory_function: Callable[[Configurator], Any]=None,
                        configure_only=False,
                        included_features=ALL_FEATURES,
                        excluded_features=(),
                        package=None,
                        **extra_parameters):
    """
    A decorator for main method / application configurator for Tet. The
    wrapped function must accept a single argument - the Configurator. The
    wrapper itself accepts arguments (global_config, **settings) like an
    ordinary Pyramid/Paster application entry point does.

    If configure_only=False (the default), then the return value is a
    WSGI application created from the configurator.

    `included_features` contains an iterable of features that should be
    automatically included in the application. By default all standard Tet
    features are included. For maximal future compatibility you can specify the
    included feature names here.

    `excluded_features` should be an iterable of features that shouldn't be
    automatically included - this serves as a fast way to get all standard
    features except a named few.

    `package` should be the package passed to the Configurator object;
    otherwise the package of the caller is assumed.

    :param factory_function: The actual wrapped factory function that
        accepts parameter (config: Configurator)
    :param configure_only: True if no WSGI application is to be made, false
        to actually create the WSGI application as the return value
    :param included_features: The iterable of included features. This can
        in turn contain other iterables; they are flattened by the wrapper into
        a list of strings.
    :param excluded_features: The iterable of excluded features. This can
        in turn contain other iterables; they are flattened by the wrapper into
        a list of strings.
    :param extra_parameters: extra parameters that will be passed as-is to
        the actual configurator generation.
    :return: the WSGI app if `configure_only` is `False`; otherwise whatever
        the wrapped function returned.
    """
    if package is None:
        package = caller_package(ignored_modules=[__name__])

    def decorator(function):
        @wraps(function)
        def wrapper(*a, **kw):
            if len(a) > 1:
                raise TypeError('application_factory wrapped function '
                                'called with more than 1 positional argument')

            global_config = a[0] if a else None
            settings = kw

            config = create_configurator(global_config=global_config,
                                         settings=settings,
                                         included_features=included_features,
                                         excluded_features=excluded_features,
                                         package=package,
                                         **extra_parameters)

            returned = function(config)
            # The wrapped function may hand back a different configurator.
            if isinstance(returned, Configurator):
                config = returned

            if not configure_only:
                return config.make_wsgi_app()
            else:
                return returned

        return wrapper

    if factory_function is not None:
        if not callable(factory_function):
            raise TypeError("Factory function was specified but not callable")
        else:
            return decorator(factory_function)
    else:
        # NOTE(review): SOURCE is cut off right after this `else:`; returning
        # the decorator (for the `@application_factory(...)` form) is the
        # presumed continuation -- confirm against the full file.
        return decorator
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!