Best Python code snippet using uiautomator
analysis-old.py
Source:analysis-old.py
1from __future__ import division2import pandas as pd3import numpy as np4import random, os5import matplotlib6matplotlib.use('Agg')7import matplotlib.pyplot as plt8from collections import defaultdict, OrderedDict, Counter9import logging10# original 250-test.dat size is 95,000,000 rows11# head -1000000 250-test.dat > test-data-set.dat12# tail -1000000 250-test.dat >> test-data-set.dat13# head -1000000 control1.dat > test-control-set.dat14# tail -1000000 control1.dat >> test-control-set.dat15# when running in /data/users/sarthak it takes less than a min to load full16DATASET = '250-test.dat' #'test-data-set.dat'17CONTROLSET = 'control1.dat' #'test-control-set.dat'18DATALABEL = DATASET.split('.')[0]19CONTROLLABEL = CONTROLSET.split('.')[0]20NDEVICES = 100021LIST_OF_CONTROLLABELS = ['control5']22#LIST_OF_CONTROLLABELS = ['control'+str(l) for l in range(1,9)]23#LIST_OF_CONTROLLABELS.remove('control3') #causes errors unslicable24DATAPATH = '../data/'25PROCESSEDPATH = '../processed/'26OUTPUTPATH = '../output/'27header_row = ['Device_number',28 'end_time',29 'date_service_created',30 'service_class_name',31 'cmts_inet',32 'service_direction',33 'port_name',34 'octets_passed',35 'device_key',36 'service_identifier']37logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')38logger = logging.getLogger()39logger.setLevel('DEBUG')40#############################################################################41# load dataframes42#############################################################################43def load_csv(ds=DATASET, cs=CONTROLSET):44 df1 = pd.read_csv(DATAPATH + ds, delimiter='|', names=header_row)45 df2 = pd.read_csv(DATAPATH + cs, delimiter='|', names=header_row)46 return df1, df247def filter_csv(df, dfname):48 df_up = df[df['service_direction'] == 2][['Device_number', 'end_time',49 'octets_passed', 'service_class_name']]50 df_dw = df[df['service_direction'] == 1][['Device_number', 'end_time',51 'octets_passed', 
'service_class_name']]52 df_up.to_pickle(PROCESSEDPATH+dfname+'_up.pkl')53 df_dw.to_pickle(PROCESSEDPATH+dfname+'_dw.pkl')54 return55def load_dataframe(dfname):56 df_up = pd.read_pickle(PROCESSEDPATH+dfname+'_up.pkl')57 df_dw = pd.read_pickle(PROCESSEDPATH+dfname+'_dw.pkl')58 return df_up, df_dw59def save_dfs():60 df1, df2 = load_csv()61 filter_csv(df1, DATALABEL)62 filter_csv(df2, CONTROLLABEL)63 return64#############################################################################65# Helper func66#############################################################################67def getSortedCDF(data):68 """69 return x,y for a cdf given one dimensional unsorted data x70 """71 sorted_data = np.sort( data )72 YSCALE = len(sorted_data)73 yvals = [y/YSCALE for y in range( len(sorted_data) )]74 return sorted_data, yvals75#############################################################################76# Analysis77#############################################################################78class DataProcessor:79 def __init__(self, DFNAME):80 self.dfname = DFNAME81 return82 def set_dfname(self, DFNAME):83 self.dfname = DFNAME84 return85 def load_csv(self):86 return pd.read_csv(self.dfname + '.dat', delimiter='|',87 names=header_row)88 def splitDateTime(self, df_temp):89 """90 dataframes are passed by reference by default.91 """92 df_temp['date'] = df_temp['end_time'].apply(lambda x:93 x.split(" ")[0])94 df_temp['time'] = df_temp['end_time'].apply(lambda x:95 x.split(" ")[1])96 return97 def save_pkl(self):98 df = self.load_csv()99 df_up = df[df['service_direction'] == 2][['Device_number', 'end_time',100 'octets_passed']]101 df_dw = df[df['service_direction'] == 1][['Device_number', 'end_time',102 'octets_passed']]103 self.splitDateTime(df_up)104 self.splitDateTime(df_dw)105 df_up.to_pickle(PROCESSEDPATH + self.dfname + '_up.pkl')106 df_dw.to_pickle(PROCESSEDPATH + self.dfname + '_dw.pkl')107 return df_up, 
df_dw108##############################################################################109def meta_data():110 listDS = ['test-data-set', 'test-control-set', '250-test', 'control1',111 'control2', 'control3' ,'control4', 'control5', 'control6',112 'control7', 'control8']113 attrib = defaultdict(list)114 for dfname in listDS:115 for direction in ['up', 'dw']:116 logger.info('load dataframe ' + dfname + '_' + direction)117 df = pd.read_pickle(PROCESSEDPATH + dfname +'_'+ direction +'.pkl')118 attrib['name'].append(dfname)119 attrib['direction'].append(direction)120 attrib['len'].append(len(df))121 df.sort('end_time', ascending=True)122 attrib['date_start'].append(df['end_time'].iloc[0])123 attrib['date_end'].append(df['end_time'].iloc[-1])124 devices = np.sort(df['Device_number'].unique())125 attrib['num_devices'].append(len(devices))126 attrib['device_start'].append(devices[0])127 attrib['device_end'].append(devices[-1])128 attrib['max_bytes'].append(df['octets_passed'].max())129 attrib['mean_bytes'].append(df['octets_passed'].mean())130 attrib['median_bytes'].append(df['octets_passed'].median())131 attrib['perc90_bytes'].append( np.percentile( df['octets_passed'],132 90) )133 logger.debug(pd.DataFrame(attrib))134 df_attributes = pd.DataFrame(attrib)[['name', 'direction', 'len',135 'date_start', 'date_end',136 'num_devices', 'device_start',137 'device_end', 'max_bytes',138 'mean_bytes', 'median_bytes',139 'perc90_bytes']]140 df_attributes.to_pickle(OUTPUTPATH + 'data_attributes.pkl')141 return df_attributes142##############################################################################143# SLICE data144# 1000 devices, 9/30 -- 12/25145##############################################################################146def get_dates(df):147 """148 given df, sort dates and get date_start and date_end149 """150 df.sort('end_time', ascending=True)151 date_start = df['end_time'].iloc[0]152 date_end = df['end_time'].iloc[-1]153 return date_start, date_end154def slice_df(df, 
num_devices, date_start, date_end):155 """156 num_devices = 1000157 date_start = 2014-09-30 00:00:00158 date_end = 2014-12-25 20:45:00159 """160 df2 = df[ (df['end_time'] >= date_start) & (df['end_time'] <= date_end) ]161 devices = df2['Device_number'].unique()162 sliced_devices = random.sample(devices, num_devices)163 df3 = df2[ df2['Device_number'].isin(sliced_devices) ]164 return df3165def slicer(ControlSet, DATA_up, DATA_dw, ndev):166 CONTROL_up, CONTROL_dw = load_dataframe(ControlSet)167 # get date_start, date_end168 date_start, date_stop = get_dates(CONTROL_up)169 # slice170 logger.debug(ControlSet + " ndev=" + str(ndev) + " control set unique: "\171 + str(len( CONTROL_up['Device_number'].unique() )) + " data set unique: "\172 + str(len( DATA_up['Device_number'].unique() )) )173 CUP_sliced = slice_df(CONTROL_up, ndev, date_start, date_stop)174 DUP_sliced = slice_df(DATA_up, ndev, date_start, date_stop)175 #logger.debug("sliced UP sets ")176 # get date_start, date_end177 date_start, date_stop = get_dates(CONTROL_dw)178 # slice179 logger.debug(ControlSet + " ndev=" + str(ndev) + " control set unique: "\180 + str(len( CONTROL_dw['Device_number'].unique() )) + " data set unique: "\181 + str(len( DATA_dw['Device_number'].unique() )) )182 CDW_sliced = slice_df(CONTROL_dw, ndev, date_start, date_stop)183 DDW_sliced = slice_df(DATA_dw, ndev, date_start, date_stop)184 # return name of outputfolder185 outputlabel = ControlSet + '_' + date_start.replace(" ","_") \186 + '_' + date_stop.replace(" ","_")187 #logger.debug("sliced DW sets ")188 return DUP_sliced, DDW_sliced, CUP_sliced, CDW_sliced, outputlabel189##############################################################################190class Plotter:191 def __init__(self, dlabel=DATALABEL, clabel=CONTROLLABEL, XSCALE='log'):192 self.test_up = 0193 self.test_dw = 0194 self.control_up = 0195 self.control_dw = 0196 self.DLABEL = dlabel197 self.CLABEL = clabel198 self.DISPLAY_MEAN = 1199 self.OUTPUTPATH = OUTPUTPATH200 
self.XSCALE = XSCALE201 return202 def load_df(self):203 self.test_up, self.test_dw = load_dataframe(self.DLABEL)204 self.control_up, self.control_dw = load_dataframe(self.CLABEL)205 return206 def import_df(self, test_up, test_dw, control_up, control_dw):207 self.test_up = test_up208 self.test_dw = test_dw209 self.control_up = control_up210 self.control_dw = control_dw211 self.ALL_DF = {'test_up': self.test_up,212 'test_dw': self.test_dw,213 'control_up': self.control_up,214 'control_dw': self.control_dw}215 self.grouped = defaultdict(int)216 return217 def allBytes(self, df):218 return df219 def nonZeroBytes(self, df):220 return df[df['octets_passed'] > 0]221 def thresholdBytes(self, df, threshold=0):222 """223 return df, xlabel, filename224 """225 dfr = df[df['octets_passed'] > threshold]226 return dfr, "Bytes > "+str(threshold),"CDF-ThresholdBytes-"+str(threshold)227 def getStringsFromGroupBy(self, GROUPBY):228 """229 eg: input = ["Device_number", "time"]230 returns:231 xlab = " Bytes per Device-number per Time"232 filename = "Bytes_Device-number_Time"233 """234 xlab = " Bytes "235 filename = "Bytes"236 for col in GROUPBY:237 col2 = col.capitalize().replace("_", "-")238 xlab += " per "+col2239 filename += "_"+col2240 logger.debug("xlab, filename = "+xlab+"; "+filename)241 return xlab, filename242 def grouper(self, GROUPBY):243 """244 save df_grouped['octets_passed'] by ['Device_number'],245 ['Device_number', 'date'], ['Device_number', 'time'], ['end_time']246 """247 logger.debug("group ALL_DF according to "+str(GROUPBY))248 for name, df in self.ALL_DF.items():249 self.grouped[name] = df.groupby(GROUPBY, as_index=False)['octets_passed']250 logger.debug( "Group "+name+" by "+str(GROUPBY)+" len: "+str( len(self.grouped[name].first()) ) )251 return252 def getByteSeries(self, method, **kwargs):253 """254 for each sliced set255 method = all: series = df['octets_passed']256 method = threshold: series = thresholdBytes['octets_passed']257 method = peak, group = device: 
series = getGroupedByteStats(grouped).max()258 """259 self.byte_series = defaultdict(int)260 logger.debug("enter getByteSeries; method = "+method)261 if method=='all':262 #self.byte_series = self.ALL_DF263 for name, df in self.ALL_DF.items():264 self.byte_series[name] = self.allBytes(df)['octets_passed']265 elif method=='nonZero':266 for name, df in self.ALL_DF.items():267 self.byte_series[name] = self.nonZeroBytes(df)['octets_passed']268 elif method=='threshold':269 for name, df in self.ALL_DF.items():270 self.byte_series[name] = self.thresholdBytes(df, kwargs['threshold'])['octets_passed']271 elif method=='Peak':272 for name, gr in self.grouped.items():273 self.byte_series[name] = gr.max()['octets_passed']274 #logger.debug("name = "+name+"; max series len = "+ str(len(self.byte_series[name])))275 elif method=='Median':276 for name, gr in self.grouped.items():277 self.byte_series[name] = gr.median()['octets_passed']278 elif method=='90perc':279 for name, gr in self.grouped.items():280 #self.byte_series[name] = gr.quantile(.90)['octets_passed']281 self.byte_series[name] = gr.apply(lambda x: np.percentile(x, 90))282 else:283 logger.error("Unknown method = "+method)284 return285 def CDFPlotter(self, xlabel, figname):286 """287 given self BYTES[ ]: test_up/dw control_up/dw288 just plot in CDF289 """290 fig1, ax1 = plt.subplots(2,1, figsize=(8,10))291 labels = iter([self.DLABEL+'_up',self.CLABEL+'_up',self.DLABEL+'_dw',292 self.CLABEL+'_dw'])293 colors = iter(['b', 'g', 'b', 'g'])294 axis_ctr = iter([0,0,1,1])295 markers = iter(['o','x','o','x'])296 #logger.debug("byte_series.items() " + str(self.byte_series.items()))297 for name, series in self.byte_series.items():298 x, y = getSortedCDF(list(series))299 lab = next(labels)300 if self.DISPLAY_MEAN:301 calcMean = '%.2E' % np.mean(list(series))302 lab += ': '+ calcMean303 ax1[next(axis_ctr)].plot(x,y, color=next(colors), label=lab,304 marker=markers.next(), markevery=len(y)/10)305 ax1[0].set_xscale(self.XSCALE)306 
ax1[1].set_xscale(self.XSCALE)307 ax1[1].set_xlabel(xlabel)308 ax1[0].set_ylabel("Normalized CDF UPLINK")309 ax1[1].set_ylabel("Normalized CDF DOWNLINK")310 ax1[0].grid(1)311 ax1[1].grid(1)312 ax1[0].legend(loc='best')313 ax1[1].legend(loc='best')314 fig1.tight_layout()315 fig1.savefig(self.OUTPUTPATH+figname)316 return317 def setOutputPath(self, outputlabel):318 # new outputpath by control-set and date319 logger.info("Plots stored in " + outputlabel)320 self.OUTPUTPATH = 'output/' + outputlabel + '/'321 if not os.path.exists(self.OUTPUTPATH):322 logger.debug("MakeDir " + self.OUTPUTPATH)323 os.makedirs(self.OUTPUTPATH)324 return325 def plotAll(self):326 """327 main method328 - Has sliced self.test_up, self.test_dw, self.control_up, self.control_dw329 - Plot330 # allBytes and nonZero then CDF all stats331 # group by dev [dev, dev-day, dev-time] then CDF all stats332 # group by [datetime] then CDF all stats333 # group by [datetime] then timeseries plot (x, y)334 """335 # allBytes and nonZero then CDF all stats336 self.getByteSeries('all')337 xlabel = 'All Bytes Seen'338 figname = 'CDF-allBytes'339 self.CDFPlotter(xlabel, figname)340 logger.debug("draw all CDF")341 self.getByteSeries('nonZero')342 xlabel = 'Bytes > 0'343 figname = 'CDF-nonZeroBytes'344 self.CDFPlotter(xlabel, figname)345 logger.debug("draw nonZero CDF")346 # group by dev [dev, dev-day, dev-time] then CDF all stats347 # group by [datetime] then CDF all stats348 for group_by in [['Device_number'], ['Device_number', 'date'],349 ['Device_number', 'time'], ['end_time']]:350 # store each group in self.grouped351 self.grouper(group_by)352 xlab, filename = self.getStringsFromGroupBy(group_by)353 for stats in ['Peak', 'Median', '90perc']:354 self.getByteSeries(stats)355 xlabel = stats + xlab356 figname = 'CDF-'+filename+'-'+stats357 self.CDFPlotter(xlabel, figname)358 logger.debug("draw CDF - Bytes per "+str(group_by)+" "+stats)359 # for the last groupby ['end_time']360 # group by [datetime] then timeseries 
plot (x, y)361 GROUPBY = ['end_time']362 xlab, filename = self.getStringsFromGroupBy(GROUPBY)363 for name, df in self.ALL_DF.items():364 self.grouped[name] = df.groupby(GROUPBY)365 for stats in ['Peak', 'Median', '90perc']:366 #return self.grouped367 if stats=='Peak':368 for name, df in self.grouped.items():369 self.byte_series[name] = df['octets_passed'].apply(lambda x: max(x))370 elif stats=='Median':371 for name, df in self.grouped.items():372 self.byte_series[name] = df['octets_passed'].apply(lambda x: np.median(x))373 elif stats=='90perc':374 for name, df in self.grouped.items():375 self.byte_series[name] = df['octets_passed'].apply(lambda x: np.percentile(x, 90))376 else:377 logging.error("invalid stat, self.byte_series may be corrupted")378 #self.getByteSeries(stats)379 ylabel = stats + xlab380 figname = 'TimeSeries-'+filename+'-'+stats381 self.SeriesPlotter(ylabel, figname)382 return383 def SeriesPlotter(self, ylabel, figname):384 """385 given self BYTES[ ]: test_up/dw control_up/dw386 just plot in CDF387 """388 logger.debug("enter seriesplotter")389 fig1, ax1 = plt.subplots(2,1, figsize=(18,10))390 labels = iter([self.DLABEL+'_up',self.CLABEL+'_up',self.DLABEL+'_dw',391 self.CLABEL+'_dw'])392 colors = iter(['b', 'g', 'b', 'g'])393 axis_ctr = iter([0,0,1,1])394 markers = iter(['o','d','o','d'])395 #logger.debug("byte_series.items() " + str(self.byte_series.items()))396 for name, series in self.byte_series.items():397 x = pd.to_datetime(series.index)398 y = series.values399 lab = next(labels)400 if self.DISPLAY_MEAN:401 calcMean = '%.2E' % np.mean(list(series))402 lab += ': '+ calcMean403 ax1[next(axis_ctr)].plot_date(x,y, color=next(colors), label=lab,404 alpha=0.5, marker=markers.next())#,405 #marker=markers.next(), markevery=len(y)/10)406 #ax1[0].set_xscale(self.XSCALE)407 #ax1[1].set_xscale(self.XSCALE)408 ax1[1].set_xlabel("DateTime")409 ax1[0].set_ylabel(ylabel + " Bytes UPLINK")410 ax1[1].set_ylabel(ylabel + " Byes DOWNLINK")411 ax1[0].grid(1)412 
ax1[1].grid(1)413 ax1[0].legend(loc='best')414 ax1[1].legend(loc='best')415 fig1.tight_layout()416 fig1.savefig(self.OUTPUTPATH+figname)417 return418##############################################################################419# MAIN methods420##############################################################################421#DATASET = 'test-data-set.dat'422#CONTROLSET = 'test-control-set.dat'423#DATALABEL = 'test-data-set'424#LIST_OF_CONTROLLABELS = ['test-control-set']425NDEVICES = 1000426def main2(ndev=NDEVICES, DataSet=DATALABEL, ControlSets=LIST_OF_CONTROLLABELS, method='sum'):427 logger.info("NDEVICES "+str(ndev)+" DateSet "+DataSet+" ControlSets "+str(ControlSets))428 # Plotter Class Object429 p = Plotter()430 # load test set up and down431 DATA_up, DATA_dw = load_dataframe(DataSet)432 # for control set in [control1 - control8]433 for ControlSet in ControlSets:434 # load test and control up and dw sets435 test_up, test_dw, control_up, control_dw, outputlabel = slicer(ControlSet, DATA_up,436 DATA_dw, ndev)437 test_up = process_df(test_up, method)438 test_dw = process_df(test_up, method)439 control_up = process_df(control_up, method)440 control_dw = process_df(control_up, method)441 outputlabel = method.upper() + '/' + outputlabel442 p.setOutputPath(outputlabel)443 p.import_df(test_up, test_dw, control_up, control_dw)444 p.DLABEL = DataSet445 p.CLABEL = ControlSet446 x = p.plotAll()447 logger.info("Done "+ControlSet)448 return p, x449def process_df(df, method='sum'):450 g = df.groupby(['Device_number', 'end_time', 'date', 'time'], as_index=False)451 if method=='sum':452 return g.sum()453 elif method=='max':454 return g.max()455 elif method=='wifi':456 return df[df['service_class_name'] == 'xwifi_dn' | df['service_class_name'] == 'xwifi_up']457def plotByDateRange(ndev=NDEVICES, DataSet=DATALABEL, ControlSets=LIST_OF_CONTROLLABELS, method='sum'):458 """459 load test set up and down460 for control set in [control1 - control8]461 load control set up and down and 
get date_start, date_end462 slice control set to 1000 devices,463 slice test set to 1000 devices, date_start, date_end464 plot CDFs465 """466 # Plotter Class Object467 p = Plotter()468 # load test set up and down469 DATA_up, DATA_dw = load_dataframe(DataSet)470 # for control set in [control1 - control8]471 for ControlSet in ControlSets:472 # load test and control up and dw sets473 test_up, test_dw, control_up, control_dw = slicer(ControlSet, DATA_up,474 DATA_dw)475 test_up = process_df(test_up)476 test_dw = process_df(test_up)477 control_up = process_df(test_up)478 control_dw = process_df(test_up)479 p.import_df(test_up, test_dw, control_up, control_dw)480 p.DLABEL = DataSet481 p.CLABEL = ControlSet482 logger.debug("loaded sliced sets " + DataSet + " " + ControlSet )483 # new outputpath by control-set and date484 outputlabel = method.upper() + '/' + ControlSet + '_' + date_start.replace(" ","_") \485 + '_' + date_stop.replace(" ","_")486 p.setOutputPath(outputlabel)487 # allBytes and nonZeroBytes488 p.CDFPlotter(p.allBytes, '')489 p.CDFPlotter(p.thresholdBytes, 0)490 # max, median, 90 percentile491 for method in ['Peak', 'Median', '90perc']:492 p.CDFPlotter(p.getBytesPerDevice, method)493 logger.debug("Bytes per Device method = " + method)494 p.CDFPlotter(p.getBytesPerDevicePerDay, method)495 logger.debug("Bytes per Device per Day method = " + method)496 p.CDFPlotter(p.getBytesPerDevicePerTimeSlot, method)497 logger.debug("Bytes per Device per Time method = " + method)498 logger.debug("DONE control: " + ControlSet + "; test: " + DataSet)499 return #DUP_sliced, DDW_sliced, CUP_sliced, CDW_sliced500def detailedTimeSeries(ds, cs):501 df1, df2 = load_csv(ds, cs)502 test_up, test_dw = filter_csv(df1, ds.split('.')[0])503 control_up, control_dw = filter_csv(df2, cs.split('.')[0])504 return test_up, test_dw, control_up, control_dw505def main():506 p = Plotter()507 p.load_df()508 p.splitDateTime()509 p.CDFPlotter(p.allBytes)510 p.CDFPlotter(p.nonZeroBytes)511 
p.CDFPlotter(p.peakBytesPerDevice)512 p.CDFPlotter(p.peakBytesPerDevicePerDay)513 p.CDFPlotter(p.peakBytesPerDevicePerTimeSlot)...
app.py
Source:app.py
"""Minimal Flask demo exposing list/insert/update/delete views for Test rows."""
from html import escape

from flask import Flask, abort

import config
import models
from database import init_database, db_insert, db_delete


def create_app():
    """Create the Flask app, apply ``config.cf`` and initialise the database."""
    # Flask
    app = Flask(__name__)
    # config
    app.config.update(config.cf)
    # DB initialisation
    init_database(app)
    return app


app = create_app()


#--------------------------------------#
# Database read (view)
#--------------------------------------#
@app.route('/')
def index():
    """Render every ``models.Test`` row as an HTML table."""
    parts = ["<table border><tr><th>Id</th><th>Data</th></tr>\n"]
    for row in models.Test.query.all():
        # Stored data originates from URL parameters (see insert/update),
        # so it is escaped before being placed into the HTML response.
        parts.append("<tr><th>%s</th><td>%s</td></tr>\n"
                     % (escape(str(row.id)), escape(str(row.data))))
    parts.append("</table>\n")
    return "Hello Flask<br>" + "".join(parts)


#--------------------------------------#
# Add one row (insert)
#--------------------------------------#
@app.route('/insert/<param>')
def insert(param):
    """Insert a new Test row whose data is ``param``."""
    test_new = models.Test(data=param)
    db_insert(test_new)
    # Escaped: ``param`` is echoed straight back into an HTML response.
    return "insert: %s" % (escape(param))


#--------------------------------------#
# Change one row (update)
#--------------------------------------#
@app.route('/update/<id>/<param>')
def update(id, param):
    """Set the data of the Test row with the given id to ``param``.

    Responds with 404 when no row has that id; the original raised
    AttributeError on None, which surfaced as a 500.
    """
    test_up = models.Test.query.filter_by(id=id).first()
    if test_up is None:
        abort(404)
    test_up.data = param
    db_insert(test_up)
    return "update(id:%s): to %s" % (escape(id), escape(param))


#--------------------------------------#
# Delete one row (delete)
#--------------------------------------#
@app.route('/delete/<id>')
def delete(id):
    # NOTE(review): this view is cut off in the available source; the
    # visible statements are kept unchanged. No return statement is
    # visible here - confirm against the full file.
    test_del = models.Test.query.filter_by(id=id).first()
    db_delete(test_del)
    # ... (remainder truncated in the source excerpt)
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!