Best Python code snippet using ATX
power_data_io.py
Source:power_data_io.py
1import time2import numpy as np3import transpose4# File format parameters5HEADER_LEN = 12 # Bytes: index, loops, n_FFT_int6FRAME_HEADER_LEN = 8 # Bytes: FGPA counts, Unix_time7DTYPE = np.dtype(np.uint32)8# Acquisition parameters9NTIME_RECORD = 1024 # Post integration, post-scrunch. Arbitrary, doesn't matter10FFT_RATE = 800e6 / 1024 / 2 # Hz11NFREQ = 102412FREQ0 = 800.13DELTA_F = -400. / 102414NPOL = 215POL_WEIGHTS = [0., 1.]16CAL_PERIOD_SAMPLES = 017class Ring(object):18 def __init__(self, filename, scrunch=1):19 self._filename = filename20 self._scrunch = scrunch21 with open(filename, 'r') as f:22 s = f.read(HEADER_LEN)23 f.seek(0, 2)24 size = f.tell()25 ring_bytes = size - HEADER_LEN26 integ_bytes = (FRAME_HEADER_LEN + NFREQ * NPOL * DTYPE.itemsize)27 self._ninteg_ring = ring_bytes // integ_bytes28 if ring_bytes % integ_bytes:29 raise RuntimeError('Odd filesize.')30 header = np.fromstring(s, DTYPE)31 self._sample_time = header[2] / FFT_RATE32 self._delta_t = self._sample_time * scrunch33 # The record we will call "0".34 # XXX35 self._record_offset = ((header[0] + int((header[1] - 0.4) * self._ninteg_ring))36 // (scrunch * NTIME_RECORD))37 #self._record_offset = ((header[0] + header[1] * self._ninteg_ring)38 # // (scrunch * NTIME_RECORD))39 def current_records(self):40 ninteg_ring = self._ninteg_ring41 with open(self._filename, 'r') as f:42 s = f.read(HEADER_LEN)43 header = np.fromstring(s, DTYPE)44 index = header[0]45 loops = header[1]46 last_record = ((index + loops * ninteg_ring)47 // (self._scrunch * NTIME_RECORD) + 1)48 last_record -= self._record_offset49 nrecord_ring = ninteg_ring // (self._scrunch * NTIME_RECORD)50 # The latest 80% of the ring is availble, for safety.51 first_available_record = last_record - int(nrecord_ring * 0.8)52 return first_available_record, last_record53 def get_parameters(self):54 parameters = {}55 parameters['cal_period_samples'] = CAL_PERIOD_SAMPLES56 parameters['delta_t'] = self._delta_t57 parameters['nfreq'] = NFREQ58 parameters['freq0'] = FREQ059 parameters['delta_f'] = DELTA_F60 parameters['ntime_record'] = NTIME_RECORD61 parameters['nrecords'] = self.current_records()[1]62 return parameters63 def read_records(self, start_record=None, end_record=None):64 first_available_record, last_record = self.current_records()65 if start_record is None:66 start_record = first_available_record67 if end_record is None:68 end_record = last_record69 if start_record < first_available_record:70 raise DataGone()71 if end_record > last_record:72 end_record = last_record73 if start_record >= last_record:74 raise ValueError("No data to read.")75 scrunch = self._scrunch76 out = np.empty((NFREQ, (end_record - start_record), NTIME_RECORD),77 dtype=np.float32)78 time = np.empty(((end_record - start_record), NTIME_RECORD),79 dtype=np.float32)80 integ_bytes_data = NPOL * NFREQ * DTYPE.itemsize81 integ_bytes = integ_bytes_data + FRAME_HEADER_LEN82 with open(self._filename, 'r') as f:83 for ii in range(end_record - start_record):84 abs_record_ind = ii + start_record + self._record_offset85 record_data = np.zeros((NTIME_RECORD, NFREQ), dtype=np.float32)86 for jj in range(NTIME_RECORD):87 integ_ind_jj = (abs_record_ind * NTIME_RECORD + jj) * scrunch88 for kk in range(self._scrunch):89 integ_ind = (integ_ind_jj + kk) % self._ninteg_ring90 #f.seek(HEADER_LEN + integ_ind * integ_bytes91 # + FRAME_HEADER_LEN)92 f.seek(HEADER_LEN + integ_ind * integ_bytes)93 s = f.read(FRAME_HEADER_LEN)94 header = np.fromstring(s, dtype=DTYPE)95 if kk == 0:96 time[ii,jj] = header[1]97 s = f.read(integ_bytes_data)98 integ = np.fromstring(s, dtype=DTYPE)99 integ.shape = (NPOL, NFREQ)100 for pp in range(NPOL):101 record_data[jj, :] += POL_WEIGHTS[pp] * integ[pp]102 out[:,ii,:] = transpose.blocked(record_data)103 out.shape = (NFREQ, (end_record - start_record) * NTIME_RECORD)104 time.shape = ((end_record - start_record) * NTIME_RECORD,)105 return out, time106class DataGone(Exception):107 pass108MOCK_N_SAMP_INTEG = 512109MOCK_RING_LEN = 200110MOCK_START_TIME = time.time() - MOCK_RING_LEN111class MockRing(object):112 def __init__(self, filename, scrunch):113 self._scrunch = scrunch114 self._delta_t = MOCK_N_SAMP_INTEG * self._scrunch / FFT_RATE115 def get_parameters(self):116 parameters = {}117 parameters['cal_period_samples'] = CAL_PERIOD_SAMPLES118 parameters['delta_t'] = self._delta_t119 parameters['nfreq'] = NFREQ120 parameters['freq0'] = FREQ0121 parameters['delta_f'] = DELTA_F122 parameters['ntime_record'] = NTIME_RECORD123 parameters['nrecords'] = self.get_nrecords()124 return parameters125 def read_records(self, start_record=None, end_record=None):126 """Right now just generates fake data."""127 128 first_available_record, last_record = self.current_records()129 if start_record is None:130 start_record = first_available_record131 if end_record is None:132 end_record = last_record133 if start_record < first_available_record:134 raise DataGone()135 if end_record > last_record:136 end_record = last_record137 if start_record >= last_record:138 raise ValueError("No data to read.")139 nrecords = end_record - start_record140 ntime = nrecords * NTIME_RECORD141 out = np.empty((NFREQ, nrecords, NTIME_RECORD), dtype=np.float32)142 # The fake part.143 from numpy import random144 # Every record is the same!145 noise = random.randn(NTIME_RECORD, 2, NFREQ)146 record_data = (noise + 32) * 10147 record_data = record_data.astype(np.uint32)148 for ii in range(nrecords):149 out[:,ii,:] = transpose.blocked(record_data[:,0,:] + record_data[:,1,:])150 t0 = time.time() - (self._delta_t * NTIME_RECORD * nrecords)151 time = t0 + np.arange(nrecords) * self._delta_t152 out.shape = (NFREQ, nrecords * NTIME_RECORD)153 return out, time154 def get_nrecords(self):155 """Totally fake."""156 last = int((time.time() - MOCK_START_TIME) / (self._delta_t * NTIME_RECORD))157 #print last158 return last159 def current_records(self):160 last = self.get_nrecords()161 first = last - int(MOCK_RING_LEN / self._delta_t / NTIME_RECORD)...
gather_ieee.py
Source:gather_ieee.py
1# import xplore23# query = xplore.xploreapi.XPLORE('cmvp85nujeda5s27qhyeze28')4# query.abstractText('SOA')5# data = query.callAPI() 6# print(data)7# cmvp85nujeda5s27qhyeze288import requests 9import datetime10import json1112## file name setting13dt = datetime.datetime.now()14timestamp = dt.timestamp()15filename = "ieee " + str(hex(int(timestamp)))16file_ext = ".json"1718## IEEE Explore API19url = "https://ieeexploreapi.ieee.org/api/v1/search/articles?parameter&apikey=cmvp85nujeda5s27qhyeze28"20## search parameter21querytext = "(SOA or ""service oriented architecture"")"22resp_format = "json"23## sorting and paging24max_records = 20025sort_field = "article_number"26sort_order = "asc"27start_record = 40012829params = {'querytext': querytext, 'format':resp_format, 'max_records':max_records, 'sort_field':sort_field, 'sort_order':sort_order, 'start_record':start_record}3031request_cnt = 2032for i in range(0, request_cnt):33 resp = requests.get(url, params=params, verify=False)34 params = {'querytext': querytext, 'format':resp_format, 'max_records':max_records, 'sort_field':sort_field, 'sort_order':sort_order, 'start_record':start_record} 35 print("params is " + str(params))36 # print("request count is " + str(i) + ", Status Code is " + str(resp.status_code) + ", and start record is " + str(start_record))37 start_record = start_record + max_records38 with open(filename + "-" + str(start_record) + file_ext, "a") as json_file :39 json_obj = resp.json()40 json.dump(json_obj["articles"], json_file)41 # json_file.write(", ")42 # json.dump(resp.json(), json_file) 43
...
measuring-station.py
Source:measuring-station.py
1class ADC:2 def start_record(self):3 pass4class ADC1(ADC):5 def start_record(self):6 print('very complex implementation')7class ADC2(ADC):8 def start_record(self):9 print('very very complex implementation!')10class measuring_station:11 def start_record(self):12 pass13 def another_functions(self):14 pass15class ecg_measuring_station:16 def __init__(self, adc):17 self.adc = adc18 def start_record(self, some_args):19 self.adc.start_record(some_args)20if __name__ == "__main__":21 ecg1 = ecg_measuring_station(ADC1)22 ecg1.start_record(None)23 ecg2 = ecg_measuring_station(ADC2)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!