Best Python code snippet using pytest-cov
Instruments.py
Source:Instruments.py
1#-----------------------------------------------------------------------------2# Name: Instruments.py3# Purpose: To deal with controlling instruments4# Author: Aric Sanders5# Created: 2016/06/236#-----------------------------------------------------------------------------7""" The Module Instruments Contains Classes and functions to control 8instruments; GPIB,RS232 and other visa instruments. Instrument control classes are9wrappers around the pyvisa instrument class with static xml based metadata added in. In addition,10instruments have an emulation_mode that allows for the tracking of commands when not connected to a11viable communications bus.12Examples13--------14 #!python15 >> from pyMez import *16 >> vna=VNA("GPIB::16")17 >> vna.initialize()18 >> s2p=vna.measure_sparameters()19 >> s2p.show()20<a href="../../../Examples/html/VNA_Measurement_Example_WR15.html">VNA Measurement Examples</a>21Help22---------------23<a href="./index.html">`pyMez.Code.InstrumentControl`</a>24<div>25<a href="../../../pyMez_Documentation.html">Documentation Home</a> |26<a href="../../index.html">API Documentation Home</a> |27<a href="../../../Examples/html/Examples_Home.html">Examples Home</a> |28<a href="../../../Reference_Index.html">Index</a>29</div>"""30# TODO:Fix Save State, and importing from DataHandlers31#-------------------------------------------------------------------------------32# Standard Imports-- All in the python standard library33import os34import re35from types import *36from ctypes import *37import datetime,time38import sys39#-------------------------------------------------------------------------------40# Third Party Imports41sys.path.append(os.path.join(os.path.dirname( __file__ ), '..','..'))42try: 43 from PIL import Image44 PIL_AVAILABLE=145except:46 print('PIL is required for some camera operations')47 PIL_AVAILABLE=048try:49 import visa,pyvisa50except:51 print("To control comm and gpib instruments this module requires the package PyVisa")52 print(" Please download it at http://pyvisa.sourceforge.net/ ")53 print(" Or add it to the Python Path")54 pass 55try:56 #raise57 import Code.DataHandlers.XMLModels58 InstrumentSheet=Code.DataHandlers.XMLModels.InstrumentSheet59 InstrumentState=Code.DataHandlers.XMLModels.InstrumentState60 DATA_SHEETS=161 #print dir(pyMez)62except:63 # If the import of DataHandlers Does not work 64 class InstrumentSheet():pass65 DATA_SHEETS=066 print("Can't Find MySelf")67 pass68try:69 from Code.Utils.Alias import *70 METHOD_ALIASES=171except:72 METHOD_ALIASES=073 pass 74try:75 from Code.Utils.Names import *76except:77 print("Could not load pyMez.Code.Utils.Names")78 pass79try:80 from Code.DataHandlers.TouchstoneModels import *81except:82 print("Could not load Code.DataHandlers.TouchstoneModels")83 pass84try:85 from Code.DataHandlers.NISTModels import *86except:87 print("Could not load Code.DataHandlers.NISTModels")88 pass89try:90 import numpy as np91except:92 print("Could not load numpy")93 pass94#-------------------------------------------------------------------------------95# Module Constants96ACTIVE_COMPONENTS=[PIL_AVAILABLE,DATA_SHEETS,METHOD_ALIASES]97INSTRUMENT_TYPES=['GPIB','COMM','OCEAN_OPTICS','MIGHTEX','LABJACK']98INSTRUMENTS_DEFINED=[]99#TODO Make PYMEASURE_ROOT be read from the settings folder100PYMEASURE_ROOT=os.path.join(os.path.dirname( __file__ ), '..','..')101VNA_FREQUENCY_UNIT_MULTIPLIERS={"Hz":1.,"kHz":10.**3,"MHz":10.**6,"GHz":10.**9,"THz":10.**12}102[EMULATION_S2P,EMULATION_S1P,EMULATION_W1P,EMULATION_W2P,EMULATION_SWITCH_TERMS]=[None,None,None,None,None]103try:104 EMULATION_S2P=S2PV1(os.path.join(TESTS_DIRECTORY,"704b.S2P"))105 EMULATION_S1P=S1PV1(os.path.join(TESTS_DIRECTORY,"Power_Meter.s1p"))106 EMULATION_W1P=W1P(os.path.join(TESTS_DIRECTORY,"Line_4909_WR15_Wave_Parameters_Port2_20180313_001.w1p"))107 EMULATION_W2P=W2P(os.path.join(TESTS_DIRECTORY,"Line_5079_WR15_Wave_Parameters_20180313_001.w2p"))108 EMULATION_SWITCH_TERMS=S2PV1(os.path.join(TESTS_DIRECTORY,"GTrue_Thru_WR15_Switch_Terms_20180313_001.s2p"))109 EMULATION_FILES_PRESENT=True110except:111 EMULATION_FILES_PRESENT=False112 print("Emulation files were not present, the emulation decorator will only return None or the original method")113#-------------------------------------------------------------------------------114# Module Functions115def emulation_data(data=None):116 """emulation data is a method decorator that returns a set of emulation data if the instrument mode is117 self.emulation_mode=True.118 For example just add @emulation_data(data_to_return) before an instrument method. This decorator also119 is conditional that the emulated data can be found. If there is a problem then it only returns the method120 undecorated"""121 def method_decorator(method):122 def return_data(self,*args,**kwargs):123 if self.emulation_mode and not data==None:124 return data125 elif data==None:126 print("No data was present to return in emulation_mode, returning None instead.")127 return None128 else:129 return method(self,*args,**kwargs)130 return return_data131 return method_decorator132def whos_there():133 """Whos_there is a function that prints the idn string for all134 GPIB instruments connected"""135 resource_manager = visa.ResourceManager()136 resource_list=resource_manager.list_resources()137 gpib_resources=[]138 gpib_idn_dictionary={}139 for instrument in resource_list:140 if re.search("GPIB|USB",instrument,re.IGNORECASE):141 try:142 resource=resource_manager.open_resource(instrument)143 gpib_resources.append(resource)144 idn=resource.query("*IDN?")145 gpib_idn_dictionary[instrument]=idn146 except:147 print("{0} did not respond to idn query".format(instrument))148 if gpib_resources:149 for instrument_name,idn in gpib_idn_dictionary.items():150 print(("{0} is at address {1}".format(idn,instrument_name)))151 #return gpib_idn_dictionary152 else:153 print("There are no GPIB resources available")154def determine_instrument_type_from_string(string):155 """ Given a string returns the instrument type"""156 if type(string) in StringTypes:157 # Start with the easy ones158 for instrument_type in INSTRUMENT_TYPES:159 match= re.compile(instrument_type,re.IGNORECASE)160 if re.search(match,string):161 return instrument_type162 # Now read in all the Instrument sheets and look for a match163 # Returning the Name in the Instrument_Type Tag164 instrument_folder=os.path.join(PYMEASURE_ROOT,'Instruments')165 for instrument_sheet in os.listdir(instrument_folder):166 path=os.path.join(PYMEASURE_ROOT,'Instruments',instrument_sheet)167 if os.path.isfile(path):168 f=open(path,'r')169 text=f.read()170 if re.search(string,text):171 tag_match=re.search(172 '<Instrument_Type>(?P<instrument_type>\w+)</Instrument_Type>',173 text)174 try:175 return tag_match.group('instrument_type')176 except:pass177 else:178 return None179def determine_instrument_type(object):180 """Tries to return an instrument type given an address, name, serial #181 or class instance"""182 # attributes that normally have the type hidden in there183 # should be in the order of likelyhood 184 attritbute_names=['instrument_type','address','serial','Id'] 185 # Check to see if it is a string and then go through the possibilities186 if type(object) in StringTypes:187 return determine_instrument_type_from_string(object)188 # If it is a object or class look around in normal names looking for a string189 # to process190 elif isinstance(object, InstanceType) or ClassType:191 for attribute in attritbute_names:192 try:193 if attribute in dir(object):194 string=eval('object.%s'%attribute)195 answer=determine_instrument_type_from_string(string)196 if answer is None:pass197 else: return answer 198 except AttributeError: 199 try:200 string=object.__class__.__name__201 return determine_instrument_type_from_string(string)202 except: pass 203 204def find_description(identifier,output='path',directory=None):205 """ Finds an instrument description in pyMez/Instruments given an identifier, 206 outputs a path or the file. Right now this outputs the first sheet that matches the identifier"""207 if isinstance(identifier,str):208 # Now read in all the Instrument sheets and look for a match209 if directory is None:210 instrument_folder=os.path.join(PYMEASURE_ROOT,'Instruments')211 else:212 instrument_folder=directory213 for instrument_sheet in os.listdir(instrument_folder):214 path=os.path.join(PYMEASURE_ROOT,'Instruments',instrument_sheet)215 if os.path.isfile(path):216 f=open(path,'r')217 text=f.read()218 if re.search(identifier,text):219 path_out=re.compile('name|path',re.IGNORECASE)220 file_contents=re.compile('file|xml|node|contents',re.IGNORECASE)221 if re.search(path_out,output):222 return path223 elif re.search(file_contents,output):224 return text225 else:226 return None227def fix_segment_table(segment_table):228 """Given a list of dictionaries in the form [{"start":start_frequency,229 "stop":stop_frequency,"number_points":number_points,"step":frequency_step}...] returns a table that is ordered by start230 frequency and has no overlapping points"""231 segment_table = sorted(segment_table, key=lambda x: x["start"])232 i = 0233 while (i + 1 < len(segment_table)):234 if segment_table[i]["stop"] == segment_table[i + 1]["start"]:235 segment_table[i + 1]["start"] = segment_table[i + 1]["start"] + segment_table[i + 1]["step"]236 segment_table[i + 1]["number_points"] -= 1237 i += 1238 return segment_table239#-------------------------------------------------------------------------------240# Class Definitions241class VisaInstrumentError(Exception):242 def __init__(self,*args):243 Exception.__init__(self,*args)244class EmulationInstrument(InstrumentSheet):245 """ General Class to communicate with COMM and GPIB instruments246 This is a blend of the pyvisa resource and an xml description. """247 def __init__(self, resource_name=None, **options):248 """ Intializes the VisaInstrument Class"""249 defaults = {"state_directory": os.getcwd(),250 "instrument_description_directory": os.path.join(PYMEASURE_ROOT, 'Instruments')}251 self.options = {}252 for key, value in defaults.items():253 self.options[key] = value254 for key, value in options.items():255 self.options[key] = value256 # First we try to look up the description and get info from it257 if DATA_SHEETS:258 try:259 self.info_path = find_description(resource_name,260 directory=self.options["instrument_description_directory"])261 InstrumentSheet.__init__(self, self.info_path, **self.options)262 self.info_found = True263 self.DEFAULT_STATE_QUERY_DICTIONARY = self.get_query_dictionary()264 except:265 print('The information sheet was not found defaulting to address')266 self.DEFAULT_STATE_QUERY_DICTIONARY = {}267 self.info_found = False268 self.instrument_address = resource_name269 self.name = resource_name.replace(":", "_")270 pass271 else:272 self.info_found = False273 self.DEFAULT_STATE_QUERY_DICTIONARY = {}274 self.instrument_address = resource_name275 # Create a description for state saving276 if self.info_found:277 self.description = {'Instrument_Description': self.path}278 else:279 self.description = {'Instrument_Description': self.instrument_address}280 self.state_buffer = []281 self.STATE_BUFFER_MAX_LENGTH = 10282 self.write_buffer=[]283 self.read_buffer=[]284 self.history=[]285 #self.resource_manager = visa.ResourceManager()286 # Call the visa instrument class-- this gives ask,write,read287 #self.resource = self.resource_manager.open_resource(self.instrument_address)288 self.current_state = self.get_state()289 def write(self, command):290 "Writes command to instrument"291 now=datetime.datetime.utcnow().isoformat()292 self.write_buffer.append(command)293 self.history.append({"Timestamp":now,"Action":"self.write",294 "Argument":command,"Response":None})295 def read(self):296 "Reads from the instrument"297 now=datetime.datetime.utcnow().isoformat()298 out="Buffer Read at {0}".format(now)299 self.read_buffer.append(out)300 time.sleep(.001)301 self.history.append({"Timestamp":now,"Action":"self.read",302 "Argument":None,"Response":out})303 return out304 def query(self, command):305 "Writes command and then reads a response"306 self.write(command)307 return self.read()308 def ask(self, command):309 "Writes command and then reads a response"310 return self.query(command)311 def set_state(self, state_dictionary=None, state_table=None):312 """ Sets the instrument to the state specified by Command:Value pairs"""313 if state_dictionary:314 if len(self.state_buffer) + 1 < self.STATE_BUFFER_MAX_LENGTH:315 self.state_buffer.append(self.get_state())316 else:317 self.state_buffer.pop(1)318 self.state_buffer.insert(-1, self.get_state())319 for state_command, value in state_dictionary.items():320 self.write(state_command + ' ' + str(value))321 self.current_state = self.get_state()322 if state_table:323 if "Index" in list(state_table[0].keys()):324 state_table = sorted(state_table, key=lambda x: x["Index"])325 if len(self.state_buffer) + 1 < self.STATE_BUFFER_MAX_LENGTH:326 self.state_buffer.append(self.get_state())327 else:328 self.state_buffer.pop(1)329 self.state_buffer.insert(-1, self.get_state())330 # now we need to write the command331 for state_row in state_table:332 # a state row has a set and value333 state_command = state_row["Set"]334 value = state_row["Value"]335 self.write(state_command + ' ' + str(value))336 def get_state(self, state_query_dictionary=None, state_query_table=None):337 """ Gets the current state of the instrument. get_state accepts any query dictionary in338 the form state_query_dictionary={"GPIB_SET_COMMAND":"GPIB_QUERY_COMMAND",...} or any state_query_table339 in the form [{"Set":"GPIB_SET_COMMAND","Query":"GPIB_QUERY_COMMAND","Index":Optional_int_ordering commands,340 if no state is provided it returns the DEFAULT_STATE_QUERY_DICTIONARY as read in from the InstrumentSheet"""341 if not state_query_table:342 if state_query_dictionary is None or len(state_query_dictionary) == 0:343 state_query_dictionary = self.DEFAULT_STATE_QUERY_DICTIONARY344 state = dict([(state_command, self.query(str(query)).replace("\n", "")) for state_command, query345 in state_query_dictionary.items()])346 return state347 else:348 # a state_query_table is a list of dictionaries, each row has at least a Set and Query key but could349 # have an Index key that denotes order350 if "Index" in list(state_query_table[0].keys()):351 state_query_table = sorted(state_query_table, key=lambda x: int(x["Index"]))352 state = []353 for state_row in state_query_table:354 set = state_row["Set"]355 query = state_row["Query"]356 index = state_row["Index"]357 state.append({"Set": set, "Value": self.query(query).replace("\n", ""), "Index": index})358 return state359 else:360 state = []361 for state_row in state_query_table:362 set = state_row["Set"]363 query = state_row["Query"]364 state.append({"Set": set, "Value": self.query(query).replace("\n", "")})365 return state366 def update_current_state(self):367 self.current_state = self.get_state()368 def save_current_state(self):369 """ Saves the state in self.current_state attribute """370 self.current_state = self.get_state()371 self.save_state(None, state_dictionary=self.current_state)372 def save_state(self, state_path=None, state_dictionary=None, state_table=None):373 """ Saves any state dictionary to an xml file, with state_path, if not specified defaults to autonamed state374 """375 if state_path is None:376 state_path = auto_name(specific_descriptor=self.name, general_descriptor="State",377 directory=self.options["state_directory"])378 if state_dictionary:379 new_state = InstrumentState(None, **{"state_dictionary": state_dictionary,380 "style_sheet": "./DEFAULT_STATE_STYLE.xsl"})381 elif state_table:382 new_state = InstrumentState(None, **{"state_table": state_table,383 "style_sheet": "./DEFAULT_STATE_STYLE.xsl"})384 else:385 new_state = InstrumentState(None, **{"state_dictionary": self.get_state(),386 "style_sheet": "./DEFAULT_STATE_STYLE.xsl"})387 try:388 new_state.add_state_description()389 new_state.append_description(description_dictionary=self.description)390 except:391 raise # pass392 new_state.save(state_path)393 return state_path394 def load_state(self, file_path):395 """Loads a state from a file."""396 # TODO put a UDT to state_table397 state_model = InstrumentState(file_path)398 self.set_state(state_table=state_model.get_state_list_dictionary())399 def close(self):400 """Closes the VISA session"""401 print("Emulation Instrument has been closed")402 403class VisaInstrument(InstrumentSheet):404 """ General Class to communicate with COMM and GPIB instruments405 This is a blend of the pyvisa resource and an xml description. If there is no device connected406 enters into a emulation mode. Where all the commands are logged as .history and the attribute emulation_mode=True"""407 def __init__(self,resource_name=None,**options):408 """ Initializes the VisaInstrument Class"""409 defaults={"state_directory":os.getcwd(),410 "instrument_description_directory":os.path.join(PYMEASURE_ROOT,'Instruments')}411 self.options={}412 for key,value in defaults.items():413 self.options[key]=value414 for key,value in options.items():415 self.options[key]=value416 # First we try to look up the description and get info from it417 if DATA_SHEETS:418 try: 419 self.info_path=find_description(resource_name,420 directory=self.options["instrument_description_directory"])421 InstrumentSheet.__init__(self,self.info_path,**self.options)422 self.info_found=True423 self.DEFAULT_STATE_QUERY_DICTIONARY=self.get_query_dictionary()424 except:425 print('The information sheet was not found defaulting to address')426 self.DEFAULT_STATE_QUERY_DICTIONARY={}427 self.info_found=False428 self.instrument_address=resource_name429 self.name=resource_name.replace(":","_")430 pass431 else:432 self.info_found=False433 self.DEFAULT_STATE_QUERY_DICTIONARY={}434 self.instrument_address=resource_name435 436 # Create a description for state saving437 if self.info_found:438 self.description={'Instrument_Description':self.path}439 else:440 self.description={'Instrument_Description':self.instrument_address}441 442 self.state_buffer=[]443 self.STATE_BUFFER_MAX_LENGTH=10444 try:445 self.resource_manager=visa.ResourceManager()446 # Call the visa instrument class-- this gives ask,write,read447 self.resource=self.resource_manager.open_resource(self.instrument_address)448 self.emulation_mode = False449 except:450 print("Unable to load resource entering emulation mode ...")451 self.resource=EmulationInstrument(self.instrument_address)452 self.history=self.resource.history453 self.emulation_mode=True454 self.current_state=self.get_state()455 456 def write(self,command):457 "Writes command to instrument"458 return self.resource.write(command)459 def read(self):460 "Reads from the instrument"461 return self.resource.read()462 def query(self,command):463 "Writes command and then reads a response"464 return self.resource.query(command)465 def ask(self,command):466 "Writes command and then reads a response"467 return self.resource.query(command)468 def set_state(self,state_dictionary=None,state_table=None):469 """ Sets the instrument to the state specified by state_dictionary={Command:Value,..} pairs, or a list of dictionaries470 of the form state_table=[{"Set":Command,"Value":Value},..]"""471 if state_dictionary:472 if len(self.state_buffer)+1<self.STATE_BUFFER_MAX_LENGTH:473 self.state_buffer.append(self.get_state())474 else:475 self.state_buffer.pop(1)476 self.state_buffer.insert(-1,self.get_state())477 for state_command,value in state_dictionary.items():478 self.write(state_command+' '+str(value))479 self.current_state=self.get_state()480 if state_table:481 if "Index" in list(state_table[0].keys()):482 state_table=sorted(state_table,key=lambda x:x["Index"])483 if len(self.state_buffer)+1<self.STATE_BUFFER_MAX_LENGTH:484 self.state_buffer.append(self.get_state())485 else:486 self.state_buffer.pop(1)487 self.state_buffer.insert(-1,self.get_state())488 # now we need to write the command489 for state_row in state_table:490 # a state row has a set and value491 state_command=state_row["Set"]492 value=state_row["Value"]493 self.write(state_command+' '+str(value))494 495 def get_state(self,state_query_dictionary=None,state_query_table=None):496 """ Gets the current state of the instrument. get_state accepts any query dictionary in497 the form state_query_dictionary={"GPIB_SET_COMMAND":"GPIB_QUERY_COMMAND",...} or any state_query_table498 in the form [{"Set":"GPIB_SET_COMMAND","Query":"GPIB_QUERY_COMMAND","Index":Optional_int_ordering commands,499 if no state is provided it returns the DEFAULT_STATE_QUERY_DICTIONARY as read in from the InstrumentSheet"""500 if not state_query_table:501 if state_query_dictionary is None or len(state_query_dictionary)==0 :502 state_query_dictionary=self.DEFAULT_STATE_QUERY_DICTIONARY503 state=dict([(state_command,self.query(str(query)).replace("\n","")) for state_command,query504 in state_query_dictionary.items()])505 return state506 else:507 # a state_query_table is a list of dictionaries, each row has at least a Set and Query key but could508 # have an Index key that denotes order509 if "Index" in list(state_query_table[0].keys()):510 state_query_table=sorted(state_query_table,key=lambda x:int(x["Index"]))511 state=[]512 for state_row in state_query_table:513 set=state_row["Set"]514 query=state_row["Query"]515 index=state_row["Index"]516 state.append({"Set":set,"Value":self.query(query).replace("\n",""),"Index":index})517 return state518 else:519 state=[]520 for state_row in state_query_table:521 set=state_row["Set"]522 query=state_row["Query"]523 state.append({"Set":set,"Value":self.query(query).replace("\n","")})524 return state525 526 def update_current_state(self):527 self.current_state=self.get_state()528 529 def save_current_state(self):530 """ Saves the state in self.current_state attribute """531 self.current_state=self.get_state()532 self.save_state(None,state_dictionary=self.current_state)533 534 def save_state(self,state_path=None,state_dictionary=None,state_table=None,refresh_state=False):535 """ Saves any state dictionary to an xml file, with state_path,536 if not specified defaults to autonamed state and the default state dictionary refreshed at the time537 of the method call. If refresh_state=True it gets the state at the time of call otherwise538 the state is assumed to be all ready complete. state=instrument.get_state(state_dictionary) and then539 instrument.save_state(state). Or instrument.save_state(state_dictionary=state_dictionary,refresh=True)540 """541 if state_path is None:542 state_path=auto_name(specific_descriptor=self.name,general_descriptor="State",543 directory=self.options["state_directory"])544 state_path=os.path.join(self.options["state_directory"],state_path)545 if state_dictionary:546 if refresh_state:547 state_dictionary=self.get_state(state_dictionary)548 new_state=InstrumentState(None,**{"state_dictionary":state_dictionary,549 "style_sheet":"./DEFAULT_STATE_STYLE.xsl"})550 elif state_table:551 if refresh_state:552 state_table=self.get_state(state_table)553 new_state=InstrumentState(None,**{"state_table":state_table,554 "style_sheet":"./DEFAULT_STATE_STYLE.xsl"})555 else:556 new_state=InstrumentState(None,**{"state_dictionary":self.get_state(),557 "style_sheet":"./DEFAULT_STATE_STYLE.xsl"})558 try:559 new_state.append_description(description_dictionary=self.description)560 except: raise #pass561 new_state.save(state_path)562 return state_path563 def load_state(self,file_path):564 """Loads a state from a file."""565 #TODO put a UDT to state_table566 state_model=InstrumentState(file_path)567 self.set_state(state_table=state_model.get_state_list_dictionary())568 def close(self):569 """Closes the VISA session"""570 self.resource_manager.close()571class VNA(VisaInstrument):572 """Control class for a linear VNA.573 The .measure_sparameters ans .measure_switch_terms return a S2PV1574 class that can be saved, printed or have a simple plot using show(). The attribute frequency_list575 stores the frequency points as Hz."""576 def __init__(self, resource_name=None, **options):577 """Initializes the E8631A control class"""578 defaults = {"state_directory": os.getcwd(), "frequency_units": "Hz"}579 self.options = {}580 for key, value in defaults.items():581 self.options[key] = value582 for key, value in options.items():583 self.options[key] = value584 VisaInstrument.__init__(self, resource_name, **self.options)585 if self.emulation_mode:586 self.power = -20587 self.IFBW = 10588 self.frequency_units = self.options["frequency_units"]589 self.frequency_table = []590 # this should be if SENS:SWE:TYPE? is LIN or LOG591 self.sweep_type ="LIN"592 else:593 self.power = self.get_power()594 self.IFBW = self.get_IFBW()595 self.frequency_units = self.options["frequency_units"]596 self.frequency_table = []597 # this should be if SENS:SWE:TYPE? is LIN or LOG598 self.sweep_type = self.get_sweep_type()599 if re.search("LIN", self.sweep_type, re.IGNORECASE):600 start = float(self.query("SENS:FREQ:START?").replace("\n", ""))601 stop = float(self.query("SENS:FREQ:STOP?").replace("\n", ""))602 number_points = int(self.query("SENS:SWE:POIN?").replace("\n", ""))603 self.frequency_list = np.linspace(start, stop, number_points).tolist()604 elif re.search("LOG", self.sweep_type, re.IGNORECASE):605 start = float(self.query("SENS:FREQ:START?").replace("\n", ""))606 stop = float(self.query("SENS:FREQ:STOP?").replace("\n", ""))607 number_points = int(self.query("SENS:SWE:POIN?").replace("\n", ""))608 logspace_start = np.log10(start)609 logspace_stop = np.log10(stop)610 self.frequency_list = [round(x, ndigits=3) for x in np.logspace(logspace_start, logspace_stop,611 num=number_points, base=10).tolist()]612 elif re.search("SEG", self.sweep_type, re.IGNORECASE):613 number_segments = int(self.query("SENS:SEGM:COUN?").replace("\n", ""))614 for i in range(number_segments):615 start = float(self.query("SENS:SEGM{0}:FREQ:START?".format(i + 1)).replace("\n", ""))616 stop = float(self.query("SENS:SEGM{0}:FREQ:STOP?".format(i + 1)).replace("\n", ""))617 number_points = int(self.query("SENS:SEGM{0}:SWE:POIN?".format(i + 1)).replace("\n", ""))618 step = (stop - start) / float(number_points - 1)619 self.frequency_table.append({"start": start, "stop": stop,620 "number_points": number_points, "step": step})621 self.frequency_table = fix_segment_table(self.frequency_table)622 frequency_list = []623 for row in self.frequency_table[:]:624 new_list = np.linspace(row["start"], row["stop"], row["number_points"]).tolist()625 frequency_list = frequency_list + new_list626 self.frequency_list = frequency_list627 else:628 self.frequency_list = []629 def add_trace(self,trace_name,trace_parameter,drive_port=1,display_trace=True):630 """Adds a single trace to the VNA. Trace parameters vary by instrument and can be ratios of631 recievers or raw receiver values. For instance, R1 is the a1 wave. Traditional Sparameters632 do not require the identification of a drive_port. Does not display trace on the front panel"""633 if re.search("S",trace_parameter,re.IGNORECASE):634 self.write("CALCulate:PARameter:DEFine '{0}',{1}".format(trace_name,trace_parameter))635 else:636 self.write("CALCulate:PARameter:DEFine '{0}',{1},{2}".format(trace_name, trace_parameter,drive_port))637 if display_trace:638 self.write("DISPlay:WINDow1:TRACe1:FEED '{0}'".format(trace_name))639 def read_trace(self,trace_name):640 """Returns a 2-d list of [[reParameter1,imParameter1],..[reParameterN,imParameterN]] where641 n is the number of points in the sweep. User is responsible for triggering the sweep and retrieving642 the frequency array vna.get_frequency_list()"""643 self.write('FORM:ASC,0')644 # First get the A and Blists645 self.write('CALC:PAR:SEL "{0}"'.format(trace_name))646 self.write('CALC:FORM MLIN')647 while self.is_busy():648 time.sleep(.01)649 out_string = self.query('CALC:DATA? SDATA')650 out_string=out_string.replace("\n", "").split(",")651 re_list=out_string[0::2]652 im_list=out_string[1::2]653 out_list=[[float(real_parameter),float(im_list[index])] for index,real_parameter in enumerate(re_list)]654 return out_list655 def trigger_sweep(self):656 """Triggers a single sweep of the VNA, note you need to wait for the sweep to finish before reading the657 values. It takes ~ #ports sourced*#points/IFBW """658 self.write("INITiate:CONTinuous OFF")659 self.write("ABORT;INITiate:IMMediate;*wai")660 def get_trace_catalog(self):661 """Returns the trace catalog as a string"""662 trace_string = self.query("CALC:PAR:CAT?")663 return trace_string664 def get_trace_list(self):665 """Returns the active trace names as a list"""666 trace_string = self.query("CALC:PAR:CAT?")667 trace_list = trace_string.replace("\n", "").split(",")[0::3]668 return trace_list669 def delete_all_traces(self):670 """Deletes all active traces"""671 trace_string = self.query("CALC:PAR:CAT?")672 # remove endline, split on , and then take every third one673 trace_list = trace_string.replace("\n", "").split(",")[0::3]674 for trace in trace_list:675 self.write("CALC:PAR:DEL '{0}'".format(trace))676 def set_source_output(self,state=0):677 """Sets all of the outputs of the VNA to OFF(0) or ON (1). This disables/enables all the source outputs."""678 self.write("OUTP {0}".format(state))679 def get_source_output(self):680 """Returns the state of the outputs. This is equivelent to vna.query('OUTP?')"""681 state=self.query("OUTP?")682 return int(state.replace("\n",""))683 def add_all_traces(self,**options):684 """Adds all Sparameter and wave parameter traces.685 Does not initialize the instrument. The trace names match those in the686 measure methods (S11,S12,..S22) and (A1_D1,B1_D1..B2_D2) by default it687 assumes port 1 and port 2 are being used. In addition, it assumes the B receiver names are [A,B,C,D].688 This method will cause an error if the traces are already defined"""689 defaults = {"port1": 1,"port2":2, "b_name_list": ["A", "B", "C", "D"]}690 initialize_options = {}691 for key, value in defaults.items():692 initialize_options[key] = value693 for key, value in options.items():694 initialize_options[key] = value695 self.write("DISPlay:WINDow1:STATE ON")696 scattering_parameter_names=["S11","S12","S21","S22"]697 trace_definitions=["S{0}{1}".format(initialize_options["port1"],initialize_options["port1"]),698 "S{0}{1}".format(initialize_options["port1"], initialize_options["port2"]),699 "S{0}{1}".format(initialize_options["port2"], initialize_options["port1"]),700 "S{0}{1}".format(initialize_options["port2"], initialize_options["port2"])]701 for index,name in enumerate(scattering_parameter_names):702 self.write("CALCulate:PARameter:DEFine '{0}',{1}".format(name,trace_definitions[index]))703 self.write("DISPlay:WINDow1:TRACe{1}:FEED '{0}'".format(name,index+1))704 b1_name = initialize_options["b_name_list"][initialize_options["port1"] - 1]705 b2_name= initialize_options["b_name_list"][initialize_options["port2"] - 1]706 # Initialize Port 1 traces A1_D1,B1_D1,B2_D1707 self.write("CALCulate:PARameter:DEFine 'A{0}_D{0}',R{0},{0}".format(initialize_options["port1"]))708 self.write("DISPlay:WINDow1:TRACe5:FEED 'A{0}_D{0}'".format(initialize_options["port1"]))709 self.write("CALCulate:PARameter:DEFine 'B{0}_D{0}',{1},{0}".format(initialize_options["port1"],b1_name))710 self.write("DISPlay:WINDow1:TRACe6:FEED 'B{0}_D{0}'".format(initialize_options["port1"]))711 self.write("CALCulate:PARameter:DEFine 'A{1}_D{0}',R{1},{0}".format(initialize_options["port1"],712 initialize_options["port2"] ))713 self.write("DISPlay:WINDow1:TRACe7:FEED 'A{1}_D{0}'".format(initialize_options["port1"],714 initialize_options["port2"]))715 self.write("CALCulate:PARameter:DEFine 'B{1}_D{0}',{2},{0}".format(initialize_options["port1"],716 initialize_options["port2"],717 b2_name))718 self.write("DISPlay:WINDow1:TRACe8:FEED 'B{1}_D{0}'".format(initialize_options["port1"],719 initialize_options["port2"]))720 # Initialize Port 2 Traces A1_D2,B1_D2,721 self.write("CALCulate:PARameter:DEFine 'A{0}_D{1}',R{0},{1}".format(initialize_options["port1"],722 initialize_options["port2"] ))723 self.write("DISPlay:WINDow1:TRACe9:FEED 'A{0}_D{1}'".format(initialize_options["port1"],724 initialize_options["port2"]))725 self.write("CALCulate:PARameter:DEFine 'B{0}_D{1}',{2},{1}".format(initialize_options["port1"],726 initialize_options["port2"],727 b1_name))728 self.write("DISPlay:WINDow1:TRACe10:FEED 'B{0}_D{1}'".format(initialize_options["port1"],729 initialize_options["port2"]))730 self.write("CALCulate:PARameter:DEFine 'A{1}_D{1}',R{1},{1}".format(initialize_options["port1"],731 initialize_options["port2"] ))732 self.write("DISPlay:WINDow1:TRACe11:FEED 'A{1}_D{1}'".format(initialize_options["port1"],733 initialize_options["port2"]))734 self.write("CALCulate:PARameter:DEFine 'B{1}_D{1}',{2},{1}".format(initialize_options["port1"],735 initialize_options["port2"],736 b2_name))737 self.write("DISPlay:WINDow1:TRACe12:FEED 'B{1}_D{1}'".format(initialize_options["port1"],738 initialize_options["port2"]))739 def initialize_s2p(self, **options):740 """Intializes the system to take two sparameters"""741 defaults = {"reset": False}742 initialize_options = {}743 for key, value in defaults.items():744 initialize_options[key] = value745 for key, value in options.items():746 initialize_options[key] = value747 if initialize_options["reset"]:748 self.write("SYST:FPRESET")749 self.write("DISPlay:WINDow1:STATE ON")750 self.write("CALCulate:PARameter:DEFine 'S11',S11")751 self.write("DISPlay:WINDow1:TRACe1:FEED 'S11'")752 self.write("CALCulate:PARameter:DEFine 'S12',S12")753 self.write("DISPlay:WINDow1:TRACe2:FEED 'S12'")754 self.write("CALCulate:PARameter:DEFine 'S21',S21")755 self.write("DISPlay:WINDow1:TRACe3:FEED 'S21'")756 self.write("CALCulate:PARameter:DEFine 'S22',S22")757 self.write("DISPlay:WINDow1:TRACe4:FEED 'S22'")758 self.sweep_type = self.get_sweep_type()759 if re.search("LIN", self.sweep_type, re.IGNORECASE):760 start = float(self.query("SENS:FREQ:START?").replace("\n", ""))761 stop = float(self.query("SENS:FREQ:STOP?").replace("\n", ""))762 number_points = int(self.query("SENS:SWE:POIN?").replace("\n", ""))763 self.frequency_list = np.linspace(start, stop, number_points).tolist()764 elif re.search("LOG", self.sweep_type, re.IGNORECASE):765 start = float(self.query("SENS:FREQ:START?").replace("\n", ""))766 stop = float(self.query("SENS:FREQ:STOP?").replace("\n", ""))767 number_points = int(self.query("SENS:SWE:POIN?").replace("\n", ""))768 logspace_start = np.log10(start)769 logspace_stop = np.log10(stop)770 self.frequency_list = [round(x, ndigits=3) for x in np.logspace(logspace_start, logspace_stop,771 num=number_points, base=10).tolist()]772 elif re.search("SEG", self.sweep_type, re.IGNORECASE):773 number_segments = int(self.query("SENS:SEGM:COUN?").replace("\n", ""))774 for i in range(number_segments):775 start = float(self.query("SENS:SEGM{0}:FREQ:START?".format(i + 1)).replace("\n", ""))776 stop = float(self.query("SENS:SEGM{0}:FREQ:STOP?".format(i + 1)).replace("\n", ""))777 number_points = int(self.query("SENS:SEGM{0}:SWE:POIN?".format(i + 1)).replace("\n", ""))778 step = (stop - start) / float(number_points - 1)779 self.frequency_table.append({"start": start, "stop": stop,780 "number_points": number_points, "step": step})781 self.frequency_table = fix_segment_table(self.frequency_table)782 frequency_list = []783 for row in self.frequency_table[:]:784 new_list = np.linspace(row["start"], row["stop"], row["number_points"]).tolist()785 frequency_list = frequency_list + new_list786 self.frequency_list = frequency_list787 else:788 self.frequency_list = []789 def initialize(self,**options):790 """A handler to initialize the system to acquire the parameters of choice.791 The default behavior is to initialize_s2p"""792 defaults={"parameters":"s2p"}793 self.initialize_options={}794 for key,value in defaults.items():795 self.initialize_options[key]=value796 for key,value in options.items():797 self.initialize_options[key]=value798 if re.search("s2p",self.initialize_options["parameters"],re.IGNORECASE):799 self.initialize_s2p(**self.initialize_options)800 elif re.search("w1p",self.initialize_options["parameters"],re.IGNORECASE):801 self.initialize_w1p(**self.initialize_options)802 elif re.search("w2p",self.initialize_options["parameters"],re.IGNORECASE):803 self.initialize_w2p(**self.initialize_options)804 else:805 print("Initialization failed because it did not understand {0}".format(self.initialize_options))806 def set_power(self, power):807 """Sets the power of the Instrument in dbm"""808 self.write('SOUR:POW {0}'.format(power))809 def get_power(self):810 "Returns the power of the instrument in dbm"811 return self.query('SOUR:POW?')812 def get_sweep_type(self):813 "Returns the current sweep type. It can be LIN, LOG, or SEG"814 return self.query("SENS:SWE:TYPE?")815 def set_IFBW(self, ifbw):816 """Sets the IF Bandwidth of the instrument in Hz"""817 self.write('SENS:BAND {0}'.format(ifbw))818 self.write('SENS:BAND:TRAC OFF')819 self.IFBW = ifbw820 def get_IFBW(self):821 """Returns the IFBW of the instrument in Hz"""822 ifbw = float(self.query('SENS:BAND?'))823 self.IFBW = ifbw824 return ifbw825 def set_frequency_units(self, frequency_units="Hz"):826 """Sets the frequency units of the class, all values are still written to the VNA827 as Hz and the attrbiute frequncy_list is in Hz,828 however all commands that deal with sweeps and measurements will be in units"""829 for unit in list(VNA_FREQUENCY_UNIT_MULTIPLIERS.keys()):830 if re.match(unit, frequency_units, re.IGNORECASE):831 self.frequency_units = unit832 def add_segment(self, start, stop=None, number_points=None, step=None, frequency_units="Hz"):833 """Sets the VNA to a segment mode and appends a single entry in the frequency table. If start is the only specified834 parameter sets the entry to start=stop and number_points = 1. If step is specified calculates the number of points835 and sets start, stop, number_points on the VNA. It also stores the value into the attribute frequency_list.836 Note this function was primarily tested on an agilent which stores frequency to the nearest mHz.837 """838 # first handle the start only case839 if stop is None and number_points is None:840 stop = start841 number_points = 1842 # fix the frequency units843 for unit in list(VNA_FREQUENCY_UNIT_MULTIPLIERS.keys()):844 if re.match(unit, frequency_units, re.IGNORECASE):845 start = start * VNA_FREQUENCY_UNIT_MULTIPLIERS[unit]846 stop = stop * VNA_FREQUENCY_UNIT_MULTIPLIERS[unit]847 if step:848 step = step * VNA_FREQUENCY_UNIT_MULTIPLIERS[unit]849 self.frequency_units = unit850 # handle creating step and number of points851 if number_points is None and not step is None:852 number_points = round((stop - start) / step) + 1853 elif number_points is None:854 number_points = 201 # I don't like the default for n_points this far down in the code855 step = (stop - start) / (number_points - 1)856 else:857 step = (stop - start) / (number_points - 1)858 # append the new segment to self.frequency_table and fix any strangeness859 self.frequency_table.append({"start": start, "stop": stop, "number_points": number_points, "step": step})860 self.frequency_table = fix_segment_table(self.frequency_table[:])861 # update the frequency_list862 frequency_list = []863 for row in self.frequency_table[:]:864 new_list = np.linspace(row["start"], row["stop"], row["number_points"]).tolist()865 frequency_list = frequency_list + new_list866 self.frequency_list = frequency_list867 # now we write the segment to the instrument868 if not re.search("SEG", self.get_sweep_type(), re.IGNORECASE):869 self.write('SENS:SWE:TYPE SEGM')870 # now get the number of segments and add or delete the right amount to make it line up with self.frequency_table871 # This routine is broken872 number_segments = int(self.query("SENS:SEGM:COUN?").replace("\n", ""))873 #print(("{0} is {1}".format("number_segments", number_segments)))874 if len(self.frequency_table) < number_segments:875 difference = number_segments - len(self.frequency_table)876 max_segment = number_segments877 while (difference != 0):878 self.write("SENS:SEGM{0}:DEL".format(max_segment))879 max_segment -= 1880 difference -= 1881 elif len(self.frequency_table) > number_segments:882 difference = len(self.frequency_table) - number_segments883 max_segment = number_segments + 1884 #print(("{0} is {1}".format("difference", difference)))885 while (difference != 0):886 self.write("SENS:SEGM{0}:ADD".format(max_segment))887 max_segment += 1888 difference -= 1889 #print(("{0} is {1}".format("difference", difference)))890 else:891 pass892 for row_index, row in enumerate(self.frequency_table[:]):893 [start, stop, number_points] = [row["start"], row["stop"], row["number_points"]]894 # SENSe<cnum>:SEGMent<snum>:SWEep:POINts <num>895 self.write("SENS:SEGM{0}:FREQ:START {1}".format(row_index + 1, start))896 self.write("SENS:SEGM{0}:FREQ:STOP {1}".format(row_index + 1, stop))897 self.write("SENS:SEGM{0}:SWE:POIN {1}".format(row_index + 1, number_points))898 self.write("SENS:SEGM{0}:STAT ON".format(row_index + 1))899 def remove_segment(self,segment=1):900 """Removes a the segment, default is segment 1 """901 self.write("SENS:SEGM{0}:DEL".format(segment))902 def remove_all_segments(self):903 """Removes all segments from VNA"""904 segment_count = int(self.query("SENS:SEGM:COUN?").replace("\n", ""))905 for i in range(segment_count):906 segment = segment_count - i907 self.write("SENS:SEGM{0}:DEL".format(segment))908 def write_frequency_table(self, frequency_table=None):909 """Writes frequency_table to the instrument, the frequency table should be in the form910 [{start:,stop:,number_points:}..] or None"""911 if frequency_table is None:912 frequency_table = self.frequency_table[:]913 for row_index, row in enumerate(frequency_table[:]):914 [start, stop, number_points] = [row["start"], row["stop"], row["number_points"]]915 # SENSe<cnum>:SEGMent<snum>:SWEep:POINts <num>916 self.write("SENS:SEGM{0}:FREQ:START {1}".format(row_index + 1, start))917 self.write("SENS:SEGM{0}:FREQ:STOP {1}".format(row_index + 1, stop))918 self.write("SENS:SEGM{0}:SWE:POIN {1}".format(row_index + 1, number_points))919 self.write("SENS:SEGM{0}:STAT ON".format(row_index + 1))920 def set_frequency(self, start, stop=None, number_points=None, step=None, type='LIN', frequency_units="Hz"):921 """Sets the VNA to a linear mode and creates a single entry in the frequency table. If start is the only specified922 parameter sets the entry to start=stop and number_points = 1. If step is specified calculates the number of points923 and sets start, stop, number_points on the VNA. It also stores the value into the attribute frequency_list.924 Note this function was primarily tested on an agilent which stores frequency to the nearest mHz.925 """926 if stop is None and number_points is None:927 stop = start928 number_points = 1929 for unit in list(VNA_FREQUENCY_UNIT_MULTIPLIERS.keys()):930 if re.match(unit, frequency_units, re.IGNORECASE):931 start = start * VNA_FREQUENCY_UNIT_MULTIPLIERS[unit]932 stop = stop * VNA_FREQUENCY_UNIT_MULTIPLIERS[unit]933 if step:934 step = step * VNA_FREQUENCY_UNIT_MULTIPLIERS[unit]935 self.frequency_units = unit936 if number_points is None and not step is None:937 number_points = round((stop - start) / step) + 1938 if re.search("LIN", type, re.IGNORECASE):939 self.write('SENS:SWE:TYPE LIN')940 self.frequency_list = np.linspace(start, stop, number_points).tolist()941 elif re.search("LOG", type, re.IGNORECASE):942 self.write('SENS:SWE:TYPE LOG')943 logspace_start = np.log10(start)944 logspace_stop = np.log10(stop)945 self.frequency_list = [round(x, ndigits=3) for x in np.logspace(logspace_start, logspace_stop,946 num=number_points, base=10).tolist()]947 else:948 self.write('SENS:SWE:TYPE LIN')949 self.frequency_list = [round(x, ndigits=3) for x in np.linspace(start, stop, number_points).tolist()]950 self.write("SENS:FREQ:START {0}".format(start))951 self.write("SENS:FREQ:STOP {0}".format(stop))952 self.write("SENS:SWE:POIN {0}".format(number_points))953 def get_frequency(self):954 "Returns the frequency in python list format"955 return self.get_frequency_list()956 def is_busy(self):957 """Checks if the instrument is currently doing something and returns a boolean value"""958 opc = bool(self.resource.query("*OPC?"))959 return not opc960 def clear_window(self, window=1):961 """Clears the window of traces. Does not delete the variables"""962 string_response = self.query("DISPlay:WINDow{0}:CATalog?".format(window))963 traces = string_response.split(",")964 for trace in traces:965 self.write("DISP:WIND{0}:TRAC{1}:DEL".format(window, trace))966 @emulation_data(EMULATION_SWITCH_TERMS)967 def measure_switch_terms(self, **options):968 """Measures switch terms and returns a s2p table in forward and reverse format. To return in port format969 set the option order= "PORT"""970 defaults = {"view_trace": True,"initialize":True,"order":"FR"}971 self.measure_switch_term_options = {}972 for key, value in defaults.items():973 self.measure_switch_term_options[key] = value974 for key, value in options.items():975 self.measure_switch_term_options[key] = value976 # this resets the traces to be based on swith terms977 # Set VS to be remotely triggered by GPIB978 self.write("SENS:HOLD:FUNC HOLD")979 self.write("TRIG:REM:TYP CHAN")980 if self.measure_switch_term_options["initialize"]:981 # Set the Channel to have 2 Traces982 self.write("CALC1:PAR:COUN 2")983 # Trace 1 This is port 2 or Forward Switch Terms984 self.write("CALC1:PAR:DEF 'FWD',R2B,1") # note this command is different for vector star A2,B2985 if self.measure_switch_term_options["view_trace"]:986 self.write("DISPlay:WINDow1:TRACe5:FEED 'FWD'")987 # Trace 2 This is port 1 or Reverse Switch Terms988 self.write("CALC1:PAR:DEF 'REV',R1A,2")989 if self.measure_switch_term_options["view_trace"]:990 self.write("DISPlay:WINDow1:TRACe6:FEED 'REV'")991 # Select Channel992 self.write("CALC1:SEL;")993 self.write("ABORT;TRIG:SING;")994 # Sleep for the duration of the scan995 time.sleep(len(self.frequency_list) * 2.5 / float(self.IFBW))996 # wait for other functions to be completed997 # while self.is_busy():998 # time.sleep(.01)999 # Set the read format1000 self.write("FORM:ASC,0")1001 # Read in the data1002 self.write("CALC:PAR:SEL 'FWD';")1003 foward_switch_string = self.query("CALC:DATA? SDATA")1004 while self.is_busy():1005 time.sleep(.01)1006 self.write("CALC:PAR:SEL 'REV';")1007 reverse_switch_string = self.query("CALC:DATA? SDATA")1008 # Anritsu Specific String Parsing1009 foward_switch_string=re.sub("#\d+\n","",foward_switch_string)1010 reverse_switch_string=re.sub("#\d+\n","",reverse_switch_string)1011 # Now parse the string1012 foward_switch_list = foward_switch_string.replace("\n", "").split(",")1013 reverse_switch_list = reverse_switch_string.replace("\n", "").split(",")1014 real_foward = foward_switch_list[0::2]1015 imaginary_forward = foward_switch_list[1::2]1016 real_reverse = reverse_switch_list[0::2]1017 imaginary_reverse = reverse_switch_list[1::2]1018 switch_data = []1019 if re.search("f",self.measure_switch_term_options["order"],re.IGNORECASE):1020 for index, frequency in enumerate(self.frequency_list[:]):1021 new_row = [frequency,1022 real_foward[index], imaginary_forward[index],1023 real_reverse[index], imaginary_reverse[index],1024 0, 0,1025 0, 0]1026 new_row = [float(x) for x in new_row]1027 switch_data.append(new_row)1028 elif re.search("p",self.measure_switch_term_options["order"],re.IGNORECASE):1029 for index, frequency in enumerate(self.frequency_list[:]):1030 new_row = [frequency,1031 real_reverse[index], imaginary_reverse[index],1032 real_foward[index], imaginary_forward[index],1033 0, 0,1034 0, 0]1035 new_row = [float(x) for x in new_row]1036 switch_data.append(new_row)1037 option_line = "# Hz S RI R 50"1038 # add some options here about auto saving1039 # do we want comment options?1040 s2p = S2PV1(None, option_line=option_line, data=switch_data)1041 s2p.change_frequency_units(self.frequency_units)1042 return s2p1043 @emulation_data(EMULATION_S2P)1044 def measure_sparameters(self, **options):1045 """Triggers a single sparameter measurement for all 4 parameters and returns a SP2V1 object"""1046 defaults = {"trigger": "single"}1047 self.measure_sparameter_options = {}1048 for key, value in defaults.items():1049 self.measure_sparameter_options[key] = value1050 for key, value in options.items():1051 self.measure_sparameter_options[key] = value1052 if self.measure_sparameter_options["trigger"] in ["single"]:1053 self.write("INITiate:CONTinuous OFF")1054 self.write("ABORT;INITiate:IMMediate;*wai")1055 # now go to sleep for the time to take the scan1056 time.sleep(len(self.frequency_list) * 2 / float(self.IFBW))1057 # wait for other functions to be completed1058 while self.is_busy():1059 time.sleep(.01)1060 # Set the format to ascii and set up sweep definitions1061 self.write('FORM:ASC,0')1062 # First get the Sparameter lists1063 self.write('CALC:PAR:SEL S11')1064 self.write('CALC:FORM MLIN')1065 while self.is_busy():1066 time.sleep(.01)1067 s11_string = self.query('CALC:DATA? SDATA')1068 self.write('CALC:PAR:SEL S12')1069 self.write('CALC:FORM MLIN')1070 while self.is_busy():1071 time.sleep(.01)1072 s12_string = self.query('CALC:DATA? SDATA')1073 self.write('CALC:PAR:SEL S21')1074 self.write('CALC:FORM MLIN')1075 while self.is_busy():1076 time.sleep(.01)1077 s21_string = self.query('CALC:DATA? SDATA')1078 self.write('CALC:PAR:SEL S22')1079 self.write('CALC:FORM MLIN')1080 while self.is_busy():1081 time.sleep(.01)1082 s22_string = self.query('CALC:DATA? SDATA')1083 # String Parsing, Vector star specific, but no harm to Keysight, Rohde1084 s11_string=re.sub("#\d+\n","",s11_string)1085 s12_string=re.sub("#\d+\n","",s12_string)1086 s21_string=re.sub("#\d+\n","",s21_string)1087 s22_string=re.sub("#\d+\n","",s22_string)1088 s11_list = s11_string.replace("\n", "").split(",")1089 s12_list = s12_string.replace("\n", "").split(",")1090 s21_list = s21_string.replace("\n", "").split(",")1091 s22_list = s22_string.replace("\n", "").split(",")1092 # Construct a list of lists that is data in RI format1093 reS11 = s11_list[0::2]1094 imS11 = s11_list[1::2]1095 reS12 = s12_list[0::2]1096 imS12 = s12_list[1::2]1097 reS21 = s21_list[0::2]1098 imS21 = s21_list[1::2]1099 reS22 = s22_list[0::2]1100 imS22 = s22_list[1::2]1101 sparameter_data = []1102 for index, frequency in enumerate(self.frequency_list[:]):1103 new_row = [frequency,1104 reS11[index], imS11[index],1105 reS21[index], imS21[index],1106 reS12[index], imS12[index],1107 reS22[index], imS22[index]]1108 new_row = [float(x) for x in new_row]1109 sparameter_data.append(new_row)1110 option_line = "# Hz S RI R 50"1111 # add some options here about auto saving1112 # do we want comment options?1113 s2p = S2PV1(None, option_line=option_line, data=sparameter_data)1114 s2p.change_frequency_units(self.frequency_units)1115 return s2p1116 def initialize_w2p(self,**options):1117 """Initializes the system for w2p acquisition"""1118 defaults = {"reset": False, "port1": 1,"port2":2, "b_name_list": ["A", "B", "C", "D"]}1119 initialize_options = {}1120 for key, value in defaults.items():1121 initialize_options[key] = value1122 for key, value in options.items():1123 initialize_options[key] = value1124 if initialize_options["reset"]:1125 self.write("SYST:FPRESET")1126 b1_name = initialize_options["b_name_list"][initialize_options["port1"] - 1]1127 b2_name= initialize_options["b_name_list"][initialize_options["port2"] - 1]1128 # Initialize Port 1 traces A1_D1,B1_D1,B2_D11129 self.write("DISPlay:WINDow1:STATE ON")1130 self.write("CALCulate:PARameter:DEFine 'A{0}_D{0}',R{0},{0}".format(initialize_options["port1"]))1131 self.write("DISPlay:WINDow1:TRACe1:FEED 'A{0}_D{0}'".format(initialize_options["port1"]))1132 self.write("CALCulate:PARameter:DEFine 'B{0}_D{0}',{1},{0}".format(initialize_options["port1"],b1_name))1133 self.write("DISPlay:WINDow1:TRACe2:FEED 'B{0}_D{0}'".format(initialize_options["port1"]))1134 self.write("CALCulate:PARameter:DEFine 'A{1}_D{0}',R{1},{0}".format(initialize_options["port1"],1135 initialize_options["port2"] ))1136 self.write("DISPlay:WINDow1:TRACe3:FEED 'A{1}_D{0}'".format(initialize_options["port1"],1137 initialize_options["port2"]))1138 self.write("CALCulate:PARameter:DEFine 'B{1}_D{0}',{2},{0}".format(initialize_options["port1"],1139 initialize_options["port2"],1140 b2_name))1141 self.write("DISPlay:WINDow1:TRACe4:FEED 'B{1}_D{0}'".format(initialize_options["port1"],1142 initialize_options["port2"]))1143 # Initialize Port 2 Traces A1_D2,B1_D2,1144 self.write("CALCulate:PARameter:DEFine 'A{0}_D{1}',R{0},{1}".format(initialize_options["port1"],1145 initialize_options["port2"] ))1146 self.write("DISPlay:WINDow1:TRACe5:FEED 'A{0}_D{1}'".format(initialize_options["port1"],1147 initialize_options["port2"]))1148 self.write("CALCulate:PARameter:DEFine 'B{0}_D{1}',{2},{1}".format(initialize_options["port1"],1149 initialize_options["port2"],1150 b1_name))1151 self.write("DISPlay:WINDow1:TRACe6:FEED 'B{0}_D{1}'".format(initialize_options["port1"],1152 initialize_options["port2"]))1153 self.write("CALCulate:PARameter:DEFine 'A{1}_D{1}',R{1},{1}".format(initialize_options["port1"],1154 initialize_options["port2"] ))1155 self.write("DISPlay:WINDow1:TRACe7:FEED 'A{1}_D{1}'".format(initialize_options["port1"],1156 initialize_options["port2"]))1157 self.write("CALCulate:PARameter:DEFine 'B{1}_D{1}',{2},{1}".format(initialize_options["port1"],1158 initialize_options["port2"],1159 b2_name))1160 self.write("DISPlay:WINDow1:TRACe8:FEED 'B{1}_D{1}'".format(initialize_options["port1"],1161 initialize_options["port2"]))1162 self.sweep_type = self.get_sweep_type()1163 self.frequency_list=self.get_frequency_list()1164 def initialize_w1p(self, **options):1165 """Initializes the system for w1p acquisition, default works for ZVA"""1166 defaults = {"reset": False, "port": 1, "b_name_list": ["A", "B", "C", "D"],"source_port":1}1167 initialize_options = {}1168 for key, value in defaults.items():1169 initialize_options[key] = value1170 for key, value in options.items():1171 initialize_options[key] = value1172 if initialize_options["reset"]:1173 self.write("SYST:FPRESET")1174 b_name = initialize_options["b_name_list"][initialize_options["port"] - 1]1175 self.write("DISPlay:WINDow1:STATE ON")1176 self.write("CALCulate:PARameter:DEFine 'A{0}_D{0}',R{0}".format(initialize_options["port"]))1177 self.write("DISPlay:WINDow1:TRACe1:FEED 'A{0}_D{0}'".format(initialize_options["port"]))1178 self.write("CALCulate:PARameter:DEFine 'B{0}_D{0}',{1}".format(initialize_options["port"],1179 b_name))1180 self.write("DISPlay:WINDow1:TRACe2:FEED 'B{0}_D{0}'".format(initialize_options["port"]))1181 self.sweep_type = self.get_sweep_type()1182 if re.search("LIN", self.sweep_type, re.IGNORECASE):1183 start = float(self.query("SENS:FREQ:START?").replace("\n", ""))1184 stop = float(self.query("SENS:FREQ:STOP?").replace("\n", ""))1185 number_points = int(self.query("SENS:SWE:POIN?").replace("\n", ""))1186 self.frequency_list = np.linspace(start, stop, number_points).tolist()1187 elif re.search("LOG", self.sweep_type, re.IGNORECASE):1188 start = float(self.query("SENS:FREQ:START?").replace("\n", ""))1189 stop = float(self.query("SENS:FREQ:STOP?").replace("\n", ""))1190 number_points = int(self.query("SENS:SWE:POIN?").replace("\n", ""))1191 logspace_start = np.log10(start)1192 logspace_stop = np.log10(stop)1193 self.frequency_list = [round(x, ndigits=3) for x in np.logspace(logspace_start, logspace_stop,1194 num=number_points, base=10).tolist()]1195 elif re.search("SEG", self.sweep_type, re.IGNORECASE):1196 self.frequency_table=[]1197 number_segments = int(self.query("SENS:SEGM:COUN?").replace("\n", ""))1198 for i in range(number_segments):1199 start = float(self.query("SENS:SEGM{0}:FREQ:START?".format(i + 1)).replace("\n", ""))1200 stop = float(self.query("SENS:SEGM{0}:FREQ:STOP?".format(i + 1)).replace("\n", ""))1201 number_points = int(self.query("SENS:SEGM{0}:SWE:POIN?".format(i + 1)).replace("\n", ""))1202 step = (stop - start) / float(number_points - 1)1203 self.frequency_table.append({"start": start, "stop": stop,1204 "number_points": number_points, "step": step})1205 self.frequency_table = fix_segment_table(self.frequency_table)1206 frequency_list = []1207 for row in self.frequency_table[:]:1208 new_list = np.linspace(row["start"], row["stop"], row["number_points"]).tolist()1209 frequency_list = frequency_list + new_list1210 self.frequency_list = frequency_list1211 else:1212 self.frequency_list = []1213 def get_frequency_list(self):1214 "Returns the frequency list as read from the VNA"1215 self.sweep_type = self.get_sweep_type()1216 if re.search("LIN", self.sweep_type, re.IGNORECASE):1217 start = float(self.query("SENS:FREQ:START?").replace("\n", ""))1218 stop = float(self.query("SENS:FREQ:STOP?").replace("\n", ""))1219 number_points = int(self.query("SENS:SWE:POIN?").replace("\n", ""))1220 self.frequency_list = np.linspace(start, stop, number_points).tolist()1221 elif re.search("LOG", self.sweep_type, re.IGNORECASE):1222 start = float(self.query("SENS:FREQ:START?").replace("\n", ""))1223 stop = float(self.query("SENS:FREQ:STOP?").replace("\n", ""))1224 number_points = int(self.query("SENS:SWE:POIN?").replace("\n", ""))1225 logspace_start = np.log10(start)1226 logspace_stop = np.log10(stop)1227 self.frequency_list = np.logspace(logspace_start, logspace_stop,num=number_points, base=10).tolist()1228 elif re.search("SEG", self.sweep_type, re.IGNORECASE):1229 self.frequency_table=[]1230 number_segments = int(self.query("SENS:SEGM:COUN?").replace("\n", ""))1231 for i in range(number_segments):1232 start = float(self.query("SENS:SEGM{0}:FREQ:START?".format(i + 1)).replace("\n", ""))1233 stop = float(self.query("SENS:SEGM{0}:FREQ:STOP?".format(i + 1)).replace("\n", ""))1234 number_points = int(self.query("SENS:SEGM{0}:SWE:POIN?".format(i + 1)).replace("\n", ""))1235 step = (stop - start) / float(number_points - 1)1236 self.frequency_table.append({"start": start, "stop": stop,1237 "number_points": number_points, "step": step})1238 self.frequency_table = fix_segment_table(self.frequency_table)1239 frequency_list = []1240 for row in self.frequency_table[:]:1241 new_list = np.linspace(row["start"], row["stop"], row["number_points"]).tolist()1242 frequency_list = frequency_list + new_list1243 self.frequency_list = frequency_list1244 else:1245 self.frequency_list = []1246 return self.frequency_list[:]1247 @emulation_data(EMULATION_W1P)1248 def measure_w1p(self, **options):1249 """Triggers a single w1p measurement for a specified1250 port and returns a w1p object."""1251 defaults = {"trigger": "single", "port": 1,1252 "b_name_list": ["A", "B", "C", "D"],1253 "w1p_options": None}1254 self.measure_w1p_options = {}1255 for key, value in defaults.items():1256 self.measure_w1p_options[key] = value1257 for key, value in options.items():1258 self.measure_w1p_options[key] = value1259 if self.measure_w1p_options["trigger"] in ["single"]:1260 self.write("INITiate:CONTinuous OFF")1261 self.write("ABORT;INITiate:IMMediate;*wai")1262 # now go to sleep for the time to take the scan1263 time.sleep(len(self.frequency_list) * 2 / float(self.IFBW))1264 # wait for other functions to be completed1265 while self.is_busy():1266 time.sleep(.01)1267 # Set the format to ascii and set up sweep definitions1268 self.write('FORM:ASC,0')1269 # First get the A and Blists1270 self.write('CALC:PAR:SEL "A{0}_D{0}"'.format(self.measure_w1p_options["port"]))1271 self.write('CALC:FORM MLIN')1272 while self.is_busy():1273 time.sleep(.01)1274 a_string = self.query('CALC:DATA? SDATA')1275 self.write('CALC:PAR:SEL B{0}_D{0}'.format(self.measure_w1p_options["port"]))1276 self.write('CALC:FORM MLIN')1277 while self.is_busy():1278 time.sleep(.01)1279 b_string = self.query('CALC:DATA? SDATA')1280 # Anritsu Specific String Parsing1281 a_string=re.sub("#\d+\n","",a_string)1282 b_string=re.sub("#\d+\n","",b_string)1283 # String Parsing1284 a_list = a_string.replace("\n", "").split(",")1285 b_list = b_string.replace("\n", "").split(",")1286 # Construct a list of lists that is data in RI format1287 re_a = a_list[0::2]1288 im_a = a_list[1::2]1289 re_b = b_list[0::2]1290 im_b = b_list[1::2]1291 wparameter_data = []1292 for index, frequency in enumerate(self.frequency_list[:]):1293 new_row = [frequency / 10. ** 9,1294 re_a[index], im_a[index],1295 re_b[index], im_b[index]]1296 new_row = [float(x) for x in new_row]1297 wparameter_data.append(new_row)1298 column_names = ["Frequency", "reA1_D1", "imA1_D1", "reB1_D1", "imB1_D1"]1299 # add some options here about auto saving1300 # do we want comment options?1301 options = {"column_names_begin_token": "!", "data_delimiter": " ", "column_names": column_names,1302 "data": wparameter_data, "specific_descriptor": "Wave_Parameters",1303 "general_descriptor": "One_Port", "extension": "w1p",1304 "column_types":["float" for column in column_names]}1305 if self.measure_w1p_options["w1p_options"]:1306 for key,value in self.measure_w1p_options["w1p_options"].items():1307 options[key]=value1308 w1p = AsciiDataTable(None, **options)1309 return w1p1310 @emulation_data(EMULATION_W2P)1311 def measure_w2p(self, **options):1312 """Triggers a single w2p measurement for a specified1313 port and returns a w2p object."""1314 defaults = {"trigger": "single", "port1": 1,"port2":2,1315 "b_name_list": ["A", "B", "C", "D"],1316 "w2p_options": None}1317 self.measure_w2p_options = {}1318 for key, value in defaults.items():1319 self.measure_w2p_options[key] = value1320 for key, value in options.items():1321 self.measure_w2p_options[key] = value1322 if self.measure_w2p_options["trigger"] in ["single"]:1323 self.write("INITiate:CONTinuous OFF")1324 self.write("ABORT;INITiate:IMMediate;*wai")1325 # now go to sleep for the time to take the scan1326 time.sleep(len(self.frequency_list) * 2 / float(self.IFBW))1327 # wait for other functions to be completed1328 while self.is_busy():1329 time.sleep(.01)1330 # Set the format to ascii and set up sweep definitions1331 self.write('FORM:ASC,0')1332 # First get the A and B lists drive port 11333 # Note this could be a loop over the list = [a1_d1,b1_d1,b2_d1...,b2_d2]1334 waveparameter_names=[]1335 for drive_port in [self.measure_w2p_options["port1"], self.measure_w2p_options["port2"]]:1336 for detect_port in [self.measure_w2p_options["port1"], self.measure_w2p_options["port2"]]:1337 for receiver in ["A","B"]:1338 waveparameter_names.append("{0}{1}_D{2}".format(receiver,detect_port,drive_port))1339 # now get data for all of them1340 all_wave_raw_string=[]1341 for waveparameter in waveparameter_names:1342 self.write('CALC:PAR:SEL "{0}"'.format(waveparameter))1343 self.write('CALC:FORM MLIN')1344 while self.is_busy():1345 time.sleep(.01)1346 all_wave_raw_string .append(self.query('CALC:DATA? SDATA'))1347 # Anritsu specific parsing1348 for index,wave in enumerate(all_wave_raw_string):1349 all_wave_raw_string[index]=re.sub("#\d+\n","",wave)1350 # String Parsing1351 all_wave_list=[x.replace("\n","").split(",") for x in all_wave_raw_string]1352 # Construct a list of lists that is data in RI format1353 re_all_wave_list = [a_list[0::2] for a_list in all_wave_list]1354 im_all_wave_list = [a_list[1::2] for a_list in all_wave_list]1355 wparameter_data = []1356 for index, frequency in enumerate(self.frequency_list[:]):1357 re_row=[re[index] for re in re_all_wave_list ]1358 im_row=[im[index] for im in im_all_wave_list]1359 wave_row=[]1360 for index,value in enumerate(re_row):1361 wave_row.append(value)1362 wave_row.append(im_row[index])1363 new_row = [frequency / 10. ** 9]+wave_row1364 new_row = [float(x) for x in new_row]1365 wparameter_data.append(new_row)1366 waveparameter_column_names=[]1367 for drive_port in [self.measure_w2p_options["port1"], self.measure_w2p_options["port2"]]:1368 for detect_port in [self.measure_w2p_options["port1"], self.measure_w2p_options["port2"]]:1369 for receiver in ["A","B"]:1370 for complex_type in ["re","im"]:1371 waveparameter_column_names.append("{3}{0}{1}_D{2}".format(receiver,1372 detect_port,1373 drive_port,1374 complex_type))1375 column_names = ["Frequency"]+waveparameter_column_names1376 # add some options here about auto saving1377 # do we want comment options?1378 options = {"column_names_begin_token": "!", "data_delimiter": " ", "column_names": column_names,1379 "data": wparameter_data, "specific_descriptor": "Wave_Parameters",1380 "general_descriptor": "Two_Port", "extension": "w2p",1381 "column_types":["float" for column in column_names]}1382 if self.measure_w2p_options["w2p_options"]:1383 for key,value in self.measure_w2p_options["w2p_options"].items():1384 options[key]=value1385 w2p = W2P(None, **options)1386 return w2p1387class PowerMeter(VisaInstrument):1388 """Controls power meters"""1389 def initialize(self):1390 """Initializes the power meter to a state with W units"""1391 self.write("*RST")1392 time.sleep(.1)1393 self.write("UNIT:POW W")1394 self.write("INIT")1395 def set_frequency(self,frequency=1*10**9):1396 """Sets the frequency of the power meter"""1397 self.write("SENS:FREQ {0}".format(frequency))1398 def get_frequency(self):1399 "Returns the frequency of the power meter"1400 frequency=self.query("SENS:FREQ?").replace("\n","")1401 return float(frequency)1402 def get_reading(self):1403 """Initializes and fetches a reading"""1404 self.write("INIT")1405 return float(self.query("FETCh?").replace("\n",""))1406 def set_units(self,unit="W"):1407 """Sets the power meters units, acceptable units are W or adBM"""1408 # Todo put an input checker on this to only allow desired commands1409 self.write("UNIT:POW {0}".format(unit))1410 def get_units(self,unit="W"):1411 """Gets the power meters units"""1412 # Todo put an input checker on this to only allow desired commands1413 unit=self.query("UNIT:POW?").replace("\n","")1414 return unit1415class NRPPowerMeter(PowerMeter):1416 """Controls RS power meters"""1417 def initialize(self):1418 """Initializes the power meter to a state with W units, 10ms aperture and Avgerage power readings"""1419 self.write("*RST")1420 time.sleep(.1)1421 self.write("UNIT:POW W")1422 self.write("SENS:FUNC POW:AVG")1423 self.write("SENS:APER 10 MS")1424 self.write("INIT")1425class HighSpeedOscope(VisaInstrument):1426 """Control Class for high speed oscilloscopes. Based on Keysight/Agilent 86100C Based on code from Diogo """1427 def initialize(self, **options):1428 """Initializes the oscilloscope for data collection"""1429 defaults = {"reset": True}1430 initialize_options = {}1431 for key, value in defaults.items():1432 initialize_options[key] = value1433 for key, value in options.items():1434 initialize_options[key] = value1435 pass1436 def measure_waves(self, **options):1437 """Returns data for a measurement in an AsciiDataTable"""1438 defaults = {"number_frames": 1, "number_points": self.get_number_points(),1439 "timebase_scale": self.get_timebase_scale(), "channels": [1, 2, 3, 4],1440 "initial_time_offset": self.get_time_position(), "timeout_measurement": 10000,1441 "save_data": False, "data_format": "dat", "directory": os.getcwd(),1442 "specific_descriptor": "Scope", "general_descriptor": "Measurement", "add_header": False,1443 "output_table_options": {"data_delimiter": "\t", "treat_header_as_comment": True},1444 "download_format":"ASCII","verbose_timing":False,1445 }1446 self.measure_options = {}1447 for key, value in defaults.items():1448 self.measure_options[key] = value1449 for key, value in options.items():1450 self.measure_options[key] = value1451 if self.measure_options["verbose_timing"]:1452 start_timer=datetime.datetime.now()1453 print(("The .measure_waves method began at {0}".format(start_timer)))1454 self.set_number_points(self.measure_options["number_points"])1455 channel_string_list = ["CHAN1", "CHAN2", "CHAN3", "CHAN4"]1456 # begin by setting timeout to timeout_measurement1457 timeout = self.resource.timeout1458 self.resource.timeout = self.measure_options["timeout_measurement"]1459 # now calculate timestep1460 self.measure_options["timestep"] = 10.*float(self.measure_options["timebase_scale"]) / float(self.measure_options["number_points"])1461 time_step = self.measure_options["timestep"]1462 # now configure the scope for data transfer1463 # define the way the data is transmitted from the instrument to the PC1464 # Word -> 16bit signed integer1465 # now we choose if you want it to be bin or ASCII1466 if re.search("asc",self.measure_options["download_format"],re.IGNORECASE):1467 self.write(':WAV:FORM ASCII')1468 else:1469 self.write(':WAV:FORM WORD')1470 # little-endian1471 self.write(':WAV:BYT LSBF')1472 number_rows = self.measure_options["number_points"] * self.measure_options["number_frames"]1473 # this is the way diogo did it1474 # number of points1475 number_points = self.measure_options["number_points"]1476 if self.measure_options["verbose_timing"]:1477 setup_timer=datetime.datetime.now()1478 time_difference=setup_timer-start_timer1479 print(("The setup of the sweep finished at {0} and took {1} seconds".format(setup_timer,time_difference)))1480 frames_data = []1481 for frame_index in range(self.measure_options["number_frames"]):1482 if self.measure_options["verbose_timing"]:1483 frame_timer=datetime.datetime.now()1484 time_difference=frame_timer-start_timer1485 print((" Frame {0} began at {1}, {2} seconds from measure_waves begin ".format(frame_index,1486 frame_timer,1487 time_difference.seconds)))1488 new_frame = []1489 # calculate time position for this frame1490 time_position = frame_index * self.measure_options["timebase_scale"]*10. + self.measure_options[1491 "initial_time_offset"]1492 # define postion to start the acquisition1493 self.write(':TIM:POS {0}ns'.format(time_position))1494 if self.measure_options["verbose_timing"]:1495 timer=datetime.datetime.now()1496 print(("Writing Channel Command at {0}".format(timer)))1497 # acquire channels desired1498 channel_string = ""1499 for channel_index, channel in enumerate(self.measure_options["channels"]):1500 if channel_index == len(self.measure_options["channels"]) - 1:1501 channel_string = channel_string + "{0}".format(channel_string_list[channel - 1])1502 else:1503 channel_string = channel_string + "{0},".format(channel_string_list[channel - 1])1504 channel_command = ":DIG {0}".format(channel_string)1505 self.write(channel_command)1506 if self.measure_options["verbose_timing"]:1507 timer=datetime.datetime.now()1508 print(("Finished Writing Channel Command at {0}".format(timer)))1509 # trigger reading and wait1510 self.write("*OPC?")1511 if self.measure_options["verbose_timing"]:1512 timer=datetime.datetime.now()1513 print(("Finshed Trigger Command and Started to Read Channels at {0}".format(timer)))1514 # get data from the necessary channels1515 for channel_read_index, channel_read in enumerate(self.measure_options["channels"]):1516 # get data for channel 11517 self.write(':WAV:SOUR CHAN{0}'.format(channel_read))1518 # get data1519 if re.search("asc", self.measure_options["download_format"], re.IGNORECASE):1520 data_column = self.resource.query(':WAV:DATA?')1521 data_column = data_column.replace("\n", "").replace("1-", "-").split(",")1522 else:1523 # This downloads the data as signed 16bit ints1524 # Need a conversion to volts1525 data_column=self.resource.query_binary_values(':WAV:DATA?', datatype='h')1526 new_frame.append(data_column)1527 # print("{0} is {1}".format("data_column",data_column))1528 if self.measure_options["verbose_timing"]:1529 timer = datetime.datetime.now()1530 print(("Finshed Data Acquistion for Channel {0} at {1}".format(channel_read,timer)))1531 frames_data.append(new_frame)1532 if self.measure_options["verbose_timing"]:1533 timer = datetime.datetime.now()1534 print(("Data Manipulation Began at {0}".format(timer)))1535 # reshape measurement data1536 measurement_data = [list(range(len(frames_data[0]))) for x in range(number_points * len(frames_data))]1537 # print(len(measurement_data))1538 # print(len(measurement_data[0]))1539 # print("{0} is{1}".format("len(frames_data)",len(frames_data)))1540 # print("{0} is{1}".format("len(frames_data[0])",len(frames_data[0])))1541 # print("{0} is{1}".format("len(frames_data[0][0])",len(frames_data[0][0])))1542 if self.measure_options["verbose_timing"]:1543 timer = datetime.datetime.now()1544 print(("Data reshaping step1 ended at {0}".format(timer)))1545 for frame_index, frame in enumerate(frames_data):1546 for column_index, column in enumerate(frame):1547 for row_index, row in enumerate(column):1548 number_rows = len(column)1549 # print("{0} is {1}".format("([row_index+frame_index*number_rows],[column_index],[frame_index])",1550 #([row_index + frame_index * number_rows], [column_index], [frame_index])))1551 measurement_data[row_index + frame_index * number_rows][column_index] =frames_data[frame_index][column_index][row_index]1552 if self.measure_options["verbose_timing"]:1553 timer = datetime.datetime.now()1554 print(("Data reshaping step 2 ended at {0}".format(timer)))1555 # reset timeout1556 self.resource.timeout = timeout1557 data_out = []1558 time_start = self.measure_options["initial_time_offset"]1559 for row_index, data_row in enumerate(measurement_data):1560 new_row = [time_start + row_index * time_step] + data_row1561 data_out.append(new_row)1562 if self.measure_options["add_header"]:1563 header = []1564 for key, value in self.measure_options.items():1565 header.append("{0} = {1}".format(key, value))1566 else:1567 header = None1568 column_names = ["Time"]1569 for channel in self.measure_options["channels"]:1570 column_names.append(channel_string_list[channel - 1])1571 table_options = {"data": data_out,1572 "header": header,1573 "specific_descriptor": self.measure_options["specific_descriptor"],1574 "general_descriptor": self.measure_options["general_descriptor"],1575 "extension": "dat",1576 "directory": self.measure_options["directory"],1577 "column_names": column_names}1578 if re.search("asc",self.measure_options["download_format"],re.IGNORECASE):1579 table_options["column_types"]=["float" for i in range(len(column_names))]1580 else:1581 table_options["column_types"]=["float"]+["int" for i in range(len(column_names)-1)]1582 for key, value in self.measure_options["output_table_options"].items():1583 table_options[key] = value1584 output_table = AsciiDataTable(None, **table_options)1585 if self.measure_options["save_data"]:1586 data_save_path = auto_name(specific_descriptor=self.measure_options["specific_descriptor"],1587 general_descriptor=self.measure_options["general_descriptor"],1588 directory=self.measure_options["directory"],1589 extension='dat'1590 , padding=3)1591 output_table.path = data_save_path1592 output_table.save()1593 if self.measure_options["verbose_timing"]:1594 timer = datetime.datetime.now()1595 print(("Data Manipulation Ended at {0}".format(timer)))1596 return output_table1597 def get_error_state(self):1598 """Returns the error state of the oscope"""1599 state = self.query(':SYSTEM:ERROR? STRING')1600 self.error_state = state1601 return state1602 def set_number_points(self, number_points=16384):1603 """Sets the number of points for the acquisition"""1604 self.write(':ACQ:POINTS {0}'.format(number_points))1605 def get_number_points(self):1606 """Returns the number of points in the waveform"""1607 number_points = int(self.query(':ACQ:POINTS?'))1608 return number_points1609 def set_time_position(self, time=50):1610 """Sets the time in ns to start the acquisition"""1611 self.write(':TIM:POS {0}ns'.format(time))1612 def get_time_position(self):1613 """Returns the time position in ns"""1614 position = float(self.query(":TIM:POS?"))1615 return position * 10 ** 91616 def set_timebase_scale(self, time_scale=40.96):1617 """Sets the timebase scale in ns"""1618 self.write(':TIM:SCAL {0}ns'.format(time_scale))1619 def get_timebase_scale(self):1620 """Returns the timebase scale in ns"""1621 time_scale = float(self.query(':TIM:SCAL?'))1622 return time_scale * 10 ** 91623 def set_trigger_source(self, source="FPAN"):1624 """Sets the tigger source, 'FPAN' Frontpanel or 'FRUN' freerun"""1625 self.write(':TRIG:SOUR {0}'.format(source))1626 def get_trigger_source(self):1627 """Returns the trigger source FPAN for FrontPanel or FRUN for free run"""1628 source = self.query(':TRIG:SOUR?')1629 def set_trigger_level(self, level=10):1630 """Sets the trigger level in mv"""1631 self.write(':TRIG:LEV {0}m'.format(level))1632 def get_trigger_level(self):1633 """Returns the trigger level"""1634 level = self.query(':TRIG:LEV?')1635 def set_channel_scale(self, scale=10, channel=1):1636 """Sets the scale in mv of channel. Default is 10mv/division on channel 1"""1637 self.write(':CHAN{0}:SCAL {1}m'.format(channel, scale))1638 def get_channel_scale(self, channel=1):1639 "Returns the scale for a specified channel, the default is channel 1"1640 scale = self.query(':CHAN{0}:SCAL?'.format(channel))1641 return scale1642 def set_channel_offset(self, offset=0, channel=1):1643 """Sets the scale in mv of channel. Default is 10mv/division on channel 1"""1644 self.write(':CHAN{0}:OFFSet {1}'.format(channel, offset))1645 def get_channel_offset(self, channel=1):1646 "Returns the scale for a specified channel, the default is channel 1"1647 offset = self.query(':CHAN{0}:OFFSet?'.format(channel))1648 return offset1649 def set_channel_bandwidth(self, bandwidth="LOW", channel=1):1650 """Sets the specified channel's bandwith to LOW, MED or HIGH, default is to set channel 1 to LOW"""1651 self.write(':CHAN{0}:BAND {1}'.format(channel, bandwidth))1652 def get_channel_bandwidth(self, channel=1):1653 """Returns the selected channels bandwidth"""1654 bandwidth = self.query(':CHAN{0}:BAND?'.format(channel))1655 return bandwidth1656 def set_trigger_slope(self, slope="POS"):1657 """Sets the trigger slope on the oscope choose from POS or NEG"""1658 self.write(':TRIG:SLOP {0}'.format(slope))1659 def get_trigger_slope(self):1660 """Returns the trigger slope either POS or NEG"""1661 slope = self.query(":TRIG:SLOP?")1662 return slope1663#-------------------------------------------------------------------------------1664# Module Scripts1665def test_determine_instrument_type():1666 print('Type is %s'%determine_instrument_type('GPIB::22'))1667 print('Type is %s'%determine_instrument_type('COMM::1'))1668 print('Type is %s'%determine_instrument_type('CoMm::1')) 1669 print('Type is %s'%determine_instrument_type('SRS830')) 1670 print('Type is %s'%determine_instrument_type('36111')) 1671 class blank():pass1672 new=blank()1673 print(type(new))1674 print('Type is %s'%determine_instrument_type(new))1675 new.instrument_type='Ocean_Optics'1676 print(new.instrument_type)1677 print('Type is %s'%determine_instrument_type(new))1678 TF=(isinstance(new, InstanceType) or ClassType)1679 print(TF)1680 print(dir(new))1681 print('instrument_type' in dir(new))1682 1683def test_find_description():1684 """Tests the function find description"""1685 print("The path of the description of %s is %s"%('Lockin2',find_description('Lockin2')))1686 print("The File Contents are:")1687 print(find_description('Lockin2','file'))1688 1689def test_VisaInstrument(address="GPIB::21"):1690 """ Simple test of the VisaInstrument class"""1691 instrument=VisaInstrument(address)1692 #print instrument.ask('*IDN?')1693 print(dir(instrument))1694 print(instrument.idn)1695 print(instrument.DEFAULT_STATE_QUERY_DICTIONARY)1696 print(instrument.current_state)1697 print('Writing 0 volts to AUX4')1698 instrument.set_state(state_dictionary={'AUXV 4,':0})1699 print(instrument.current_state)1700 print(instrument.state_buffer)1701 print(instrument.commands)1702#-------------------------------------------------------------------------------1703# Module Runner 1704if __name__ == '__main__':1705 #test_IV()1706 #test_find_description()1707 test_VisaInstrument()1708 #user_terminate=raw_input("Please Press Any key To Finish:")...
setup_commands.py
Source:setup_commands.py
...9class TestCommand(distutils.core.Command):10 """http://da44en.wordpress.com/2002/11/22/using-distutils/"""11 description = "Runs tests"12 user_options = []13 def initialize_options(self):14 self._dir = os.path.join(DIRECTORY, PACKAGE_NAME)15 sys.path.insert(0, self._dir)16 def finalize_options(self):17 pass18 def run(self):19 """Finds all the tests modules in tests/ and runs them."""20 testdir = 'tests'21 testfiles = []22 verbosity = 223 globbed = glob.glob(os.path.join(self._dir, testdir, '*.py'))24 del globbed[globbed.index(os.path.join(self._dir, testdir, '__init__.py'))]25 for t in globbed:26 testfiles.append(27 '.'.join((PACKAGE_NAME, testdir,28 os.path.splitext(os.path.basename(t))[0])))29 tests = unittest.TestSuite()30 for t in testfiles:31 __import__(t)32 tests.addTests(33 unittest.defaultTestLoader.loadTestsFromModule(sys.modules[t]))34 unittest.TextTestRunner(verbosity=verbosity).run(tests)35 del sys.path[0]36class CoverageCommand(TestCommand):37 """Check test coverage38 API for coverage-python:39 http://nedbatchelder.com/code/coverage/api.html40 """41 description = "Check test coverage"42 user_options = []43 def initialize_options(self):44 # distutils.core.Command is an old-style class.45 TestCommand.initialize_options(self)46 if '.coverage' in os.listdir(self._dir):47 os.unlink('.coverage')48 def finalize_options(self):49 pass50 def run(self):51 import coverage52 self.cov = coverage.coverage()53 self.cov.start()54 TestCommand.run(self)55 self.cov.stop()56 self.cov.save()57 # Somehow os gets set to None.58 import os59 self.cov.report(file=sys.stdout)60 self.cov.html_report(directory=COVERAGE_PATH)61 # Somehow os gets set to None.62 import os63 print os.path.join(COVERAGE_PATH, 'index.html')64class CleanCommand(distutils.core.Command):65 description = "Cleans directories of .pyc files and documentation"66 user_options = []67 def initialize_options(self):68 self._clean_me = []69 for root, dirs, files in os.walk('.'):70 for f in files:71 if f.endswith('.pyc'):72 self._clean_me.append(os.path.join(root, f))73 def finalize_options(self):74 print "Clean."75 def run(self):76 for clean_me in self._clean_me:77 try:78 os.unlink(clean_me)79 except:80 pass81class PurgeCommand(CleanCommand):82 description = "Purges directories of .pyc files, caches, and documentation"83 user_options = []84 def initialize_options(self):85 CleanCommand.initialize_options(self)86 def finalize_options(self):87 print "Purged."88 def run(self):89 doc_dir = os.path.join(DIRECTORY, 'doc')90 if os.path.isdir(doc_dir):91 shutil.rmtree(doc_dir)92 build_dir = os.path.join(DIRECTORY, 'build')93 if os.path.isdir(build_dir):94 shutil.rmtree(build_dir)95 dist_dir = os.path.join(DIRECTORY, 'dist')96 if os.path.isdir(dist_dir):97 shutil.rmtree(dist_dir)98 CleanCommand.run(self)99class ProfileCommand(distutils.core.Command):100 description = "Run profiler on a test script"101 user_options = [('file=', 'f', 'A file to profile (required)'), ]102 def initialize_options(self):103 self.file = None104 def finalize_options(self):105 pass106 def run(self):107 if not self.file:108 print >>sys.stderr, ('Please give a file to profile: '109 'setup.py profile --file=path/to/file')110 return 1111 import cProfile112 cProfile.run("import imp;imp.load_source('_', '%s')" % self.file)113class REPLCommand(distutils.core.Command):114 description = "Launch a REPL with the library loaded"115 user_options = []116 def initialize_options(self):117 pass118 def finalize_options(self):119 pass120 def run(self):121 import code122 import readline123 console = code.InteractiveConsole()124 map(console.runsource, """\125import sys126import os127import libcchdo as L128import libcchdo.db.model.legacy as DBML129import libcchdo.db.model.convert as DBMC130import libcchdo.db.model.std as STD131#"FIND = L.db.parameters.find_by_mnemonic132#"f = open(os.path.join(mpath, 'testfiles/hy1/i05_33RR20090320_hy1.csv')133#"f = open(os.path.join(mpath, 'testfiles/hy1/p16s_hy1.csv')134#"f = open(os.path.join(mpath, 'testfiles/hy1/tenline_hy1.csv')135#"import cProfile136#"cProfile.run('x = file_to_botdb.convert(d)', 'convert.profile')137#"print 'x = ', x138""".split('\n'))139 console.interact('db: DBML <- DBMC -> STD')140class TestCommand(distutils.core.Command):141 """http://da44en.wordpress.com/2002/11/22/using-distutils/"""142 description = "Runs tests"143 user_options = []144 def initialize_options(self):145 self._dir = os.path.join(DIRECTORY, PACKAGE_NAME)146 sys.path.insert(0, self._dir)147 def finalize_options(self):148 pass149 def run(self):150 """Finds all the tests modules in tests/ and runs them."""151 testdir = 'tests'152 testfiles = []153 verbosity = 2154 globbed = glob.glob(os.path.join(self._dir, testdir, '*.py'))155 del globbed[globbed.index(os.path.join(self._dir, testdir, '__init__.py'))]156 for t in globbed:157 testfiles.append(158 '.'.join((PACKAGE_NAME, testdir,159 os.path.splitext(os.path.basename(t))[0])))160 tests = unittest.TestSuite()161 for t in testfiles:162 __import__(t)163 tests.addTests(164 unittest.defaultTestLoader.loadTestsFromModule(sys.modules[t]))165 unittest.TextTestRunner(verbosity=verbosity).run(tests)166 del sys.path[0]167class CoverageCommand(TestCommand):168 """Check test coverage169 API for coverage-python:170 http://nedbatchelder.com/code/coverage/api.html171 """172 description = "Check test coverage"173 user_options = []174 def initialize_options(self):175 # distutils.core.Command is an old-style class.176 TestCommand.initialize_options(self)177 if '.coverage' in os.listdir(self._dir):178 os.unlink('.coverage')179 def finalize_options(self):180 pass181 def run(self):182 import coverage183 self.cov = coverage.coverage()184 self.cov.start()185 TestCommand.run(self)186 self.cov.stop()187 self.cov.save()188 # Somehow os gets set to None.189 import os190 self.cov.report(file=sys.stdout)191 self.cov.html_report(directory=COVERAGE_PATH)192 # Somehow os gets set to None.193 import os194 print os.path.join(COVERAGE_PATH, 'index.html')195class CleanCommand(distutils.core.Command):196 description = "Cleans directories of .pyc files and documentation"197 user_options = []198 def initialize_options(self):199 self._clean_me = []200 for root, dirs, files in os.walk('.'):201 for f in files:202 if f.endswith('.pyc'):203 self._clean_me.append(os.path.join(root, f))204 def finalize_options(self):205 print "Clean."206 def run(self):207 for clean_me in self._clean_me:208 try:209 os.unlink(clean_me)210 except:211 pass212class PurgeCommand(CleanCommand):213 description = "Purges directories of .pyc files, caches, and documentation"214 user_options = []215 def initialize_options(self):216 CleanCommand.initialize_options(self)217 def finalize_options(self):218 print "Purged."219 def run(self):220 doc_dir = os.path.join(DIRECTORY, 'doc')221 if os.path.isdir(doc_dir):222 shutil.rmtree(doc_dir)223 build_dir = os.path.join(DIRECTORY, 'build')224 if os.path.isdir(build_dir):225 shutil.rmtree(build_dir)226 dist_dir = os.path.join(DIRECTORY, 'dist')227 if os.path.isdir(dist_dir):228 shutil.rmtree(dist_dir)229 CleanCommand.run(self)230class ProfileCommand(distutils.core.Command):231 description = "Run profiler on a test script"232 user_options = [('file=', 'f', 'A file to profile (required)'), ]233 def initialize_options(self):234 self.file = None235 def finalize_options(self):236 pass237 def run(self):238 if not self.file:239 print >>sys.stderr, ('Please give a file to profile: '240 'setup.py profile --file=path/to/file')241 return 1242 import cProfile243 cProfile.run("import imp;imp.load_source('_', '%s')" % self.file)244class REPLCommand(distutils.core.Command):245 description = "Launch a REPL with the library loaded"246 user_options = []247 def initialize_options(self):248 pass249 def finalize_options(self):250 pass251 def run(self):252 import code253 import readline254 console = code.InteractiveConsole()255 map(console.runsource, """\256import sys257import os258import libcchdo as L259import libcchdo.db.model.legacy as DBML260import libcchdo.db.model.convert as DBMC261import libcchdo.db.model.std as STD...
setuptools_cythonize.py
Source:setuptools_cythonize.py
...30 if self._use_cython:31 self.distribution.has_ext_modules = lambda: True32 else:33 self.distribution.has_ext_modules = lambda: Distribution.has_ext_modules(self.distribution)34 def initialize_options(self):35 build.initialize_options(self)36 self.cythonize = None37 def finalize_options(self):38 build.finalize_options(self)39class CythonizeBuildPy(build_py):40 """41 This patch moves python files from build_py to build_ext (handled by Cython)42 except __init__.py to keep Python package integrity.43 """44 user_options = build.user_options + [('cythonize', None, 'compile pure python file using cython')]45 boolean_options = build.boolean_options + ['cythonize']46 def initialize_options(self):47 build_py.initialize_options(self)48 self.cythonize = None49 self.exclude_cythonize = []50 def finalize_options(self):51 build_py.finalize_options(self)52 self.set_undefined_options('build', ('cythonize', 'cythonize'))53 if self.cythonize:54 self.compile = self.optimize = False # noqa55 def run(self):56 if self.cythonize:57 # Move py files to ext_modules except __init__.py58 if not self.distribution.ext_modules:59 self.distribution.ext_modules = []60 for (pkg, mod, pth) in build_py.find_all_modules(self):61 if self.is_to_cythonize(pkg, mod):62 self.distribution.ext_modules.append(63 Extension(64 ".".join([pkg, mod]),65 [pth],66 cython_c_in_temp=True,67 # Ensure cython compiles with a relevent language mode setting68 cython_directives={'language_level': sys.version[0]}69 )70 )71 return build_py.run(self)72 def find_all_modules(self):73 """List all modules that have to be kept in pure Python.74 """75 return [(pkg, mod, pth) for pkg, mod, pth in build_py.find_all_modules(self)76 if not self.is_to_cythonize(pkg, mod)]77 def build_module(self, module_name, module_file, package_name):78 if self.is_to_cythonize(package_name, module_name):79 return80 return build_py.build_module(self, module_name, module_file, package_name)81 def is_to_cythonize(self, package_name, module_name):82 """Return True if the given module has to be compiled.83 """84 if not self.cythonize:85 return False86 cythonize = True87 if self.exclude_cythonize:88 pkgmod = '.'.join([package_name, module_name])89 for pat in self.exclude_cythonize:90 if fnmatchcase(pkgmod, pat):91 cythonize = False92 return cythonize and module_name not in ['__init__', '__main__']93class CythonizeInstall(install):94 """95 This patch add the option :option:`--cythonize` to the install96 command97 """98 user_options = install.user_options + [('cythonize', None, 'compile pure python file using cython')]99 boolean_options = install.boolean_options + ['cythonize']100 def initialize_options(self):101 install.initialize_options(self)102 self.cythonize = None103 def finalize_options(self):104 install.finalize_options(self)105 # Pass option to the build command106 build_cmd = self.get_finalized_command('build')107 if build_cmd.cythonize is None:108 build_cmd.cythonize = self.cythonize109 def run(self):110 # It seems absurd, but doing that force setuptools to execute the111 # old-style "install" command instead of the "egg_install" one.112 # This is due to frame inspection in setuptools.command.install.install.run()113 install.run(self)114class CythonizeBdist(bdist):115 """116 This patch add the option :option:`--cythonize` to the bdist117 command and configure 'wheel' as default format.118 """119 user_options = bdist.user_options + [('cythonize', None, 'compile pure python file using cython')]120 boolean_options = bdist.boolean_options + ['cythonize']121 @classmethod122 def set_default_wheel_format(cls):123 """Set 'wheel' as default format.124 """125 if "wheel" not in cls.format_commands:126 cls.format_command['wheel'] = ('bdist_wheel', "Python .whl file")127 cls.format_commands.append('wheel')128 for keyos in cls.default_format:129 cls.default_format[keyos] = 'wheel'130 def initialize_options(self):131 bdist.initialize_options(self)132 self.cythonize = None133 def finalize_options(self):134 bdist.finalize_options(self)135 # Pass option to the build command136 build_cmd = self.get_finalized_command('build')137 if build_cmd.cythonize is None:138 build_cmd.cythonize = self.cythonize139class CythonizeBdistWheel(bdist_wheel):140 """141 This patch add the option :option:`--cythonize` to the bdist_wheel142 command.143 """144 user_options = bdist_wheel.user_options + [('cythonize', None, 'compile pure python file using cython')]145 boolean_options = bdist_wheel.boolean_options + ['cythonize']146 def initialize_options(self):147 bdist_wheel.initialize_options(self)148 self.cythonize = None149 def finalize_options(self):150 """151 Prepare the build command (do it before 'bdist_wheel' to patch152 the detection of C-compiled sources)153 """154 build_cmd = self.get_finalized_command('build')155 if build_cmd.cythonize is None:156 build_cmd.cythonize = self.cythonize157 bdist_wheel.finalize_options(self)158def get_cmdclass(wheel_default=True):159 if wheel_default:160 CythonizeBdist.set_default_wheel_format()161 return {'build': CythonizeBuild,...
translation.py
Source:translation.py
...26from distutils.errors import DistutilsOptionError27from babel import Locale28import os, shutil, re29class extract_messages(_extract_messages):30 def initialize_options(self):31 _extract_messages.initialize_options(self)32 self.output_file = 'r2/i18n/r2.pot'33 self.mapping_file = 'babel.cfg'34class new_catalog(_new_catalog):35 def initialize_options(self):36 _new_catalog.initialize_options(self)37 self.output_dir = 'r2/i18n'38 self.input_file = 'r2/i18n/r2.pot'39 self.domain = 'r2'40 def finalize_options(self):41 _new_catalog.finalize_options(self)42 if os.path.exists(self.output_file):43 file2 = self.output_file + "-sav"44 print " --> backing up existing PO file to '%s'" % file245 shutil.copyfile(self.output_file, file2)46class commit_translation(Command):47 description = 'Turns PO into MO files'48 user_options = [('locale=', 'l',49 'locale for the new localized catalog'), ]50 def initialize_options(self):51 self.locale = None52 self.output_dir = 'r2/i18n'53 self.domain = 'r2'54 self.string = '_what_'55 def finalize_options(self):56 if not self.locale:57 raise DistutilsOptionError('you must provide a locale for the '58 'catalog')59 self.input_file = os.path.join(self.output_dir, self.locale,60 'LC_MESSAGES', self.domain + '.po')61 self.output_file = os.path.join(self.output_dir, self.locale,62 'LC_MESSAGES', self.domain + '.mo')63 def run(self):64 cmd = 'msgfmt -o "%s" "%s"' % (self.output_file, self.input_file)65 handle = os.popen(cmd)66 print handle.read(),67 handle.close()68class test_translation(new_catalog):69 description = 'makes a mock-up PO and MO file for testing and sets to en'70 user_options = [('locale=', 'l',71 'locale for the new localized catalog'),72 ('string=', 's',73 'global string substitution on translation'),74 ]75 def initialize_options(self):76 new_catalog.initialize_options(self)77 self.locale = 'en'78 self.string = '_what_'79 def finalize_options(self):80 self.output_file = os.path.join(self.output_dir, self.locale,81 'LC_MESSAGES', self.domain + '.po')82 self.output_file_mo = os.path.join(self.output_dir, self.locale,83 'LC_MESSAGES', self.domain + '.mo')84 if not os.path.exists(os.path.dirname(self.output_file)):85 os.makedirs(os.path.dirname(self.output_file))86 self._locale = Locale.parse('en')87 self._locale.language = self.locale88 def run(self):89 new_catalog.run(self)90 handle = open(self.output_file)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!