How to use start_record method in ATX

Best Python code snippet using ATX

power_data_io.py

Source: power_data_io.py Github

copy

Full Screen

"""Read integrated power spectra out of an acquisition ring-buffer file.

`Ring` reads real data from a file laid out as a fixed-size header followed
by equally sized integration frames; `MockRing` generates fake data with the
same interface.
"""
import time

import numpy as np

import transpose

# File format parameters
HEADER_LEN = 12  # Bytes: index, loops, n_FFT_int
FRAME_HEADER_LEN = 8  # Bytes: FPGA counts, Unix_time
DTYPE = np.dtype(np.uint32)

# Acquisition parameters
NTIME_RECORD = 1024  # Post integration, post-scrunch. Arbitrary, doesn't matter
FFT_RATE = 800e6 / 1024 / 2  # Hz
NFREQ = 1024
FREQ0 = 800.
DELTA_F = -400. / 1024
NPOL = 2
POL_WEIGHTS = [0., 1.]
CAL_PERIOD_SAMPLES = 0


def _parse_words(raw):
    """Decode raw bytes into a list of Python ints, one per `DTYPE` word.

    Converting to Python ints (instead of keeping NumPy uint32 scalars)
    prevents 32-bit wrap-around or overflow errors when the header counters
    are multiplied by the ring length or combined with negative offsets.
    """
    return [int(word) for word in np.frombuffer(raw, dtype=DTYPE)]


class Ring(object):
    """Reader for a ring-buffer file of integrated spectra.

    Parameters
    ----------
    filename : str
        Path of the ring-buffer file.
    scrunch : int
        Number of consecutive integrations summed into one output sample.

    Raises
    ------
    RuntimeError
        If the file size is not a header plus a whole number of frames.
    """

    def __init__(self, filename, scrunch=1):
        self._filename = filename
        self._scrunch = scrunch
        # The file holds binary data, so it must be opened in binary mode.
        with open(filename, 'rb') as f:
            raw_header = f.read(HEADER_LEN)
            f.seek(0, 2)
            size = f.tell()
        ring_bytes = size - HEADER_LEN
        integ_bytes = FRAME_HEADER_LEN + NFREQ * NPOL * DTYPE.itemsize
        self._ninteg_ring = ring_bytes // integ_bytes
        if ring_bytes % integ_bytes:
            raise RuntimeError('Odd filesize.')
        header = _parse_words(raw_header)
        index, loops, n_fft_int = header[0], header[1], header[2]
        self._sample_time = n_fft_int / FFT_RATE
        self._delta_t = self._sample_time * scrunch
        # The record we will call "0".
        # XXX: backs off by 0.4 of a ring length; the plain formula would be
        # (index + loops * ninteg_ring) // (scrunch * NTIME_RECORD).
        self._record_offset = (
            (index + int((loops - 0.4) * self._ninteg_ring))
            // (scrunch * NTIME_RECORD)
        )

    def current_records(self):
        """Return `(first_available_record, last_record)` from the live header.

        Record numbers are relative to the offset fixed at construction.
        """
        ninteg_ring = self._ninteg_ring
        with open(self._filename, 'rb') as f:
            header = _parse_words(f.read(HEADER_LEN))
        index, loops = header[0], header[1]
        integ_per_record = self._scrunch * NTIME_RECORD
        last_record = (index + loops * ninteg_ring) // integ_per_record + 1
        last_record -= self._record_offset
        nrecord_ring = ninteg_ring // integ_per_record
        # The latest 80% of the ring is available, for safety.
        first_available_record = last_record - int(nrecord_ring * 0.8)
        return first_available_record, last_record

    def get_parameters(self):
        """Return a dict describing the acquisition set-up and record count."""
        return {
            'cal_period_samples': CAL_PERIOD_SAMPLES,
            'delta_t': self._delta_t,
            'nfreq': NFREQ,
            'freq0': FREQ0,
            'delta_f': DELTA_F,
            'ntime_record': NTIME_RECORD,
            'nrecords': self.current_records()[1],
        }

    def read_records(self, start_record=None, end_record=None):
        """Read records `start_record` (inclusive) to `end_record` (exclusive).

        Missing bounds default to the currently available range, and
        `end_record` is clamped to the last record.

        Returns
        -------
        out : float32 array, shape (NFREQ, nrecords * NTIME_RECORD)
            Polarization-weighted power, summed over `scrunch` integrations.
        times : float64 array, shape (nrecords * NTIME_RECORD,)
            Second frame-header word of the first integration of each sample.
            float64 is used because float32 cannot hold a 32-bit Unix time
            exactly.

        Raises
        ------
        DataGone
            If `start_record` is older than the first available record.
        ValueError
            If `start_record` is at or beyond the last record.
        """
        first_available_record, last_record = self.current_records()
        if start_record is None:
            start_record = first_available_record
        if end_record is None:
            end_record = last_record
        if start_record < first_available_record:
            raise DataGone()
        if end_record > last_record:
            end_record = last_record
        if start_record >= last_record:
            raise ValueError("No data to read.")

        scrunch = self._scrunch
        nrecords = end_record - start_record
        out = np.empty((NFREQ, nrecords, NTIME_RECORD), dtype=np.float32)
        times = np.empty((nrecords, NTIME_RECORD), dtype=np.float64)
        integ_bytes_data = NPOL * NFREQ * DTYPE.itemsize
        integ_bytes = integ_bytes_data + FRAME_HEADER_LEN
        with open(self._filename, 'rb') as f:
            for ii in range(nrecords):
                abs_record_ind = ii + start_record + self._record_offset
                record_data = np.zeros((NTIME_RECORD, NFREQ), dtype=np.float32)
                for jj in range(NTIME_RECORD):
                    integ_ind_jj = (abs_record_ind * NTIME_RECORD + jj) * scrunch
                    for kk in range(scrunch):
                        # Wrap around the ring.
                        integ_ind = (integ_ind_jj + kk) % self._ninteg_ring
                        f.seek(HEADER_LEN + integ_ind * integ_bytes)
                        frame_header = np.frombuffer(
                            f.read(FRAME_HEADER_LEN), dtype=DTYPE)
                        if kk == 0:
                            times[ii, jj] = frame_header[1]
                        integ = np.frombuffer(
                            f.read(integ_bytes_data), dtype=DTYPE)
                        integ = integ.reshape(NPOL, NFREQ)
                        for pp in range(NPOL):
                            record_data[jj, :] += POL_WEIGHTS[pp] * integ[pp]
                out[:, ii, :] = transpose.blocked(record_data)
        out.shape = (NFREQ, nrecords * NTIME_RECORD)
        times.shape = (nrecords * NTIME_RECORD,)
        return out, times


class DataGone(Exception):
    """Raised when the requested records are no longer available."""
    pass


MOCK_N_SAMP_INTEG = 512
MOCK_RING_LEN = 200
MOCK_START_TIME = time.time() - MOCK_RING_LEN


class MockRing(object):
    """Stand-in for `Ring` that generates fake data instead of reading a file.

    `filename` is accepted for interface compatibility and ignored.
    """

    def __init__(self, filename, scrunch=1):
        self._scrunch = scrunch
        self._delta_t = MOCK_N_SAMP_INTEG * self._scrunch / FFT_RATE

    def get_parameters(self):
        """Return a dict describing the (mock) acquisition set-up."""
        return {
            'cal_period_samples': CAL_PERIOD_SAMPLES,
            'delta_t': self._delta_t,
            'nfreq': NFREQ,
            'freq0': FREQ0,
            'delta_f': DELTA_F,
            'ntime_record': NTIME_RECORD,
            'nrecords': self.get_nrecords(),
        }

    def read_records(self, start_record=None, end_record=None):
        """Right now just generates fake data.

        Bounds handling and exceptions mirror `Ring.read_records`. Returns
        `(out, times)` with `out` of shape (NFREQ, nrecords * NTIME_RECORD)
        and `times` holding one timestamp per sample.
        """
        first_available_record, last_record = self.current_records()
        if start_record is None:
            start_record = first_available_record
        if end_record is None:
            end_record = last_record
        if start_record < first_available_record:
            raise DataGone()
        if end_record > last_record:
            end_record = last_record
        if start_record >= last_record:
            raise ValueError("No data to read.")

        nrecords = end_record - start_record
        ntime = nrecords * NTIME_RECORD
        out = np.empty((NFREQ, nrecords, NTIME_RECORD), dtype=np.float32)
        # The fake part.
        # Every record is the same!
        noise = np.random.randn(NTIME_RECORD, 2, NFREQ)
        record_data = ((noise + 32) * 10).astype(np.uint32)
        # Identical for every record, so sum and transpose only once.
        spectra = transpose.blocked(record_data[:, 0, :] + record_data[:, 1, :])
        for ii in range(nrecords):
            out[:, ii, :] = spectra
        # The result must not be named `time`: that would shadow the module
        # and make the `time.time()` call fail with UnboundLocalError.
        t0 = time.time() - (self._delta_t * ntime)
        times = t0 + np.arange(ntime) * self._delta_t
        out.shape = (NFREQ, ntime)
        return out, times

    def get_nrecords(self):
        """Totally fake."""
        last = int((time.time() - MOCK_START_TIME)
                   / (self._delta_t * NTIME_RECORD))
        return last

    def current_records(self):
        last = self.get_nrecords()
        first = last - int(MOCK_RING_LEN / self._delta_t / NTIME_RECORD)
        # ... (the source listing is truncated here)
        # NOTE(review): presumably this returns (first, last), matching
        # Ring.current_records -- confirm against the original file.

Full Screen

Full Screen

gather_ieee.py

Source: gather_ieee.py Github

copy

Full Screen

"""Download IEEE Xplore search results page by page into JSON files."""
import datetime
import json
import os

import requests

# NOTE(review): this API key is committed in the source. It can now be
# overridden through the IEEE_XPLORE_API_KEY environment variable; the
# hard-coded fallback keeps existing behaviour but should be rotated and
# removed.
API_KEY = os.environ.get("IEEE_XPLORE_API_KEY", "cmvp85nujeda5s27qhyeze28")

# Alternative using the IEEE Xplore Python SDK instead of raw HTTP requests:
# import xplore
# query = xplore.xploreapi.XPLORE(API_KEY)
# query.abstractText('SOA')
# data = query.callAPI()
# print(data)

## file name setting
dt = datetime.datetime.now()
timestamp = dt.timestamp()
filename = "ieee " + str(hex(int(timestamp)))
file_ext = ".json"

## IEEE Explore API
url = ("https://ieeexploreapi.ieee.org/api/v1/search/articles"
       "?parameter&apikey=" + API_KEY)
## search parameter
# The phrase needs real double quotes. Writing "" inside a double-quoted
# literal only concatenates adjacent strings, so the quotes silently vanish.
querytext = '(SOA or "service oriented architecture")'
resp_format = "json"
## sorting and paging
max_records = 200
sort_field = "article_number"
sort_order = "asc"
start_record = 4001

request_cnt = 20
for i in range(request_cnt):
    # Build the parameters for *this* page before sending the request, so
    # every request asks for a new page (previously the first page was
    # requested twice and every later request lagged one page behind).
    params = {
        'querytext': querytext,
        'format': resp_format,
        'max_records': max_records,
        'sort_field': sort_field,
        'sort_order': sort_order,
        'start_record': start_record,
    }
    print("params is " + str(params))
    # TLS certificate verification is left enabled (the default): the request
    # carries the API key. A timeout keeps a stalled connection from hanging
    # the script forever.
    resp = requests.get(url, params=params, timeout=60)
    # Fail with a clear HTTP error instead of a KeyError on "articles".
    resp.raise_for_status()
    # Each file is named after the first record of the page it contains.
    with open(filename + "-" + str(start_record) + file_ext, "a") as json_file:
        json_obj = resp.json()
        json.dump(json_obj["articles"], json_file)
    start_record = start_record + max_records
# ... (the source listing is truncated here)

Full Screen

Full Screen

measuring-station.py

Source: measuring-station.py Github

copy

Full Screen

class ADC:
    """Base interface for an analog-to-digital converter."""

    def start_record(self, some_args=None):
        """Start recording; the base implementation does nothing.

        `some_args` is optional so that callers forwarding an argument
        (see `ecg_measuring_station.start_record`) work with real instances.
        """
        pass


class ADC1(ADC):
    def start_record(self, some_args=None):
        print('very complex implementation')


class ADC2(ADC):
    def start_record(self, some_args=None):
        print('very very complex implementation!')


class measuring_station:
    def start_record(self):
        pass

    def another_functions(self):
        pass


class ecg_measuring_station:
    """Measuring station that delegates recording to an injected ADC."""

    def __init__(self, adc):
        # `adc` should be an ADC instance. Passing the class itself still
        # works, because the forwarded argument then binds as `self`.
        self.adc = adc

    def start_record(self, some_args):
        """Forward `some_args` to the ADC's `start_record`."""
        self.adc.start_record(some_args)


if __name__ == "__main__":
    # Inject instances, not classes.
    ecg1 = ecg_measuring_station(ADC1())
    ecg1.start_record(None)
    ecg2 = ecg_measuring_station(ADC2())
    # ... (the source listing is truncated here)

Full Screen

Full Screen

Blogs

Check out the latest blogs from LambdaTest on this topic:

Putting Together a Testing Team

As part of one of my consulting efforts, I worked with a mid-sized company that was looking to move toward a more agile manner of developing software. As with any shift in work style, there is some bewilderment and, for some, considerable anxiety. People are being challenged to leave their comfort zones and embrace a continuously changing, dynamic working environment. And, dare I say it, testing may be the most ‘disturbed’ of the software roles in agile development.

QA Innovation – Using the senseshaping concept to discover customer needs

QA Innovation – Using the senseshaping concept to discover customer needs. QA testers have a unique role and responsibility to serve the customer. Serving the customer in software testing means protecting customers from application defects, failures, and perceived failures from missing or misunderstood requirements. Testing for known requirements based on documentation or discussion is the core of the testing profession. One unique way QA testers can both differentiate themselves and be innovative occurs when senseshaping is used to improve the application user experience.

What is Selenium Grid & Advantages of Selenium Grid

Manual cross browser testing is neither efficient nor scalable as it will take ages to test on all permutations & combinations of browsers, operating systems, and their versions. Like every developer, I have also gone through that ‘I can do it all phase’. But if you are stuck validating your code changes over hundreds of browsers and OS combinations then your release window is going to look even shorter than it already is. This is why automated browser testing can be pivotal for modern-day release cycles as it speeds up the entire process of cross browser compatibility.

QAs and Unit Testing – Can QA Create Effective Unit Tests

Unit testing is typically software testing within the developer domain. As the QA role expands in DevOps, QAOps, DesignOps, or within an Agile team, QA testers often find themselves creating unit tests. QA testers may create unit tests within the code using a specified unit testing tool, or independently using a variety of methods.

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run ATX automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful