Best Python code snippet using autotest_python
gvsoc_control.py
Source:gvsoc_control.py
#
# Copyright (C) 2021 GreenWaves Technologies, SAS, ETH Zurich and University of Bologna
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import socket
import threading


class Proxy(object):
    """
    A class used to control GVSOC through the socket proxy

    :param host: str,
        a string giving the hostname where the proxy is running
    :param port: int,
        the port where to connect
    """

    class _Socket_proxy_reader_thread(threading.Thread):
        """Thread reading and dispatching the replies sent by the proxy.

        A reply is one text line terminated by a newline and made of
        ``name=value`` fields separated by ``;``. The fields handled are
        ``req``, ``exit``, ``msg`` and ``payload``; a ``payload`` field is
        followed on the socket by that many raw bytes.

        :param socket: The connected socket to read replies from.
        """

        def __init__(self, socket):
            super(Proxy._Socket_proxy_reader_thread, self).__init__()
            self.socket = socket
            self.lock = threading.Lock()
            self.condition = threading.Condition(self.lock)
            # Reply message of each answered request, indexed by request id
            self.replies = {}
            # Payload callbacks, indexed by the request id formatted as a string
            self.matches = {}
            # Raw payloads received, indexed by request id
            self.payloads = {}
            self.running = False
            self.timestamp = 0
            self.exit_callback = None

        def _read_line(self):
            """Read one reply line from the socket.

            :return: The decoded line including its trailing newline, or None
                if the connection was closed or the read failed.
            """
            line = bytearray()
            try:
                while True:
                    # Read byte by byte: whatever follows the newline (payload
                    # or uart bytes) must stay in the socket for its consumer.
                    byte = self.socket.recv(1)
                    if not byte:
                        # Peer closed the connection; without this check the
                        # loop would spin forever on empty reads.
                        return None
                    line += byte
                    if byte == b'\n':
                        # Decode the whole line at once so that multi-byte
                        # characters are handled correctly.
                        return line.decode('utf-8')
            except (OSError, UnicodeDecodeError):
                return None

        def _read_payload(self, size):
            """Read exactly `size` raw bytes from the socket.

            :return: The bytes read as a bytearray, or None if the connection
                was closed before `size` bytes were received.
            """
            payload = bytearray()
            while len(payload) < size:
                chunk = self.socket.recv(size - len(payload))
                if not chunk:
                    return None
                payload += chunk
            return payload

        def run(self):
            """Parse replies until the connection is closed.

            :raises: RuntimeError, if a reply carries no request identifier.
            """
            while True:
                reply = self._read_line()
                if reply is None:
                    return

                req = None
                is_stop = None
                is_run = None
                msg = ""

                # NOTE(review): 'err' and 'err_msg' fields are not acted upon
                # and handle_err() is not called anywhere in this file, so a
                # command failure is not turned into an exception here.
                for arg in reply.split(';'):
                    name, value = arg.split('=', 1)
                    if name == 'req':
                        req = int(value)
                    elif name == 'exit':
                        with self.lock:
                            exit_callback = self.exit_callback
                        if exit_callback is None:
                            os._exit(int(value))
                        exit_callback[0](int(value), *exit_callback[1], **exit_callback[2])
                        # Raises SystemExit, which ends this reader thread
                        exit(int(value))
                    elif name == 'msg':
                        msg = value
                        if msg.find('stopped') == 0:
                            is_stop = int(value.split('=')[1])
                        elif msg.find('running') == 0:
                            is_run = int(value.split('=')[1])
                    elif name == 'payload':
                        callback = self.matches.get('%s' % req)
                        if callback is not None:
                            # The callback reads the payload from the socket itself
                            callback[0](*callback[1], **callback[2])
                        else:
                            payload = self._read_payload(int(value))
                            if payload is None:
                                return
                            with self.lock:
                                self.payloads[req] = payload
                                self.condition.notify_all()

                if req is None:
                    raise RuntimeError('Unknown reply: ' + reply)

                with self.lock:
                    if is_stop is not None:
                        self.timestamp = is_stop
                        self.running = False
                    elif is_run is not None:
                        self.running = True
                    self.replies[req] = msg
                    self.condition.notify_all()

        def _get_payload(self, req):
            """Block until the payload of request `req` is received and return it."""
            with self.condition:
                while self.payloads.get(req) is None:
                    self.condition.wait()
                return self.payloads.pop(req)

        def wait_reply(self, req):
            """Block until the reply to request `req` is received and return its message."""
            with self.condition:
                while self.replies.get(req) is None:
                    self.condition.wait()
                return self.replies.pop(req)

        def wait_stopped(self, timestamp=None):
            """Block until execution is stopped and, if given, `timestamp` is reached."""
            with self.condition:
                while self.running or (timestamp is not None and self.timestamp < timestamp):
                    self.condition.wait()

        def wait_timestamp(self, timestamp):
            """Block until the last reported stop timestamp is at least `timestamp`."""
            with self.condition:
                while self.timestamp < timestamp:
                    self.condition.wait()

        def wait_running(self):
            """Block until execution is reported as running."""
            with self.condition:
                while not self.running:
                    self.condition.wait()

        def register_callback(self, req, callback, *kargs, **kwargs):
            """Call `callback(*kargs, **kwargs)` for each payload announced for request `req`."""
            match = '%s' % req
            with self.lock:
                self.matches[match] = callback, kargs, kwargs

        def unregister_callback(self, req):
            """Stop calling the callback registered for request `req`."""
            match = '%s' % req
            with self.lock:
                self.matches[match] = None

        def handle_err(self, error, error_str=None):
            """Raise if `error` is not 0.

            :raises: RuntimeError, with `error_str` if given, with the status otherwise.
            """
            if error != 0:
                if error_str is None:
                    raise RuntimeError("Proxy command failed with status %s" % error)
                else:
                    raise RuntimeError("Proxy command failed with message: %s" % error_str)

        def register_exit_callback(self, callback, *kargs, **kwargs):
            """Register the function called with the exit status when an exit is reported."""
            with self.lock:
                self.exit_callback = [callback, kargs, kwargs]

    def __init__(self, host: str = 'localhost', port: int = 42951):
        self.req_id = 0
        self.lock = threading.Lock()
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect((host, port))
        self.reader = self._Socket_proxy_reader_thread(self.socket)
        self.reader.start()

    def _get_req(self):
        """Allocate and return a new request identifier."""
        with self.lock:
            req = self.req_id
            self.req_id += 1
        return req

    def _send_cmd(self, cmd, wait_reply=True, keep_lock=False):
        """Send a command to the proxy.

        :param cmd: The command string.
        :param wait_reply: If True, block until the reply is received and return
            its message, otherwise return the request identifier.
        :param keep_lock: If True, the command lock is kept on return so that the
            caller can send data right after the command. The caller must then
            call _unlock_cmd().
        """
        self.lock.acquire()
        try:
            req = self.req_id
            self.req_id += 1
            # sendall, since send may transmit only part of the buffer
            self.socket.sendall(('req=%d;cmd=%s\n' % (req, cmd)).encode('ascii'))
        except BaseException:
            # Never leave the command lock held when the command was not sent
            self.lock.release()
            raise
        if not keep_lock:
            self.lock.release()
        if wait_reply:
            return self.reader.wait_reply(req)
        return req

    def _unlock_cmd(self):
        """Release the command lock kept by _send_cmd(keep_lock=True)."""
        self.lock.release()

    def wait_stop(self):
        """Wait until execution stops.

        This will block the caller until gvsoc stops execution.
        """
        self.reader.wait_stopped()

    def wait_running(self):
        """Wait until GVSOC is running.

        This will block the caller until gvsoc starts execution.
        """
        self.reader.wait_running()

    def stop(self):
        """Stop execution.
        """
        self._send_cmd('stop')

    def close(self):
        """Close the proxy.

        This will free resources and close threads so that simulation can properly exit.
        """
        self.socket.shutdown(socket.SHUT_WR)
        self.socket.close()
        self.reader.join()

    def trace_add(self, trace: str):
        """Enable a trace.

        :param trace: A regular expression used to enable traces
        """
        self._send_cmd('trace add %s' % trace)

    def trace_remove(self, trace: str):
        """Disable a trace.

        :param trace: A regular expression used to disable traces
        """
        self._send_cmd('trace remove %s' % trace)

    def trace_level(self, level: str):
        """Changes the trace level.

        :param level: The trace level, can be "error", "warning", "info", "debug" or "trace"
        """
        self._send_cmd('trace level %s' % level)

    def event_add(self, event: str):
        """Enable an event.

        :param event: A regular expression used to enable events
        """
        self._send_cmd('event add %s' % event)

    def event_remove(self, event: str):
        """Disable an event.

        :param event: A regular expression used to disable events
        """
        self._send_cmd('event remove %s' % event)

    def run(self, duration: int = None):
        """Starts execution.

        :param duration: Specify the duration of the execution in picoseconds (will execute forever by default)
        """
        if duration is not None:
            # The reply to a step command is used as the timestamp to wait for
            timestamp = self._send_cmd('step %d' % (duration))
            self.reader.wait_timestamp(int(timestamp))
        else:
            self._send_cmd('run')

    def quit(self, status: int = 0):
        """Exit simulation.

        :param status: Specify the status value.
        """
        self._send_cmd('quit %d' % status)

    def _get_component(self, path):
        """Return the reply to a get_component command for `path`, without newlines."""
        result = self._send_cmd('get_component %s' % path)
        return result.replace('\n', '')

    def register_exit_callback(self, callback, *kargs, **kwargs):
        """Register exit callback

        The callback is called when GVSOC exits. If no callback is registered,
        os._exit is called when GVSOC exits.

        :param callback: The function to be called when GVSOC exits
        :param kargs: Arguments propagated to the callback
        :param kwargs: Arguments propagated to the callback
        """
        self.reader.register_exit_callback(callback, *kargs, **kwargs)


class Router(object):
    """
    A class used to inject memory accesses into a router

    :param proxy: The proxy object. This class will use it to send command to GVSOC through the proxy connection.
    :param path: The path to the router in the architecture.
    """

    def __init__(self, proxy: Proxy, path: str = '**/chip/soc/axi_ico'):
        self.proxy = proxy
        self.component = proxy._get_component(path)

    def mem_write(self, addr: int, size: int, values: bytes):
        """Inject a memory write.

        The access is generated by the router where this class is connected and is
        injected as a debug request to not disturb the timing.

        :param addr: The address of the access.
        :param size: The size of the access in bytes.
        :param values: The sequence of bytes to be written, in little endian byte ordering.
        :raises: RuntimeError, if the access generates an error in the architecture.
        """
        cmd = 'component %s mem_write 0x%x 0x%x' % (self.component, addr, size)

        # Since we need to send a command and right after the data,
        # we have to keep the command queue locked to avoid mixing our data
        # with another command
        req = self.proxy._send_cmd(cmd, keep_lock=True, wait_reply=False)
        try:
            self.proxy.socket.sendall(values)
        finally:
            self.proxy._unlock_cmd()
        self.proxy.reader.wait_reply(req)

    def mem_read(self, addr: int, size: int) -> bytes:
        """Inject a memory read.

        The access is generated by the router where this class is connected and is
        injected as a debug request to not disturb the timing.

        :param addr: int, The address of the access.
        :param size: int, The size of the access in bytes.
        :return: bytes, The sequence of bytes read, in little endian byte ordering.
        :raises: RuntimeError, if the access generates an error in the architecture.
        """
        # Since we need to send a command and right after we receive the data,
        # we have to keep the command queue locked to avoid mixing our data
        # with another command
        self.read_size = size
        cmd = 'component %s mem_read 0x%x 0x%x' % (self.component, addr, size)
        req = self.proxy._send_cmd(cmd, keep_lock=True, wait_reply=False)
        try:
            reply = self.proxy.reader._get_payload(req)
        finally:
            self.proxy._unlock_cmd()
        self.proxy.reader.wait_reply(req)
        return reply

    def mem_write_int(self, addr: int, size: int, value: int):
        """Write an integer.

        The access is generated by the router where this class is connected and is
        injected as a debug request to not disturb the timing.

        :param addr: int, The address of the access.
        :param size: int, The size of the access in bytes.
        :param value: int, The integer to be written.
        :raises: RuntimeError, if the access generates an error in the architecture.
        """
        return self.mem_write(addr, size, value.to_bytes(size, byteorder='little'))

    def mem_read_int(self, addr: int, size: int) -> int:
        """Read an integer.

        The access is generated by the router where this class is connected and is
        injected as a debug request to not disturb the timing.

        :param addr: int, The address of the access.
        :param size: int, The size of the access in bytes.
        :return: int, The integer read.
        :raises: RuntimeError, if the access generates an error in the architecture.
        """
        values = self.mem_read(addr, size)
        return int.from_bytes(values, byteorder='little')


class Testbench(object):
    """Testbench class.

    This class can be instantiated to get access to the testbench.

    :param proxy: Proxy, The proxy object. This class will use it to send command to GVSOC through the proxy connection.
    :param path: string, optional, The path to the testbench in the architecture.
    """

    def __init__(self, proxy: Proxy, path: str = '**/testbench/testbench'):
        self.proxy = proxy
        self.component = proxy._get_component(path)

    def i2s_get(self, id: int = 0):
        """Open an SAI.

        Open an SAI and return an object which can be used to interact with it.

        :param id: int, optional, The SAI identifier.
        :return: Testbench_i2s, An object which can be used to access the specified SAI.
        """
        return Testbench_i2s(self.proxy, self.component, id)

    def uart_get(self, id: int = 0):
        """Open a uart interface.

        Open a uart interface and return an object which can be used to interact with it.

        :param id: int, optional, The uart interface identifier.
        :return: Testbench_uart, An object which can be used to access the specified uart interface.
        """
        return Testbench_uart(self.proxy, self.component, id)


class Testbench_uart(object):
    """Class instantiated for each manipulated uart interface.

    It can used to interact with the uart interface, like injecting streams.

    :param proxy: Proxy, The proxy object. This class will use it to send command to GVSOC through the proxy connection.
    :param testbench: The testbench component, inserted as is in the commands
        (Testbench.uart_get passes the value returned by the proxy for the testbench path).
    :param id: int, optional, The identifier of the uart interface.
    """

    def __init__(self, proxy: Proxy, testbench: Testbench, id=0):
        self.id = id
        self.proxy = proxy
        self.testbench = testbench
        self.callback = None
        self.lock = threading.Lock()
        self.condition = threading.Condition(self.lock)
        self.pending_rx_bytes = bytearray()
        self.req = None

    def open(self, baudrate: int, word_size: int=8, stop_bits: int=1, parity_mode: bool=False, ctrl_flow: bool=True,
            is_usart: bool=False, usart_polarity: int=0, usart_phase: int=0):
        """Open and configure a uart interface.

        :param baudrate: int, Specify the uart baudrate in bps
        :param word_size: int, optional, Specify the size in bits of the uart bytes.
        :param stop_bits: int, optional, Specify the number of stop bits.
        :param parity_mode: bool, optional, True if parity is enabled.
        :param ctrl_flow: bool, optional, True if control flow is enabled.
        :param is_usart: bool, optional, True if uart is in usart mode.
        :param usart_polarity: int, optional, Usart polarity.
        :param usart_phase: int, optional, Usart phase.
        :raises: RuntimeError, if there is any invalid parameter.
        """
        options = ''.join((
            ' itf=%d' % self.id,
            ' enabled=1',
            ' baudrate=%d' % baudrate,
            ' word_size=%d' % word_size,
            ' stop_bits=%d' % stop_bits,
            ' parity_mode=%d' % parity_mode,
            ' ctrl_flow=%d' % ctrl_flow,
            ' is_usart=%d' % is_usart,
            ' usart_polarity=%d' % usart_polarity,
            ' usart_phase=%d' % usart_phase,
        ))
        cmd = 'component %s uart setup %s' % (self.testbench, options)
        self.proxy._send_cmd(cmd)

    def close(self):
        """Close the uart interface.

        :raises: RuntimeError, if there is any error while closing.
        """
        options = ' itf=%d' % self.id + ' enabled=0'
        cmd = 'component %s uart setup %s' % (self.testbench, options)
        self.proxy._send_cmd(cmd)

    def tx(self, values: bytes):
        """Send data to the uart.

        This enqueues an array of bytes to be transmitted. If previous transfers are not finished,
        these bytes will be transfered after.

        :param values: bytes, The sequence of bytes to be sent, in little endian byte ordering.
        :raises: RuntimeError, if the access generates an error in the architecture.
        """
        cmd = 'component %s uart tx %d %d' % (self.testbench, self.id, len(values))

        # Since we need to send a command and right after the data,
        # we have to keep the command queue locked to avoid mixing our data
        # with another command
        req = self.proxy._send_cmd(cmd, keep_lock=True, wait_reply=False)
        try:
            self.proxy.socket.sendall(values)
        finally:
            self.proxy._unlock_cmd()
        self.proxy.reader.wait_reply(req)

    def rx(self, size=None):
        """Read data from the uart.

        Once reception on the uart is enabled, the received bytes are pushed to a fifo. This method
        can be called to pop received bytes from the FIFO.

        :param size: int, The number of bytes to be read. If it is None, it returns the bytes which has already been received.
        :return: bytes, The sequence of bytes received, in little endian byte ordering.
        :raises: RuntimeError, If the access generates an error in the architecture.
        """
        with self.condition:
            if size is not None:
                while len(self.pending_rx_bytes) < size:
                    self.condition.wait()
            else:
                size = len(self.pending_rx_bytes)
            reply = self.pending_rx_bytes[0:size]
            self.pending_rx_bytes = self.pending_rx_bytes[size:]
        return reply

    def rx_enable(self):
        """Enable receiving bytes from the uart.

        Any byte received from the uart either triggers the callback execution if it has been registered,
        or is pushed to a FIFO which can read.

        :raises: RuntimeError, if the access generates an error in the architecture.
        """
        self.req = self.proxy._get_req()
        self.proxy.reader.register_callback(self.req, self.__handle_rx)
        cmd = 'component %s uart rx %d 1 %d' % (self.testbench, self.id, self.req)
        self.proxy._send_cmd(cmd)

    def rx_disable(self):
        """Disable receiving bytes from the uart.

        :raises: RuntimeError, if the access generates an error in the architecture.
        """
        self.proxy.reader.unregister_callback(self.req)
        cmd = 'component %s uart rx %d 0' % (self.testbench, self.id)
        self.proxy._send_cmd(cmd)

    def __handle_rx(self):
        # Registered as payload callback by rx_enable, so it is called by the
        # proxy reader thread and reads the received byte from the socket itself.
        with self.lock:
            reply = self.proxy.socket.recv(1)
            if self.callback is not None:
                self.callback[0](1, reply, *self.callback[1], **self.callback[2])
            else:
                self.pending_rx_bytes += reply
                self.condition.notify()

    def rx_attach_callback(self, callback, *kargs, **kwargs):
        """Attach callback for receiving bytes from the uart.

        All bytes received from the uart now triggers the execution of the specified callback.
        This must be called only when uart reception is disabled.
        The callback will be called asynchronously by a different thread and so special care must be taken
        to access shared variables using locks. Also the proxy can not be used from the callback.

        :param callback: The function to be called when bytes are received from the uart. First parameters will contain
            number of bytes and received, and second one will be the bytes received.
        :param kargs: Arguments propagated to the callback
        :param kwargs: Arguments propagated to the callback
        """
        self.callback = callback, kargs, kwargs

    def rx_detach_callback(self):
        """Detach a callback.

        The callback previously attached won't be called anymore.
        This must be called only when uart reception is disabled.
        """
        self.callback = None


class Testbench_i2s(object):
    """Class instantiated for each manipulated SAI.

    It can used to interact with the SAI, like injecting streams.

    :param proxy: Proxy, The proxy object. This class will use it to send command to GVSOC through the proxy connection.
    :param testbench: The testbench component, inserted as is in the commands
        (Testbench.i2s_get passes the value returned by the proxy for the testbench path).
    :param id: int, optional, The identifier of the SAI interface.
    """

    def __init__(self, proxy: Proxy, testbench: Testbench, id=0):
        self.id = id
        self.proxy = proxy
        self.testbench = testbench

    def open(self, word_size: int = 16, sampling_freq: int = -1, nb_slots: int = 1, is_pdm: bool = False,
            is_full_duplex: bool = False, is_ext_clk: bool = False, is_ext_ws: bool = False, is_sai0_clk: bool = False,
            is_sai0_ws: bool = False, clk_polarity: int = 0, ws_polarity: int = 0, ws_delay: int = 1):
        """Open and configure SAI.

        :param word_size: int, optional, Specify the frame word size in bits.
        :param sampling_freq: int, optional, Specify the sampling frequency. This is used either to generate the clock when
            it is external or to check that internally generated one is correct.
        :param nb_slots: int, optional,
            Number of slots in the frame.
        :param is_pdm: bool, optional,
            True if the stream is a PDM stream.
        :param is_full_duplex: bool, optional,
            True if the SAI is used in full duplex mode.
        :param is_ext_clk: bool, optional,
            True if the clock is generated by the testbench.
        :param is_ext_ws: bool, optional,
            True if the word strobe is generated by the testbench.
        :param is_sai0_clk: bool, optional,
            True if the clock should be taken from SAI0.
        :param is_sai0_ws: bool, optional,
            True if the word strobe should be taken from SAI0.
        :param clk_polarity: int, optional,
            Clock polarity, definition is the same as SAI0 specifications.
        :param ws_polarity: int, optional,
            Word strobe polarity, definition is the same as SAI0 specifications.
        :param ws_delay: int, optional,
            Word strobe delay, definition is the same as SAI0 specifications.
        :raises: RuntimeError, if there is any invalid parameter.
        """
        options = ''.join((
            ' itf=%d' % self.id,
            ' enabled=1',
            ' sampling_freq=%d' % sampling_freq,
            ' word_size=%d' % word_size,
            ' nb_slots=%d' % nb_slots,
            ' is_pdm=%d' % is_pdm,
            ' is_full_duplex=%d' % is_full_duplex,
            ' is_ext_clk=%d' % is_ext_clk,
            ' is_ext_ws=%d' % is_ext_ws,
            ' is_sai0_clk=%d' % is_sai0_clk,
            ' is_sai0_ws=%d' % is_sai0_ws,
            ' clk_polarity=%d' % clk_polarity,
            ' ws_polarity=%d' % ws_polarity,
            ' ws_delay=%d' % ws_delay,
        ))
        cmd = 'component %s i2s setup %s' % (self.testbench, options)
        self.proxy._send_cmd(cmd)

    def close(self):
        """Close SAI.

        :raises: RuntimeError, if there is any error while closing.
        """
        options = ' itf=%d' % self.id + ' enabled=0'
        cmd = 'component %s i2s setup %s' % (self.testbench, options)
        self.proxy._send_cmd(cmd)

    def clk_start(self):
        """Start clock.

        This can be used when the clock is generated by the testbench to start the generation.

        :raises: RuntimeError, if there is any error while starting the clock.
        """
        cmd = 'component %s i2s clk_start %d' % (self.testbench, self.id)
        self.proxy._send_cmd(cmd)

    def clk_stop(self):
        """Stop clock.

        This can be used when the clock is generated by the testbench to stop the generation.

        :raises: RuntimeError, if there is any error while stopping the clock.
        """
        cmd = 'component %s i2s clk_stop %d' % (self.testbench, self.id)
        self.proxy._send_cmd(cmd)

    def slot_open(self, slot: int = 0, is_rx: bool = True, word_size: int = 16, is_msb: bool = True,
            sign_extend: bool = False, left_align: bool = False):
        """Open and configure a slot.

        :param slot: int, optional,
            Slot identifier
        :param is_rx: bool, optional,
            True if gap receives the samples.
        :param word_size: int, optional,
            Slot width in number of bits.
        :param is_msb: bool, optional,
            True if the samples are received or sent with MSB first.
        :param sign_extend: bool, optional,
            True if the samples are sign-extended.
        :param left_align: bool, optional,
            True if the samples are left aligned.
        :raises: RuntimeError, if there is any invalid parameter.
        """
        # NOTE(review): left_align and sign_extend are both shifted by 1 and so
        # share the same format bit; one of them presumably belongs to another
        # bit - confirm against the testbench model before changing.
        slot_format = is_msb | (left_align << 1) | (sign_extend << 1)
        options = ''.join((
            ' itf=%d' % self.id,
            ' slot=%d' % slot,
            ' is_rx=%d' % is_rx,
            ' enabled=1',
            ' word_size=%d' % word_size,
            ' format=%d' % slot_format,
        ))
        cmd = 'component %s i2s slot_setup %s' % (self.testbench, options)
        self.proxy._send_cmd(cmd)

    def slot_close(self, slot: int = 0):
        """Close a slot.

        :param slot: int, optional,
            Slot identifier
        :raises: RuntimeError, if there is any invalid parameter.
        """
        options = ''.join((
            ' itf=%d' % self.id,
            ' slot=%d' % slot,
            ' enabled=0',
        ))
        cmd = 'component %s i2s slot_setup %s' % (self.testbench, options)
        self.proxy._send_cmd(cmd)

    def slot_rx_file_reader(self, slot: int = None, slots: list = None, filetype: str = "wav",
            filepath: str = None, encoding: str = "asis", channel: int = 0, width: int = 0):
        """Read a stream of samples from a file.

        This will open a file and stream it to the SAI so that gap receives the samples.
        It can be used either in mono-channel mode with the slot parameter or multi-channel mode with the
        slots parameter. In multi-channel mode, the slots parameters give the list of slots associated to each channel.
        To allow empty channels, a slot of -1 can be given.

        :param slot: int, optional,
            Slot identifier
        :param slots: list, optional,
            List of slots when using multi-channel mode. slot must be None if this one is not empty.
            None is handled as an empty list.
        :param filetype: string, optional,
            Describes the type of the file, can be "wav", "raw", "bin" or "au".
        :param width: int, optional,
            width of the samples, in case the file is in binary format
        :param filepath: string, optional,
            Path to the file.
        :param encoding: string, optional,
            Encoding type for binary files, can be: "asis", "plusminus"
        :param channel: int, optional,
            If the format supports it, this will get the samples from the specified channel in the input file.
        :raises: RuntimeError, if there is any invalid parameter.
        """
        options = ' itf=%d' % self.id
        if slot is not None:
            options += ' slot=%d' % slot
        for slot_id in slots or ():
            options += ' slot=%d' % slot_id
        options += ' filetype=%s' % filetype
        options += ' filepath=%s' % filepath
        options += ' encoding=%s' % encoding
        options += ' channel=%d' % channel
        options += ' width=%d' % width
        cmd = 'component %s i2s slot_rx_file_reader %s' % (self.testbench, options)
        self.proxy._send_cmd(cmd)

    def slot_tx_file_dumper(self, slot: int = None, slots: list = None, filetype: str = "wav",
            filepath: str = None, encoding: str = "asis", channel: int = 0, width: int = 0):
        """Write a stream of samples to a file.

        This will open a file and write to it all the samples received from gap.
        It can be used either in mono-channel mode with the slot parameter or multi-channel mode with the
        slots parameter. In multi-channel mode, the slots parameters give the list of slots associated to each channel.
        To allow empty channels, a slot of -1 can be given. A slot can be given several times in order to push the samples
        to several channels.

        :param slot: int, optional,
            Slot identifier
        :param slots: list, optional,
            List of slots when using multi-channel mode. slot must be None if this one is not empty.
            None is handled as an empty list.
        :param filetype: string, optional,
            Describes the type of the file, can be "wav", "raw", "bin" or "au".
        :param encoding: string, optional,
            Encoding type for binary files, can be: "asis", "plusminus"
        :param width: int, optional,
            width of the samples, in case the file is in binary format
        :param filepath: string, optional,
            Path to the file.
        :param channel: int, optional,
            If the format supports it, this will dump the samples to the specified channel in the output file.
        :raises: RuntimeError, if there is any invalid parameter.
        """
        options = ' itf=%d' % self.id
        if slot is not None:
            options += ' slot=%d' % slot
        for slot_id in slots or ():
            options += ' slot=%d' % slot_id
        options += ' filetype=%s' % filetype
        options += ' filepath=%s' % filepath
        options += ' encoding=%s' % encoding
        options += ' channel=%d' % channel
        options += ' width=%d' % width
        cmd = 'component %s i2s slot_tx_file_dumper %s' % (self.testbench, options)
        self.proxy._send_cmd(cmd)

    def slot_stop(self, slot: int = 0, stop_rx: bool = True, stop_tx: bool = True):
        """Stop a slot.

        This will stop the streamings (file reader or dumper) configured on the specified slot.

        :param slot: int, optional,
            Slot identifier
        :param stop_rx: bool, optional,
            Stop the stream sent to gap.
        :param stop_tx: bool, optional,
            Stop the stream received from gap.
        :raises: RuntimeError, if there is any invalid parameter.
        """
        options = ''.join((
            ' itf=%d' % self.id,
            ' slot=%d' % slot,
            ' stop_rx=%d' % stop_rx,
            ' stop_tx=%d' % stop_tx,
        ))
        cmd = 'component %s i2s slot_stop %s' % (self.testbench, options)
        # NOTE(review): the listing is cut off here ("..."), so the end of this
        # method is not visible. The sibling methods finish by passing cmd to
        # self.proxy._send_cmd - restore the remainder from the complete file.
solaris10.py
Source:solaris10.py
1"""2:synopsis: Working with network interfaces3.. moduleauthor: Paul Diaconescu <p@afajl.com>4"""5MIN_INTERFACE_SPEED = 1006import time7import re8import os9import sy10from sy._internal import _missing11class Interface(object):12 '''13 Represents a network interface. Attributes are:14 .. attribute:: name15 Name of the interface16 .. attribute:: ipaddress17 18 IP address of the interface19 .. attribute:: netmask20 21 Netmask of the interface22 .. attribute:: ether23 24 Ethernet address (MAC) of the interface25 26 .. attribute:: has_link27 28 True if dladm reports that the interface has link29 .. attribute:: is_physical30 31 Interface is not logical, ex hme032 .. attribute:: is_logical33 34 Interface is not physical, ex hme0:135 36 .. attribute:: is_configured37 38 Interface is plumbed and shows up in ifconfig -a39 .. attribute:: is_full_duplex40 41 Interface is running in full duplex42 .. attribute:: is_up43 44 Interface is UP45 .. attribute:: is_standby46 47 Interface is set as STANDBY in an IPMP group48 .. attribute:: is_failed49 50 Interface is FAILED in an IPMP group51 .. attribute:: is_inactive52 53 Interface is INACTIVE54 .. attribute:: is_dhcp55 56 Interface is configured by DHCP57 .. attribute:: speed58 59 Interface speed nr of MB reported by dladm60 .. attribute:: group61 62 IPMP group the interface belongs to63 .. 
attribute:: zone64 65 Zone the interface belongs to66 67 '''68 __slots__ = set([69 'name',70 'has_link',71 'is_physical',72 'is_logical',73 'is_configured',74 'is_full_duplex',75 'is_up',76 'is_standby',77 'is_failed',78 'is_inactive',79 'is_dhcp',80 'speed',81 'ipaddress',82 'netmask',83 'ether',84 'group',85 'zone',86 ])87 def __init__(self, **kwargs):88 for attr in self.__slots__:89 setattr(self, attr, kwargs.get(attr, None))90 bad_keys = set(kwargs.keys()).difference(self.__slots__)91 if bad_keys:92 raise AttributeError('Bad arguments passed to init: %s' % 93 ', '.join(bad_keys))94 95 def update(self, other):96 assert isinstance(other, Interface)97 for attr in self.__slots__:98 other_attr = getattr(other, attr, None)99 if other_attr is not None:100 setattr(self, attr, other_attr)101 def __repr__(self):102 ret = ['<Interface:\n']103 for attr in self.__slots__:104 ret.append( ' %s: %s\n' % (attr, str(getattr(self, attr, None))))105 ret.append('>')106 return ''.join(ret)107 __str__ = __repr__108# _______________________________109# physical interfaces110_dladm_rx = re.compile(r'''111 ^ (?P<ifname>\w+) # name of the interface112 \s+113 link: \s (?P<link>\w+) # link status114 \s+115 speed: \s (?P<speed>\d+) \s+ Mbps # link speed116 \s+117 duplex: \s (?P<duplex>\w+) # link duplex118 ''', re.X119 ) 120def _get_physical():121 interfaces = {}122 for line in sy.cmd.outlines('dladm show-dev'):123 match = _dladm_rx.search( line )124 if match:125 ifname = match.group('ifname')126 intf = Interface()127 intf.name = ifname128 intf.is_physical = True129 intf.has_link = match.group('link') == 'up'130 intf.speed = int( match.group('speed') )131 intf.is_full_duplex = match.group('duplex') == 'full'132 interfaces[ifname] = intf133 return interfaces134# _______________________________135# configured interfaces136_ifconfig_if_rx = re.compile(r'''137 ^(?P<ifname>[\w:]+):\s.*? 
# interface name138 <(?P<flags>.*?)> # flags139 ''', re.X140 )141_ifconfig_ip_rx = re.compile(r'''142 \s inet \s (?P<ipaddress>[\d\.]+) # ip address143 \s netmask \s (?P<netmask>\w+) \s # netmask144 # ! we ignore broadcast since145 # it can be deduced146 ''', re.X147 )148_ifconfig_mac_rx = re.compile(r'''149 \s ether \s (?P<ether>[\w:]+) # ether/mac address150 ''', re.X151 )152_ifconfig_group_rx = re.compile(r'''153 \s groupname \s (?P<group>\w+) # IPMP group154 ''', re.X155 )156_ifconfig_zone_rx = re.compile(r'''157 \s zone \s (?P<zone>\w+) \s # zone name158 ''', re.X159 )160 161def _get_configured():162 interfaces = {}163 intf = None164 for line in sy.cmd.outlines('ifconfig -a'):165 match = _ifconfig_if_rx.search(line)166 if match:167 if intf:168 interfaces[intf.name] = intf169 intf = Interface()170 intf.name = match.group('ifname')171 intf.is_configured = True172 if ':' in intf.name:173 intf.is_logical = True174 # set flags175 flags = set( match.group('flags').split(',') )176 intf.is_up = 'UP' in flags177 intf.is_standby = 'STANDBY' in flags178 intf.is_inactive = 'INACTIVE' in flags179 intf.is_failed = 'FAILED' in flags180 intf.is_dhcp = 'DHCP' in flags181 continue182 match = _ifconfig_ip_rx.search(line)183 if match:184 intf.ipaddress = match.group('ipaddress')185 mask = match.group('netmask')186 # step through hexmask (ffff0000) and create list of 187 # octets ['255','255' ...] 
def _wait_for_status(ifname, timeout=10):
    ''' Poll an interface until it reports a speed or ``timeout`` expires

    :arg ifname: Name of the interface to poll
    :arg timeout: Maximum number of seconds to keep polling
    :returns: The most recently read :class:`Interface`. Its speed may
              still be 0 if the timeout expired.
    :raises: :exc:`sy.Error` (from :func:`get`) if the interface does not
             exist
    '''
    poll_interval = 3
    waited = 0
    intf = get(ifname)
    # NOTE: a speed of None (nothing reported by dladm for this interface)
    # is not 0, so such an interface is returned immediately.
    while intf.speed == 0 and waited < timeout:
        # never sleep past the requested timeout
        pause = min(poll_interval, timeout - waited)
        time.sleep(pause)
        waited += pause
        # Drop the cache *before* re-reading; get() would otherwise hand
        # back the same stale cached object that was just inspected.
        refresh()
        intf = get(ifname)
    return intf
def get(ifname, default=_missing):
    ''' Look up a single :class:`Interface` by name

    :arg ifname: Name of the interface, i.e 'e1000g0'
    :arg default: Value to hand back when no interface has that name.
    :returns: The matching :class:`Interface`, or ``default`` if it was
              supplied and nothing matched
    :raises: :exc:`sy.Error` if nothing matched and ``default`` was not
             supplied.
    '''
    found = _get_all().get(ifname)
    if found is not None:
        return found
    # nothing by that name: fall back to the default or complain
    if default is _missing:
        raise sy.Error('Interfaces %s does not exist' % ifname)
    return default
def check(ifname, ipaddress=None, netmask=None, group=None,
          min_interface_speed=MIN_INTERFACE_SPEED):
    """ Check if a interface is up and configured as specified

    The interface status is read through ``_wait_for_status``, so this
    call may block while the interface still reports a speed of 0.

    :arg ifname: Name of the interface to check
    :arg ipaddress: IP address in dotted decimal to check
    :arg netmask: Netmask in dotted decimal to check
    :arg group: Group that the interface should belong to
    :arg min_interface_speed: minimum interface speed in (Mbit)
    :raises: :exc:`sy.Error` if the interface does not conform
    """
    intf = _wait_for_status(ifname)
    if not intf.is_up:
        raise sy.Error('Interface %s is not up' % ifname)
    if netmask and intf.netmask != netmask:
        raise sy.Error('Interface %s has the wrong netmask: %s' %
                       (ifname, intf.netmask))
    if ipaddress and intf.ipaddress != ipaddress:
        raise sy.Error('Interface %s has the wrong IP address: %s' % (
            ifname, intf.ipaddress))
    if group and intf.group != group:
        raise sy.Error('Interface %s has the wrong group: %s' %
                       (ifname, intf.group))
    # speed stays None when dladm reported nothing for this interface.
    # Treat that as "too low" and raise sy.Error instead of letting the
    # comparison / '%d' formatting blow up with a TypeError.
    if intf.speed is None or intf.speed < min_interface_speed:
        raise sy.Error(
            'Interface %s speed is to low: %s' % (ifname, intf.speed))
    if not intf.is_full_duplex:
        raise sy.Error('Interface %s is not in full duplex' % ifname)
def _unconfigure_logical(intf):
    ''' Remove a logical interface and its persistent configuration

    Runs ``ifconfig <physical> removeif <ip>`` and then drops the matching
    ``addif`` line from ``/etc/hostname.<physical>`` if that file exists.

    :arg intf: :class:`Interface` whose name has the form
               ``<physical>:<n>``
    '''
    import re

    # everything before the first ':' is the physical interface name
    physical_ifname = intf.name.partition(':')[0]
    try:
        sy.cmd.do('ifconfig {} removeif {}',
                  physical_ifname, intf.ipaddress,
                  prefix='Unable to unconfigure logical interface %s' %
                         intf.name)
    finally:
        # the interface list may have changed even if the command failed
        refresh()
    hostname_path = '/etc/hostname.' + physical_ifname
    if os.path.exists(hostname_path):
        # NOTE(review): assumes sy.path.remove_lines() treats its second
        # argument as a regular expression (the '^' anchor suggests so).
        # The address is escaped and must be followed by whitespace or
        # end of line, so that removing 10.0.0.1 does not also remove the
        # line for 10.0.0.10.
        pattern = '^addif ' + re.escape(intf.ipaddress) + r'(\s|$)'
        sy.path.remove_lines(hostname_path, pattern)
ifconfig.append('deprecated')437 if standby:438 ifconfig.append('-failover')439 ifconfig.append('up')440 cmd = ' '.join(cmd + ifconfig)441 try:442 sy.cmd.do(cmd, prefix='Unable to configure interface %s' % ifname)443 finally:444 refresh()445 try:446 check(ifname, ipaddress, netmask, group) 447 if permanent:448 if hostname:449 sy.net.ip.add_hostentry(ipaddress, hostname)450 # line to add to hostname.<interface>451 hostname_conf = ' '.join(ifconfig) + '\n'452 sy.path.dump('/etc/hostname.' + ifname, hostname_conf)453 except Exception, e:454 unconfigure(ifname)455 raise e456def addif(ifname, ipaddress, netmask, hostname=None, permanent=True):457 ''' Adds a logical interface458 Configures the interface on another, add the netmask to netmasks, 459 adds hostname to /etc/hosts460 :arg ifname: Name of the interface to add this on to461 :arg ipaddress: IP address in dotted decimal to set462 :arg netmask: Netmask in dotted decimal to set463 :arg hostname: Hostname to set464 :arg permanent: If the interface should be configured on reboot465 :returns: None or raises exc:`CommandError` if unable to remove interface 466 '''467 468 469 interfaces = all()470 physical_intf = get(ifname, None)471 if physical_intf is None:472 raise sy.Error('Physical interface, "%s", was not found' % ifname)473 if not physical_intf.is_configured:474 raise sy.Error('Physical interface, "%s", is not configured' % ifname)475 # Add network to /etc/inet/netmasks476 _check_network(ipaddress, netmask)477 _add_network(ipaddress, netmask)478 ifconfig = ['addif', ipaddress, 'netmask', netmask, 'up']479 cmd = ' '.join(['ifconfig', ifname] + ifconfig)480 try:481 sy.cmd.do(cmd, prefix='Unable to add IP %s' % ifname)482 finally:483 refresh()484 if permanent:485 try:486 if hostname:487 sy.net.ip.add_hostentry(ipaddress, hostname)488 # line to add to hostname.<interface>489 hostname_conf = ' '.join(ifconfig) + '\n'490 sy.path.append('/etc/hostname.' 
+ ifname, hostname_conf)491 except Exception, e:492 unconfigure(ipaddress=ipaddress)...
cdp.py
Source:cdp.py
class VLAN(CDPElement):
    """CDP element identified by type code 0x0a."""

    Type = 0x0a

    def get_type(self):
        """Return the type code of the VLAN element class."""
        return VLAN.Type

    def get_vlan_number(self):
        """Return the element data as read by ``CDPElement.get_data``."""
        return CDPElement.get_data(self)
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!