Best Python code snippet using slash
spectral_recorder.py
Source:spectral_recorder.py
#!/usr/bin/env python
#
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import array
import struct
import sys
import time
import os
import json
from subprocess import call
from operator import itemgetter

import numpy as np

# scikit-learn is only needed by get_spectrum_scan_features(); keep the
# recorder/parser usable on hosts where it is not installed.
try:
    from sklearn import preprocessing
except ImportError:
    preprocessing = None

# matplotlib is only needed for plotting; same reasoning as above.
try:
    import matplotlib
    matplotlib.use('TkAgg')
    import matplotlib.pyplot as plt
except ImportError:
    matplotlib = None
    plt = None


class SpectralRecorder:
    """Configure, trigger and decode spectral scans exposed through debugfs.

    The class shells out to ``ifconfig``/``iwconfig`` and to the
    ``/sys/kernel/debug/ieee80211/<phy>/<drv>/spectral_*`` files, and decodes
    the 76-byte records read back from ``spectral_scan0``.
    """

    phy = "phy0"
    dev = "wlan0"
    drv = "ath9k"
    mode = "manual"  # works
    fft_period = 15
    spectral_count = 100
    spectral_period = 1
    short_repeat = 1
    f0 = 2437e6

    def __init__(self, phy="phy0", dev="wlan0", drv="ath9k",
                 mode="manual", fft_period=15, spectral_count=100,
                 spectral_period=1, short_repeat=1,
                 load=False, offline=True, freq=2437e6):
        """Store the scan parameters and optionally apply them.

        :param phy: wireless phy name used in the debugfs path.
        :param dev: network interface name passed to ifconfig/iwconfig.
        :param drv: driver directory name inside debugfs.
        :param mode: value written to ``spectral_scan_ctl``.
        :param fft_period: value written to ``spectral_fft_period``.
        :param spectral_count: number of samples; ``spectral_count - 1`` is
            written to ``spectral_count``.
        :param spectral_period: value written to ``spectral_period``.
        :param short_repeat: value written to ``spectral_short_repeat``.
        :param load: when true, put ``dev`` in monitor mode on ``freq``.
        :param offline: when false, write the parameters to debugfs.
        :param freq: frequency passed to ``iwconfig``; only used if ``load``.
        """
        # The constructor arguments must win over the class-level defaults.
        self.phy = phy
        self.dev = dev
        self.drv = drv
        self.mode = mode
        self.fft_period = fft_period
        self.spectral_count = spectral_count
        self.spectral_period = spectral_period
        self.short_repeat = short_repeat
        self.f0 = freq  # optional, useful only if load is True
        # set spectral parameters
        if offline == False:
            self.set_spectral_params()
        if load:
            self.load_monitor()

    def _debugfs_path(self, entry):
        """Return the debugfs path of ``entry`` for the configured phy/driver."""
        return "/sys/kernel/debug/ieee80211/" + self.phy + "/" + self.drv + "/" + entry

    def get_spectral_params(self):
        """Return ``(fft_period, spectral_count, spectral_period, short_repeat)``."""
        return self.fft_period, self.spectral_count, self.spectral_period, self.short_repeat

    def load_monitor(self):
        """Bring ``dev`` down, switch it to monitor mode, bring it up, set ``f0``."""
        # call("bash build.sh --load-module",shell=True)
        call("ifconfig " + self.dev + " down", shell=True)
        call("iwconfig " + self.dev + " mode monitor", shell=True)
        call("ifconfig " + self.dev + " up", shell=True)
        call("iwconfig " + self.dev + " freq " + str(self.f0), shell=True)

    def set_spectral_params(self):
        """Write the stored scan parameters to the driver's debugfs files."""
        settings = (
            (self.mode, "spectral_scan_ctl"),
            (str(self.short_repeat), "spectral_short_repeat"),
            (str(self.fft_period), "spectral_fft_period"),
            (str(self.spectral_count - 1), "spectral_count"),
            (str(self.spectral_period), "spectral_period"),
        )
        for value, entry in settings:
            call("echo " + value + " > " + self._debugfs_path(entry), shell=True)

    def acquire(self, filename="data", T_acquire=1, T=0.1):
        """Trigger scans for about ``T_acquire`` seconds and dump them to a file.

        :param filename: output file; truncated first, then appended to.
        :param T_acquire: total acquisition time in seconds.
        :param T: minimum time in seconds between two triggers.
        """
        scan_file = self._debugfs_path("spectral_scan0")
        trigger_cmd = "echo trigger > " + self._debugfs_path("spectral_scan_ctl")

        # First read creates/truncates the output file.
        call("cat " + scan_file + " > {}".format(filename), shell=True)
        time.sleep(T)
        started = time.time()
        last_trigger = started
        now = started
        while now - started < T_acquire:
            # Sleep out the remainder of the period instead of spinning on
            # time.time().
            remaining = T - (time.time() - last_trigger)
            if remaining > 0:
                time.sleep(remaining)
            call(trigger_cmd, shell=True)
            last_trigger = time.time()
            call("cat " + scan_file + " >> {}".format(filename), shell=True)
            now = time.time()

    def fix_timestamp_dict(self, samp_dict, T=50e3):
        """Drop samples that follow a TSF jump and rewrite ``tsf`` as elapsed time.

        Consecutive raw ``tsf`` values are compared; when they differ by more
        than ``T`` the later sample is removed.  Each remaining sample except
        the last gets ``tsf`` replaced by the running sum of the accepted
        differences, and the last remaining sample (which has no difference
        of its own) is removed.

        :param samp_dict: list of dicts with a ``'tsf'`` key; modified in place.
        :param T: largest accepted gap between consecutive timestamps.
        :return: the same list object, filtered and re-timed.
        """
        ret = samp_dict
        if not ret:
            print("timestamp is empty")
            return ret

        raw = [entry['tsf'] for entry in ret]
        kept = [ret[0]]
        elapsed = []
        total = 0
        for idx in range(len(raw) - 1):
            earlier = raw[idx]
            later = raw[idx + 1]
            gap = later - earlier
            if abs(gap) > T:
                # EMERGE ERROR, TSF changes
                print("[E] t[{}]={}".format(idx, later - raw[0]))
                print("[E] t_[{}]={}".format(idx, earlier - raw[0]))
                continue
            total += gap
            elapsed.append(total)
            kept.append(ret[idx + 1])

        # kept always holds exactly one more entry than elapsed.
        for entry, stamp in zip(kept, elapsed):
            entry['tsf'] = stamp
        ret[:] = kept[:len(elapsed)]
        return ret

    def fix_timestamp(self, timestamp, busy, x, T=50e3):
        """Turn raw timestamps into cumulative elapsed time, skipping TSF jumps.

        :param timestamp: sequence of raw timestamps.
        :param busy: list aligned with ``timestamp``; entries are popped in
            place on a jump and the returned value is truncated to the new
            timestamp length.
        :param x: list aligned with ``timestamp``; entries are popped in place.
        :param T: largest accepted gap between consecutive timestamps.
        :return: ``(timestamp, busy, x)``.
        """
        if len(timestamp) != 0:
            timestamp = np.array(timestamp) - timestamp[0]
            tsf_diff = []
            for i_t in range(0, len(timestamp) - 1):
                t_ = timestamp[i_t]
                t = timestamp[i_t + 1]
                dt = t - t_
                if abs(dt) > T:
                    # EMERGE ERROR, TSF changes
                    print("t={}".format(t))
                    print("t_={}".format(t_))
                    # NOTE(review): the index is not adjusted for earlier
                    # pops, so after the first jump this may remove the wrong
                    # element -- confirm intended behaviour with callers.
                    busy.pop(i_t)
                    x.pop(i_t)
                else:
                    tsf_diff.append(dt)
            timestamp = np.cumsum(tsf_diff)
            busy = busy[0:len(timestamp)]
        else:
            print("timestamp is empty")
        return timestamp, busy, x

    def extract_samples(self, filename="data", out_file="out_samp.json", T=-1):
        """Decode 76-byte spectral records from ``filename``.

        Each record is a 3-byte header (type, length), 17 bytes of metadata
        and 56 one-byte FFT bins.  Records whose bins are all zero are
        skipped.

        :param filename: binary dump to read.
        :param out_file: unused (the JSON dump is disabled).
        :param T: ``-1`` to decode every record; otherwise stop after the
            first sample whose ``tsf`` is at least ``T`` past the first one.
        :return: list of dicts with keys ``tsf``, ``freq``, ``rssi``,
            ``noise`` and ``fft_sub`` (56 linear power values).
        :raises SystemExit: if a record is not type 1 / length 73.
        """
        record_size = 76
        take_all_samples = (T == -1)
        out_samp = []
        first_tsf = None
        with open(filename, "rb") as file:
            while True:
                data = file.read(record_size)
                # Stop on EOF and on a truncated trailing record.
                if len(data) < record_size:
                    break
                t, length = struct.unpack(">BH", data[0:3])
                if t != 1 or length != 73:
                    print("only 20MHz supported atm")
                    sys.exit(1)
                # metadata
                (max_exp, freq, rssi, noise, max_magnitude, max_index,
                 bitmap_weight, tsf) = struct.unpack('>BHbbHBBQ', data[3:20])
                # measurements; the array constructor accepts the raw bytes,
                # unlike array.fromstring() which was removed in Python 3.9.
                measurements = array.array("B", data[20:])
                squaresum = sum([(m << max_exp) ** 2 for m in measurements])
                if squaresum == 0:
                    continue
                fft_sub = []
                for m in measurements:
                    if m == 0 and max_exp == 0:
                        m = 1
                    v = 10.0 ** ((noise + rssi + 20.0 * np.log10(m << max_exp)
                                  - 10.0 * np.log10(squaresum)) / 10.0)
                    fft_sub.append(v)
                entry = {}
                entry['tsf'] = tsf
                entry['freq'] = freq
                entry['rssi'] = rssi
                entry['noise'] = noise
                entry['fft_sub'] = fft_sub
                out_samp.append(entry)
                if first_tsf is None:
                    first_tsf = int(tsf)
                if not take_all_samples and int(tsf) - first_tsf >= T:
                    break
        # with open(out_file, 'w') as file:
        #     out_json=json.dumps(out_samp)
        #     file.write(out_json)
        return out_samp

    def get_feature(self, busy, timestamp):
        """Return per-window mean and variance of ``busy``.

        :param busy: sequence of values aligned with ``timestamp``.
        :param timestamp: numpy array with at least two timestamps.
        :return: ``(b_mean, b_var)`` lists, one entry per closed window.
        """
        W = 1e3  # usec
        ts = timestamp - timestamp[0]
        t_ = ts[0]
        t = ts[1]
        b_curr = []
        b_mean = []
        b_var = []
        # NOTE(review): nesting reconstructed from whitespace-collapsed
        # source; the append is assumed to run on every iteration.
        for b in range(0, len(busy)):
            if t - t_ < W:
                t = ts[b]
            else:
                t_ = ts[b]
                b_mean.append(np.mean(b_curr))
                b_var.append(np.var(b_curr))
                b_curr = []
            b_curr.append(busy[b])
        return b_mean, b_var

    def rrc_f(self, T=1, beta=0.8):
        """Return 5 raised-cosine weights sampled on ``[-1/(2T), 1/(2T)]``."""
        out = []
        flat_edge = (1 - beta) / float(2.0 * T)
        roll_edge = (1 + beta) / float(2.0 * T)
        for f in np.linspace(-1 / (2.0 * T), 1 / (2.0 * T), num=5):
            af = abs(f)
            if af <= flat_edge:
                out.append(1.0)
            elif af <= roll_edge:
                out.append(0.5 * (1 + np.cos(np.pi * T / float(beta) * (af - flat_edge))))
            else:
                out.append(0)
        return out

    def get_freq_list(self, freq, N=1):
        """Return the frequencies of the 56 FFT bins around ``freq``.

        Bins are spaced ``20/64`` apart with the centre position skipped;
        with ``N > 1`` every bin is split into ``N`` evenly spaced points.
        """
        step = 20.0 / 64
        ff = []
        for i in range(0, 56):
            if i < 28:
                ff.append(freq - step * (28 - i))
            else:
                ff.append(freq + step * (i - 27))
        # Oversampling: a no-op if N == 1
        return [f + o * (step / N) for f in ff for o in range(0, N)]

    def get_spectrum_scan_features(self, filename="demo.tlv", T=100e3):
        """Extract power, bandwidth and duration features from a dump.

        :param filename: binary dump passed to :meth:`extract_samples`.
        :param T: time limit passed to :meth:`extract_samples`.
        :return: ``(measurements, spectrum_features,
            duration_energy_det_features, duration_features, freq,
            power_features)``.  For an empty dump the lists are empty and
            ``freq`` is ``None``.
        :raises ImportError: if scikit-learn is not installed.
        """
        if preprocessing is None:
            raise ImportError("scikit-learn is required by get_spectrum_scan_features")

        thr_bw = 0.05
        thr_corr_mean = 1
        P_thr_db = -75
        P_thr = 10 ** (P_thr_db / 10.0)
        dt_thr = 2600
        N_av = 10
        # OUTPUT FEATURES
        spectrum_features = []
        duration_features = []
        duration_energy_det_features = []
        power_features = []

        measurements = self.extract_samples(filename, out_file="not-in-use.json", T=T)
        if not measurements:
            return (measurements, spectrum_features, duration_energy_det_features,
                    duration_features, None, power_features)

        csi_data = np.array(list(map(itemgetter('fft_sub'), measurements)))
        tsf = list(map(itemgetter('tsf'), measurements))
        # NOTE(review): if the dump mixes several frequencies an arbitrary
        # one is used, as in the original code.
        freq = list(set(map(itemgetter('freq'), measurements)))[0]
        ff = self.get_freq_list(freq)

        y_ = np.array(csi_data[0])
        tt_ = tsf[0]
        mark_as_failure = False
        start_energy_det = True
        finish_energy_det = False
        start_corr = True
        corr_duration = 0
        t_corr_start = 0
        t_energy_det_start = 0
        t_energy_det_stop = 0
        y_cont = []
        weights = np.array(self.rrc_f())[::-1]
        min_max_scaler = preprocessing.MinMaxScaler()
        print("====================================")
        # 4-sample moving sum of the per-frame mean power.
        p_av_list = np.convolve(np.mean(csi_data, axis=1), np.array([1, 1, 1, 1])[::-1], 'same')

        # NOTE(review): block nesting below was reconstructed from
        # whitespace-collapsed source -- confirm against the upstream file.
        for ii, cc in enumerate(csi_data):
            start_f = []
            stop_f = []
            START_BW = True
            bw_meas = []
            freq_meas = []
            tt = tsf[ii]
            dt = tt - tt_
            y = np.convolve(np.array(cc), weights, 'same')
            y_det = min_max_scaler.fit_transform(y.reshape(-1, 1))[:, 0]
            y_det_ = min_max_scaler.fit_transform(y_.reshape(-1, 1))[:, 0]
            P_av_w = p_av_list[ii]
            power_features.append({"tsf_p": tt, "p_av": np.mean(y), "p_av_w": P_av_w})
            P_av = P_av_w

            # ENERGY DETECTION
            if P_av >= P_thr and start_energy_det:
                t_energy_det_start = tt
                print("start:{}".format(t_energy_det_start))
                start_energy_det = False
                mark_as_failure = False
            if P_av <= P_thr and not start_energy_det:
                t_energy_det_stop = tt_
                print("stop:{}".format(t_energy_det_stop))
                finish_energy_det = True
            if P_av >= P_thr and not start_energy_det:
                # A gap to the next sample invalidates the running burst.
                if ii < len(tsf) - 1 and tsf[ii + 1] - tsf[ii] > dt_thr:
                    mark_as_failure = True
                    print("[ii={}] failure dt={}".format(ii, tsf[ii + 1] - tsf[ii]))
            if P_av <= P_thr and finish_energy_det:
                duration = t_energy_det_stop - t_energy_det_start
                if not mark_as_failure:
                    print(" [ OK ] finish:{}".format(duration))
                else:
                    print(" [FAIL] finish:{}".format(duration))
                finish_energy_det = False
                start_energy_det = True
                if not mark_as_failure:
                    duration_energy_det_features.append(
                        {"tsf": t_energy_det_start, "tsf_stop": t_energy_det_stop,
                         "duration": duration})
                print(dt)

            if len(y_cont) >= N_av:
                y_cont.pop(0)  # window step=1
            if P_av >= P_thr:
                # collect some consecutive samples and average them
                y_cont.append(y_det)
                yy = np.mean(y_cont, axis=0)
                if len(y_cont) == N_av:
                    for i in range(0, len(yy)):
                        if yy[i] > thr_bw and i < len(yy) - 1:
                            if START_BW:
                                start_f.append(ff[i])
                                START_BW = False
                        else:
                            if not START_BW:
                                stop_f.append(ff[i - 1])
                                START_BW = True
                    for lo_f, hi_f in zip(start_f, stop_f):
                        bw_meas.append(hi_f - lo_f - 20 / 64.0)
                        freq_meas.append((hi_f + lo_f) / 2.0)
                # CORRELATION
                y_corr = np.correlate(y_det, y_det_, "same")
                if dt >= dt_thr:
                    start_corr = True
                    corr_duration = 0
                else:
                    if np.median(y_corr) <= thr_corr_mean:
                        if start_corr:
                            t_corr_start = tt
                            start_corr = False
                        else:
                            corr_duration = tt - t_corr_start
                    else:
                        if start_corr == False:
                            start_corr = True
                            if corr_duration != 0:
                                duration_features.append(
                                    {"tsf": t_corr_start, "duration": corr_duration})
                            corr_duration = 0
                spectrum_features.append({"tsf": tsf[ii], "bw": bw_meas, "freq": freq_meas})
            else:
                y_cont = []
            # update state variables, ready for next round
            tt_ = tt
            y_ = y
        return (measurements, spectrum_features, duration_energy_det_features,
                duration_features, freq, power_features)

    def plot_waterfall(self, ax, measurements, exp_name):
        """Draw a frequency/time waterfall of ``measurements`` on ``ax``.

        :param ax: axes object providing ``pcolormesh`` and the ``set_*`` calls.
        :param measurements: list of dicts as returned by
            :meth:`extract_samples`.
        :param exp_name: plot title.
        :return: ``ax``.
        """
        csi_data = list(map(itemgetter('fft_sub'), measurements))
        timestamp = list(map(itemgetter('tsf'), measurements))
        freq = list(set(map(itemgetter('freq'), measurements)))[0]
        timestamp = np.array(timestamp) - timestamp[0]
        T = np.max(timestamp)
        N = len(timestamp)
        y = self.get_freq_list(freq)
        x = timestamp[0:N]
        Z = 10.0 * np.log10(csi_data[0:N])
        X, Y = np.meshgrid(y, x)
        ax.pcolormesh(X, Y, Z, vmin=-140, vmax=-20)
        ax.set_xlim([min(y), max(y)])
        ax.set_ylim([0, T])
        ax.set_ylabel('Time [us]')
        ax.set_xlabel('freq [MHz]')
        ax.set_title(exp_name)
        return ax
test_database.py
Source:test_database.py
...30 tid3 = uuid()31 try:32 raise KeyError('foo')33 except KeyError as exception:34 self.b.mark_as_failure(tid3, exception)35 assert self.b.get_status(tid3) == states.FAILURE36 assert isinstance(self.b.get_result(tid3), KeyError)37 def xxx_backend(self):38 tid = uuid()39 assert self.b.get_status(tid) == states.PENDING40 assert self.b.get_result(tid) is None41 self.b.mark_as_done(tid, 42)42 assert self.b.get_status(tid) == states.SUCCESS43 assert self.b.get_result(tid) == 4244 tid2 = uuid()45 try:46 raise KeyError('foo')47 except KeyError as exception:48 self.b.mark_as_failure(tid2, exception)49 assert self.b.get_status(tid2) == states.FAILURE50 assert isinstance(self.b.get_result(tid2), KeyError)51 def test_forget(self):52 tid = uuid()53 self.b.mark_as_done(tid, {'foo': 'bar'})54 x = self.app.AsyncResult(tid)55 assert x.result.get('foo') == 'bar'56 x.forget()57 if celery.VERSION[0:3] == (3, 1, 10):58 # bug in 3.1.10 means result did not clear cache after forget.59 x._cache = None...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!