Best Python code snippet using autotest_python
net_utils.py
Source:net_utils.py
...224 return 'enabled' in output225 return False226 def enable_promisc(self):227 bin_utils.system('ifconfig %s promisc' % self._name)228 def disable_promisc(self):229 bin_utils.system('ifconfig %s -promisc' % self._name)230 def get_hwaddr(self):231 f = open('/sys/class/net/%s/address' % self._name)232 hwaddr = f.read().strip()233 f.close()234 return hwaddr235 def set_hwaddr(self, hwaddr):236 bin_utils.system('ifconfig %s hw ether %s' % (self._name, hwaddr))237 def add_maddr(self, maddr):238 bin_utils.system('ip maddr add %s dev %s' % (maddr, self._name))239 def del_maddr(self, maddr):240 bin_utils.system('ip maddr del %s dev %s' % (maddr, self._name))241 def get_ipaddr(self):242 ipaddr = "0.0.0.0"...
testing.py
Source:testing.py
...53 return54def enable_promisc(interface):55 #enable promiscuous listening mode on every interface:56 os.system(f'ifconfig {interface} promisc')57def disable_promisc(interface):58 #disable promiscuous listening mode on every interface:59 os.system(f'ifconfig {interface} -promisc')60def format_packet_data(packet):61 packet_data = {'timestamp' : packet.sniff_timestamp,}62 if 'ETH' in str(packet.layers):63 packet_data['eth'] = { 'src' : packet.eth.src,64 'dst' : packet.eth.dst,}65 if 'TCP' in str(packet.layers) or 'UDP' in str(packet.layers):66 packet_data['ip'] = { 'src' : packet.ip.src,67 'dst' : packet.ip.dst,}68 if 'TCP' in str(packet.layers):69 packet_data['tcp'] = { 'src' : packet.tcp.srcport,70 'dst' : packet.tcp.dstport,}71 if 'UDP' in str(packet.layers):72 packet_data['udp'] = { 'src' : packet.udp.srcport,73 'dst' : packet.udp.dstport,}74 if 'ARP' in str(packet.layers):75 packet_data['arp'] = { 'mac' : { 'src' : packet.arp.src_hw_mac,76 'dst' : packet.arp.dst_hw_mac,77 },78 'ip' : { 'src' : packet.arp.src_proto_ipv4,79 'dst' : packet.arp.dst_proto_ipv4,80 },81 'code': packet.arp.opcode,}82 # packet.pretty_print()83 return json.dumps(packet_data)84def live_capture(interface=None):85 #start a live packet capture on the specified interface86 print(f'waiting for packets on {interface}')87 enable_promisc(interface)88 capture = pyshark.LiveCapture(interface=interface, only_summaries=False)89 for packet in capture.sniff_continuously():90 with open(f'./pcap_{interface}.log', 'a') as log:91 log.write(format_packet_data(packet) + '\n') 92 disable_promisc(interface)93 return94def create_captures():95 #create a seperate capture for each interface96 captures =[]97 find_interfaces()98 for interface in interfaces:99 p = multiprocessing.Process(target=live_capture, args=(interface,))100 captures.append(p)101 p.start()102 return captures103def add_new_data_from(logfile):104 new_packets = []105 with open(logfile, 'r') as log:106 for packet in log:...
test-net-dev.py
Source:test-net-dev.py
1#!/usr/bin/env python32import os3import shutil4import sys5import time6from pwn import *7DIRECTORY = os.environ.get('VMM_DIRECTORY', '/app')8DEBUG_CONSOLE_PATH = os.environ.get('VMM_DEBUG_CONSOLE_PATH', '/app/ooowsserial-debug.py')9DISK_IMAGE_PATH = os.environ.get('VMM_DISK_IMAGE_PATH', '/app/disk')10OS_BOOTED = b"OOO OS BOOTED"11PREPARE_NET = b"prepare_net"12SET_DRIVER_FEATURES = b"net_set_driver_features"13ENABLE_CHKSUM_TX = b"net_enable_chksum_tx"14DISABLE_CHKSUM_TX = b"net_disable_chksum_tx"15ENABLE_ETH_CRC_TX = b"net_enable_tx_eth"16DISABLE_ETH_CRC_TX = b"net_disable_tx_eth"17ENABLE_PROMISC = b"net_enable_promisc"18DISABLE_PROMISC = b"net_disable_promisc"19TX_NORMAL_PACKET = b"tx_pkt"20TX_IPV4_PACKET = b"tx_ipv4_pkt"21RX_PACKET = b"rx_pkt"22LISTEN_ONE_MSG_CMD = ["mosquitto_sub", "-C", "1", "-N", "-t", "VPC"]23SEND_ONE_MSG_CMD = ["mosquitto_pub", "-s", "-t", "VPC"]24NORMAL_PACKET = b"DSTMACSRCMACLN\x61TLIDFOTP11IPIPDTDTDTDTAAA"25NORMAL_PACKET_CRC = b"\x4d\x1a\x85\xa2"26IPV4_NORMAL_PACKET = b"DSTMACSRCMACLN\x45GTLIDFOTP__IPIPDTDTDTDTAA"27IPV4_NORMAL_PACKET_CRC = b"\x7b\xa2\x62\xfa"28IPV4_CHKSUM_PACKET = b"DSTMACSRCMACLN\x45GTLIDFOTP<tIPIPDTDTDTDTAA"29BROADCAST_PACKET =b"TAAG\xff\xff\xff\xff\xff\xffSRCMACBodyOfPacketYo"30NON_BROADCAST_PACKET =b"TAAG\x11\x11\x11\x11\x11\xffSRCMACBodyOfPacketYo"31# context.log_level = 'debug'32def check_pkt_tx(p, cmd, correct):33 # Try to receive a packet34 listen = process(LISTEN_ONE_MSG_CMD)35 p.sendline(cmd)36 received_packet = listen.readall()37 assert(len(received_packet) > 4)38 pkt_data = received_packet[4:]39 if len(pkt_data) != len(correct):40 print("PUBLIC: packet tx fail: incorrect size")41 sys.exit(-1)42 if pkt_data != correct:43 print("PUBLIC: packet tx fail: incorrect data received")44 sys.exit(-1)45 listen.kill()46def check_pkt_rx(p, cmd, pkt, expected):47 p.sendline(cmd)48 p.recvuntil(cmd)49 # give it time to call into the driver50 time.sleep(10)51 send = process(SEND_ONE_MSG_CMD)52 send.send(pkt)53 send.shutdown()54 hex_size_regex = b"([0-9a-bA-B]+)\n"55 result = p.recvregex(hex_size_regex)56 match = re.search(hex_size_regex, result)57 size_hex = match.group(1)58 size = int(size_hex, 16)59 if size != len(expected):60 print("PUBLIC: packet rx fail: incorrect size")61 sys.exit(-1)62 raw_pkt = p.recvn(size)63 if raw_pkt != expected:64 print("PUBLIC: packet rx fail: incorrect data received")65 sys.exit(-1)66 send.kill()67def main():68 os.chdir(DIRECTORY)69 # Check if we need to run mosquitto server70 mosquitto = None71 if not 'DO_NOT_START_MOSQUITTO' in os.environ:72 mosquitto = process(["/usr/sbin/mosquitto", "-c", "/etc/mosquitto/mosquitto.conf"])73 shutil.copyfile(DEBUG_CONSOLE_PATH, "./devices-bin/ooowsserial.py")74 p = process(["./vmm", "test", DISK_IMAGE_PATH, "1", "devices.config"])75 p.recvuntil(OS_BOOTED)76 # important, need to initialize the structures77 p.recvuntil(b'$')78 p.sendline(PREPARE_NET)79 p.recvuntil(b'$')80 p.sendline(SET_DRIVER_FEATURES)81 p.recvuntil(b'$')82 check_pkt_tx(p, TX_NORMAL_PACKET, NORMAL_PACKET)83 p.recvuntil(b'$')84 check_pkt_tx(p, TX_IPV4_PACKET, IPV4_NORMAL_PACKET)85 p.recvuntil(b'$')86 p.sendline(ENABLE_CHKSUM_TX)87 p.recvuntil(b'$')88 check_pkt_tx(p, TX_IPV4_PACKET, IPV4_CHKSUM_PACKET)89 # Verify that non IPv4 packets are not affected90 p.recvuntil(b'$')91 check_pkt_tx(p, TX_NORMAL_PACKET, NORMAL_PACKET)92 p.recvuntil(b'$')93 p.sendline(DISABLE_CHKSUM_TX)94 p.recvuntil(b'$')95 p.sendline(ENABLE_ETH_CRC_TX)96 p.recvuntil(b'$')97 check_pkt_tx(p, TX_NORMAL_PACKET, NORMAL_PACKET + NORMAL_PACKET_CRC)98 p.recvuntil(b'$')99 check_pkt_tx(p, TX_IPV4_PACKET, IPV4_NORMAL_PACKET + IPV4_NORMAL_PACKET_CRC)100 p.recvuntil(b'$')101 check_pkt_rx(p, RX_PACKET, BROADCAST_PACKET, BROADCAST_PACKET[4:])102 p.recvuntil(b'$')103 p.sendline(ENABLE_PROMISC)104 p.recvuntil(b'$')105 check_pkt_rx(p, RX_PACKET, NON_BROADCAST_PACKET, NON_BROADCAST_PACKET[4:])106 p.kill()107 if mosquitto:108 mosquitto.kill()109if __name__ == '__main__':...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!