Best Python code snippet using ATX
Anomaly_Detection.py
Source:Anomaly_Detection.py
1import numpy as np2import pandas as pd3import pickle4import math5from sklearn.decomposition import PCA6from sklearn import preprocessing7import matplotlib.pyplot as plt8import matplotlib.patches as mpatches9from sklearn.preprocessing import StandardScaler10from scipy.spatial.distance import pdist, cdist, squareform11from matplotlib.colors import ListedColormap12from sklearn.model_selection import train_test_split13import os14import SODA15import data_manipulation as dm16import multiprocessing17from sklearn.utils.validation import check_array18import sys19#-------------------------Main Code--------------------------#20def calculate(func, args):21 result = func(*args)22 return result23def calculatestar(args):24 return calculate(*args)25def main():26 #------------------------------------------------------------#27 #-------------------Initiation Part--------------------------#28 ####### Variables set by user #######29 # PCA number of components30 N_PCs = 831 # List of granularities 32 gra_list = [13] 33 # Number of iteration34 iterations = 135 # Number of process to create in the multiprocessing step36 PROCESSES = 437 # Number of Data-set divisions38 total = 10000039 # Number of Data-set divisions40 windows = 10041 # Percentage of background samples on the testing phase42 background_percent = 9943 # Firstly the model loads the background and signal data, then it removes the 44 # attributes first string line, in order to avoid NaN values in the array.45 # Using multiprocess to load the data-sets into the code46 print(' ==== Commencing Initiation ====\n', file=open("log_file.txt", "a+"))47 ### Background 48 b_name='/AtlasDisk/user/pestana/Input/Input_Background_1.csv'49 #b_name='Input_Background_1.csv'50 background = np.genfromtxt(b_name, delimiter=',')51 background = background[1:,:]52 background, _ = dm.divide(background, windows, total)53 print(" .Background Loaded...", file=open("log_file.txt", "a"))54 ### Signal55 s_name='/AtlasDisk/user/pestana/Input/Input_Signal_1.csv'56 #s_name='Input_Signal_1.csv'57 signal = np.genfromtxt(s_name, delimiter=',')58 signal = signal[1:,:]59 print(" .Signal Loaded...", file=open("log_file.txt", "a"))60 print('\n ==== Initiation Complete ====\n', file=open("log_file.txt", "a"))61 print('=*='*17, file=open("log_file.txt", "a"))62 print(' ==== Commencing Data Processing ====', file=open("log_file.txt", "a"))63 with multiprocessing.Pool(PROCESSES) as pool:64 TASKS = [(model, (n_i,background,background_percent,signal,windows,N_PCs,gra_list, total)) for n_i in range(iterations)]65 print(' .Executing SODA for granularities', gra_list, file=open("log_file.txt", "a"))66 pool.map(calculatestar, TASKS)67def model(n_i,background,background_percent,signal,windows,N_PCs,gra_list, total):68 print('\n => Iteration Number', (n_i+1), file=open("log_file.txt", "a"))69 # Devide data-set into training and testing sub-sets70 print(' .Dividing training and testing sub-sets', file=open("log_file.txt", "a"))71 test_size = 0.372 test = int(total*test_size)73 b_test = int(test*background_percent/100)74 background_train, background_test = train_test_split(background, test_size=0.30, random_state=42)75 background_test, _ = dm.divide(background_test, windows, b_test)76 # Defining number of events Signal events on online phase.77 signal_online_samples = int(test - b_test)78 # Devide online signal79 print(' .Selecting Signal on the following porpotion:', file=open("log_file.txt", "a"))80 print(' .' + str(background_percent) + '% Background samples', file=open("log_file.txt", "a"))81 print(' .' + str(100-background_percent) + '% Signal samples', file=open("log_file.txt", "a"))82 print(' .{:9d} of Background samples (Offline)'.format(int(total*(1-test_size))), file=open("log_file.txt", "a"))83 print(' .{:9d} of Background samples (Online)'.format(int(b_test)), file=open("log_file.txt", "a"))84 print(' .{:9d} of Signal samples (Online)'.format(int(signal_online_samples)), file=open("log_file.txt", "a"))85 reduced_signal, signal_sample_id = dm.divide(signal, windows, signal_online_samples)86 # Nextly, the Signal data processed is saved in the Analised data directory.87 #np.savetxt('/AtlasDisk/user/pestana/Output/Analysed_Signal/Reduced_iteration_' + str(n_i) + '_' + s_name,reduced_signal,delimiter=',')88 #np.savetxt('/AtlasDisk/user/pestana/Output/Analysed_Signal/Reduced_ID_iteration_' + str(n_i) + '_' + s_name,signal_sample_id,delimiter=',')89 # Concatenating Signal and the Test Background sub-set90 streaming_data = np.concatenate((background_test,reduced_signal), axis=0)91 # Normalize Data92 norm_background_train = preprocessing.normalize(background_train)93 norm_streaming_data = preprocessing.normalize(streaming_data)94 #print(' .Normalizing Data', file=open("log_file.txt", "a"))95 # Calculates Statistical attributes96 print(' .Calculating statistical attributes', file=open("log_file.txt", "a"))97 xyz_streaming_data = dm.statistics_attributes(norm_streaming_data)98 xyz_background_train = dm.statistics_attributes(norm_background_train)99 #xyz_streaming_data = dm.statistics_attributes(streaming_data)100 #xyz_background_train = dm.statistics_attributes(background_train)101 #xyz_signal = dm.statistics_attributes(signal)102 #xyz_background = dm.statistics_attributes(background)103 104 #transformer = preprocessing.Normalizer().fit(np.vstack((xyz_signal,xyz_background)))105 106 #xyz_background = transformer.transform(xyz_background)107 #xyz_signal = transformer.transform(xyz_signal)108 #np.savetxt('xyz_reduced_signal_norm.csv',xyz_signal,delimiter=',')109 #np.savetxt('xyz_background_norm.csv',xyz_background,delimiter=',')110 # Normalize Features111 #print(' .Normalizing Features', file=open("log_file.txt", "a"))112 # Calculates PCA and projects the sub-sets 113 print(' .Calculating PCA:', file=open("log_file.txt", "a"))114 proj_xyz_background_train, proj_xyz_streaming_data, xyz_mantained_variation, xyz_attributes_influence = dm.PCA_Projection(xyz_background_train,xyz_streaming_data,N_PCs)115 #proj_xyz_background_train, proj_xyz_streaming_data, xyz_mantained_variation, xyz_attributes_influence = dm.PCA_Projection(norm_xyz_background_train,norm_xyz_streaming_data,N_PCs)116 # Plots PCA results117 print(' .Ploting PCA results', file=open("log_file.txt", "a"))118 dm.PCA_Analysis(xyz_mantained_variation,xyz_attributes_influence)119 #proj_xyz_background_train = preprocessing.normalize(proj_xyz_background_train)120 #proj_xyz_streaming_data = preprocessing.normalize(proj_xyz_streaming_data)121 122 for gra in gra_list:123 dm.SODA_Granularity_Iteration(proj_xyz_background_train,proj_xyz_streaming_data, gra,len(background_test),n_i)124 125 """126 print(' .Running SODA on base granularity', file=open("log_file.txt", "a"))127 dm.SODA_Granularity_Iteration(proj_xyz_background_train,proj_xyz_streaming_data, 1,len(background_test),n_i)128 print(' .Creating pool with %d processes:' % PROCESSES, file=open("log_file.txt", "a"))129 with multiprocessing.Pool(PROCESSES) as pool:130 TASKS = [(dm.SODA_Granularity_Iteration, (proj_xyz_background_train,proj_xyz_streaming_data, gra,len(background_test),n_i)) for gra in gra_list]131 print(' .Executing SODA for granularities', gra_list, file=open("log_file.txt", "a"))132 pool.map(calculatestar, TASKS)133 """134 135 136 print('\n ====Data Processing Complete====\n', file=open("log_file.txt", "a"))137 print('=*='*17, file=open("log_file.txt", "a"))138 139if __name__ == '__main__':140 multiprocessing.freeze_support()...
realtime_test.py
Source:realtime_test.py
1from __future__ import print_function2import time3try: # Python 24 import urlparse5except ModuleNotFoundError: # Python 36 from urllib import parse as urlparse7import threading8import json9import os.path10import webbrowser11from optparse import OptionParser12from . import webserver13from . import monitor14from . import background_test15from . import project16 17background_test.RunTests.instance = background_test.RunTests()18class RunAllTestsWhenAChangeHappens(object):19 20 21 def __init__(self, server):22 self.must_run = False23 self.server = server24 25 def start(self):26 self.must_run = True;27 self.thread = threading.Thread(target=self.run)28 self.thread.daemon = True;29 self.thread.start()30 31 def stop(self):32 self.must_run = False;33 34 def run(self):35 cwd = os.getcwd()36 paths = [os.path.join(cwd, x) for x in project.DIRECTORIES] 37 monitor_directories = monitor.MonitorDirectories(paths)38 monitor_directories.check()39 monitor_directories.changed = True40 while self.must_run:41 if monitor_directories.changed:42 if not background_test.RunTests.instance.can_start_a_test():43 monitor_directories.check()44 time.sleep(0.5)45 continue46 47 if not self.server.last_report is None:48 for element in monitor_directories.updated_elements:49 if not element.is_file():50 continue51 for x in self.server.last_report.address_to_report.values():52 path, module, testcase = x.address53 if path == element.path:54 x.reset_timing()55 print("will rerun: ", module, testcase)56 print("Changed files:")57 number_of_changed_files = 058 for element in monitor_directories.updated_elements:59 if not element.is_file():60 continue61 print(element.path)62 number_of_changed_files += 163 64 if number_of_changed_files > 0 or self.server.last_report is None:65 last_report = self.server.last_report66 67 self.server.set_last_report(None)68 69 report = background_test.RunTests.instance.run_tests(last_report)70 71 self.server.set_last_report(report)72 73 monitor_directories.check()74 else:75 time.sleep(0.5)76 monitor_directories.check()77 78 79 80class HandleRequest(webserver.HandleRequest):81 82 def do_start(self):83 self.server.restart_testrunner()84 string = 'null'85 content_type = 'text/javascript'86 return string, content_type87 88 89 def do_pause(self):90 self.server.stop_testrunner()91 return 'null', 'text/javascript' 92 93 def do_get_last_report(self):94 string = json.dumps(self.server.get_last_report_as_dict())95 content_type = 'text/javascript'96 return string, content_type97 def do_get_last_report_information(self):98 string = json.dumps(self.server.get_last_report_information())99 content_type = 'text/javascript'100 return string, content_type101 102 def do_run_test(self):103 parameters = urlparse.parse_qs(self.parsed_path.query)104 a0 = parameters['a0'][0]105 a1 = parameters['a1'][0]106 a2 = parameters['a2'][0]107 address = (a0, a1, a2)108 result = background_test.RunTests.instance.run_test_with_address(address)109 string = json.dumps(result)110 content_type = 'text/javascript'111 self.server.continue_testrunner()112 return string, content_type113 114 def index_file(self):115 base = os.path.split(__file__)[0]116 filename = os.path.join(base, "realtime_test.html")117 with open(filename, "r") as file:118 contents = file.read()119 return contents, 'text/html'120 121class ContinuosTestWebServer(webserver.WebServer):122 123 def __init__(self, port):124 webserver.WebServer.__init__(self, port, HandleRequest)125 self.last_report = None126 self.run_all_tests = RunAllTestsWhenAChangeHappens(self)127 self.run_all_tests.start()128 self.report_id = 0129 130 131 def stop(self):132 self.run_all_tests.stop()133 self.shutdown()134 135 def restart_testrunner(self):136 self.run_all_tests.stop()137 self.last_report = None138 self.run_all_tests = RunAllTestsWhenAChangeHappens(self)139 self.run_all_tests.start()140 141 def stop_testrunner(self):142 self.run_all_tests.stop() 143 144 def continue_testrunner(self):145 if not self.run_all_tests.must_run:146 self.restart_testrunner()147 148 def get_last_report_as_dict(self):149 if self.last_report is None:150 return None151 else:152 result = self.last_report.to_dict()153 return result154 155 def get_last_report_information(self):156 if self.last_report is None:157 result = self.get_live_report_info()158 else:159 result = self.last_report.to_information_dict()160 result['reports'] = self.get_live_reports()161 return result162 163 def get_live_reports(self):164 return background_test.RunTests.instance.get_reports()165 166 def get_live_report_info(self):167 return background_test.RunTests.instance.report_info168 169 def set_last_report(self, report):170 self.last_report = report171 if not report is None:172 self.report_id += 1173 self.last_report.report_id = self.report_id174 175def start_browser(serverport):176 time.sleep(2.0)177 webbrowser.open("http://localhost:{0}/".format(serverport))178 179if __name__ == '__main__':180 parser = OptionParser() 181 182 183 parser.add_option("-p", "--port", 184 dest="serverport",185 help="start serving on PORT", 186 metavar="PORT", 187 default=9070,188 type="int")189 190 parser.add_option("-e", "--editor", 191 dest="editor",192 help="preferred EDITOR for editing the files", 193 metavar="EDITOR", 194 default="geany",195 type="string")196 parser.add_option("-b", "--browser", 197 dest="startbrowser",198 help="automatically start a browser", 199 metavar="PORT", 200 default="yes",201 type="string")202 203 (options, args) = parser.parse_args()204 print("starting server on port: ", options.serverport)205 print("will use editor: ", options.editor)206 webserver.EDITOR = options.editor207 208 if options.startbrowser == "yes":209 thread = threading.Thread(target = start_browser, args = (options.serverport,))210 thread.start()211 212 server = ContinuosTestWebServer(options.serverport)213 server.start()...
pca.py
Source:pca.py
1#!/usr/bin/env python2"""3============================4Principal Component Analysis5============================6Principal Component Analysis (PCA) applied to this data identifies the7combination of attributes (principal components, or directions in the feature8space) that account for the most variance in the data.9"""10print __doc__11import numpy as np12import pylab as pl13from matplotlib.ticker import NullFormatter14from sklearn.decomposition import PCA15from sklearn import svm16import samples17import features18def perform_pca(channel):19 signals, backgrounds = samples.get_samples(channel, purpose='train')20 if channel == '01jet':21 branches = features.hh_01jet_vars22 else:23 branches = features.hh_2jet_vars24 X_train, X_test,\25 w_train, w_test,\26 y_train, y_test = samples.make_classification(27 *(samples.make_train_test(signals, backgrounds,28 branches=branches,29 train_fraction=.5,30 max_sig_train=2000,31 max_bkg_train=2000,32 max_sig_test=2000,33 max_bkg_test=2000,34 same_size_train=True,35 same_size_test=True,36 norm_sig_to_bkg_train=True,37 norm_sig_to_bkg_test=True)),38 standardize=True)39 print X_train40 print X_test41 print w_train42 print w_test43 print w_train.min(), w_train.max()44 pca = PCA(n_components=2)45 # fit only on background46 pca.fit(X_train[y_train == 0])47 X_train_pca = pca.transform(X_train)48 X_test_pca = pca.transform(X_test)49 xmin = X_test_pca[:, 0].min()50 xmax = X_test_pca[:, 0].max()51 ymin = X_test_pca[:, 1].min()52 ymax = X_test_pca[:, 1].max()53 width = xmax - xmin54 height = ymax - ymin55 xmin -= width*.156 xmax += width*.157 ymin -= height*.158 ymax += height*.159 # fit support vector machine on output of PCA60 clf = svm.SVC(C=100, gamma=.01, probability=True, scale_C=True)61 clf.fit(X_train_pca, y_train, sample_weight=w_train)62 # plot the decision function63 xx, yy = np.meshgrid(np.linspace(xmin, xmax, 500), np.linspace(ymin, ymax, 500))64 Z = clf.decision_function(np.c_[xx.ravel(), yy.ravel()])65 Z = Z.reshape(xx.shape)66 channel_name = samples.CHANNEL_NAMES[channel]67 target_names = ['%s Signal' % channel_name,68 '%s Background' % channel_name]69 target_values = [1, 0]70 # Percentage of variance explained for each components71 print 'explained variance ratio (first two components):', \72 pca.explained_variance_ratio_73 # plot PCA and SVM output74 pl.figure()75 # plot support vector machine decision function76 pl.set_cmap(pl.cm.jet)77 pl.contourf(xx, yy, Z, alpha=0.75)78 for c, i, target_name in zip("rb", target_values, target_names):79 pl.scatter(X_test_pca[y_test == i, 0], X_test_pca[y_test == i, 1],80 c=c, label=target_name,81 s=w_test[y_test == i]*10,82 alpha=0.9)83 pl.xlim((xmin, xmax))84 pl.ylim((ymin, ymax))85 pl.legend()86 pl.xlabel('Principal Component [arb. units]')87 pl.ylabel('Secondary Component [arb. units]')88 pl.title('Principal Component Analysis\n'89 'and Support Vector Machine Decision Function')90 pl.savefig('pca_%s.png' % channel)91 # testing:92 signals, backgrounds = samples.get_samples(channel, purpose='test',93 mass=125)94 signal_train, signal_weight_train, \95 signal_test, signal_weight_test, \96 background_train, background_weight_train, \97 background_test, background_weight_test = samples.make_train_test(98 signals, backgrounds,99 branches=branches,100 train_fraction=.5,101 norm_sig_to_bkg_train=False,102 norm_sig_to_bkg_test=False)103 sample_test = np.concatenate((background_test, signal_test))104 sample_test = samples.std(sample_test)105 background_test, signal_test = sample_test[:len(background_test)], \106 sample_test[len(background_test):]107 signal_test = pca.transform(signal_test)108 background_test = pca.transform(background_test)109 pl.figure()110 pl.hist(clf.predict_proba(background_test)[:,-1],111 weights=background_weight_test, bins=30, range=(0, 1),112 label='Background', color='b')113 pl.hist(clf.predict_proba(signal_test)[:,-1],114 weights=signal_weight_test*10, bins=30, range=(0, 1),115 label='Signal x 10', color='r')116 pl.legend()117 pl.ylabel('Events')118 pl.xlabel('Support Vector Machine Signal Probability')119 pl.savefig('pca_svm_score_%s.png' % channel)120if __name__ == '__main__':121 perform_pca('2jet')...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!