Best Python code snippet using lisa_python
get_interfaces.py
Source:get_interfaces.py
...56 if "local-address" in r1:57 tun["local_address"] = r1["local-address"]58 tun["remote_address"] = r1["remote-address"]59 return60 def get_mtu(self, r):61 if is_int(r.get("mtu")):62 return int(r["mtu"])63 if is_int(r.get("actual-mtu")):64 return int(r["actual-mtu"])65 return None66 def execute_cli(self):67 ifaces = {}68 misc = {}69 # Get ifIndex70 n_ifindex = {} # number -> ifIndex71 for n, f, r in self.cli_detail("/interface print oid without-paging"):72 n_ifindex[n] = int(r["name"].rsplit(".", 1)[-1])73 time.sleep(1)74 # Fill interfaces75 a = self.cli_detail("/interface print detail without-paging")76 for n, f, r in a:77 if r["type"] in self.ignored_types:78 continue79 if not r["type"] in "vlan": # TODO: Check other types80 ifaces[r["name"]] = {81 "name": r["name"],82 "type": self.type_map[r["type"]],83 "admin_status": "X" not in f,84 "oper_status": "R" in f,85 "enabled_protocols": [],86 "subinterfaces": [],87 }88 if "mac-address" in r:89 ifaces[r["name"]]["mac"] = r["mac-address"]90 if "mac" in r:91 ifaces[r["name"]]["mac"] = r["mac"]92 misc[r["name"]] = {"type": r["type"]}93 if n in n_ifindex:94 ifaces[r["name"]]["snmp_ifindex"] = n_ifindex[n]95 if "default-name" in r:96 ifaces[r["name"]]["default_name"] = r["default-name"]97 if (98 r["type"].startswith("ipip-")99 or r["type"].startswith("eoip-")100 or r["type"].startswith("gre-")101 ):102 self.si = {103 "name": r["name"],104 "admin_status": "X" not in f,105 "oper_status": "R" in f,106 "enabled_afi": ["IPv4"],107 "enabled_protocols": [],108 }109 if self.get_mtu(r) is not None:110 self.si["mtu"] = self.get_mtu(r)111 if r["type"].startswith("ipip-"):112 self.get_tunnel("IPIP", "R", "IPv4", ifaces)113 if r["type"].startswith("eoip-"):114 self.get_tunnel("EOIP", "R", "IPv4", ifaces)115 if r["type"].startswith("gre-"):116 self.get_tunnel("GRE", "R", "IPv4", ifaces)117 ifaces[r["name"]]["subinterfaces"] += [self.si]118 time.sleep(1)119 # Attach `vlan` subinterfaces to parent120 for n, f, r in 
self.cli_detail("/interface vlan print detail without-paging"):121 if r["interface"] in ifaces:122 i = ifaces[r["interface"]]123 self.si = {124 "name": r["name"],125 "mac": r.get("mac-address") or r.get("mac"),126 "admin_status": "X" not in f,127 "oper_status": "R" in f,128 "enabled_afi": [],129 "vlan_ids": [int(r["vlan-id"])],130 "enabled_protocols": [],131 }132 if self.get_mtu(r) is not None:133 self.si["mtu"] = self.get_mtu(r)134 i["subinterfaces"] += [self.si]135 # process internal `switch` ports and vlans136 vlan_tags = {}137 # "RB532", "x86", CCR1009 not support internal switch port138 try:139 v = self.cli_detail("/interface ethernet switch port print detail without-paging")140 for n, f, r in v:141 if "vlan-mode" not in r:142 continue143 if (144 r["vlan-mode"] in ["check", "secure"]145 and r["vlan-header"] in ["add-if-missing", "leave-as-is"]146 and r["default-vlan-id"] != "auto"147 and int(r["default-vlan-id"]) != 0148 ):149 vlan_tags[r["name"]] = True150 else:151 vlan_tags[r["name"]] = False152 except self.CLISyntaxError:153 pass154 # "RB532", "x86", CCR1009 not support internal switch port155 try:156 # Attach subinterfaces with `BRIDGE` AFI to parent157 v = self.cli_detail(158 "/interface ethernet switch vlan print detail without-paging", cached=True159 )160 for n, f, r in v:161 # vlan-id=auto ? 
Need more testing162 if not is_int(r["vlan-id"]):163 continue164 vlan_id = int(r["vlan-id"])165 ports = r["ports"].split(",")166 if not ports:167 continue168 for p in ports:169 if p not in ifaces:170 continue171 i = ifaces[p]172 self.si = {173 "name": p,174 "mac": i.get("mac"),175 "admin_status": i.get("admin_status"),176 "oper_status": i.get("oper_status"),177 "enabled_afi": ["BRIDGE"],178 "enabled_protocols": [],179 "tagged_vlans": [],180 }181 if self.get_mtu(i) is not None:182 self.si["mtu"] = self.get_mtu(i)183 if p in vlan_tags:184 if vlan_tags[p]:185 self.si["tagged_vlans"] += [vlan_id]186 else:187 self.si["untagged_vlan"] = vlan_id if vlan_id > 0 else 1188 # Try to find in already created subinterfaces189 found = False190 for sub in i["subinterfaces"]:191 if sub["name"] == p:192 if p in vlan_tags:193 if vlan_tags[p]:194 sub["tagged_vlans"] += [vlan_id]195 else:196 sub["utagged_vlan"] = vlan_id197 found = True198 if not found:199 i["subinterfaces"] += [self.si]200 except self.CLISyntaxError:201 pass202 # Vlans on bridge203 try:204 v = self.cli_detail("/interface bridge vlan print detail without-paging", cached=True)205 for n, f, d in v:206 if "X" in f or "D" in f:207 continue208 vlans = self.expand_rangelist(d["vlan-ids"])209 for vlan_id in vlans:210 untagged = d["untagged"].split(",")211 for p in untagged:212 if p not in ifaces:213 continue214 i = ifaces[p]215 self.si = {216 "name": p,217 "mac": i.get("mac"),218 "admin_status": i.get("admin_status"),219 "oper_status": i.get("oper_status"),220 "enabled_afi": ["BRIDGE"],221 "enabled_protocols": [],222 "utagged_vlans": vlan_id,223 }224 if self.get_mtu(i) is not None:225 self.si["mtu"] = self.get_mtu(i)226 # Try to find in already created subinterfaces227 for sub in i["subinterfaces"]:228 if sub["name"] == p:229 sub["utagged_vlan"] = vlan_id230 break231 else:232 i["subinterfaces"] += [self.si]233 tagged = d["tagged"].split(",")234 for p in tagged:235 if p not in ifaces:236 continue237 i = ifaces[p]238 self.si = 
{239 "name": p,240 "mac": i.get("mac"),241 "admin_status": i.get("admin_status"),242 "oper_status": i.get("oper_status"),243 "enabled_afi": ["BRIDGE"],244 "enabled_protocols": [],245 "tagged_vlans": [vlan_id],246 }247 if self.get_mtu(i) is not None:248 self.si["mtu"] = self.get_mtu(i)249 # Try to find in already created subinterfaces250 for sub in i["subinterfaces"]:251 if sub["name"] == p:252 if "tagged_vlans" in sub and vlan_id not in sub["tagged_vlans"]:253 sub["tagged_vlans"] += [vlan_id]254 else:255 sub["tagged_vlans"] = [vlan_id]256 break257 else:258 i["subinterfaces"] += [self.si]259 except self.CLISyntaxError:260 pass261 # Refine ip addresses262 for n, f, r in self.cli_detail("/ip address print detail without-paging"):263 if "X" in f:264 continue265 self.si = {}266 if r["interface"] in ifaces:267 i = ifaces[r["interface"]]268 t = misc[r["interface"]]269 if not i["subinterfaces"]:270 self.si = {271 "name": r["interface"],272 "enabled_afi": [],273 # XXX Workaround274 "admin_status": i["admin_status"],275 "oper_status": i["oper_status"],276 "enabled_protocols": [],277 }278 if "mac" in i:279 self.si["mac"] = i["mac"]280 i["subinterfaces"] += [self.si]281 else:282 for sub in i["subinterfaces"]:283 if sub["name"] == r["interface"]:284 self.logger.debug(285 "\nError: subinterfaces already exists in interface \n%s\n" % i286 )287 break288 else:289 self.si = {290 "name": r["interface"],291 "enabled_afi": [],292 # XXX Workaround293 "admin_status": i["admin_status"],294 "oper_status": i["oper_status"],295 "enabled_protocols": [],296 }297 if "mac" in i:298 self.si["mac"] = i["mac"]299 i["subinterfaces"] += [self.si]300 else:301 for i in ifaces:302 iface = ifaces[i]303 for s in iface["subinterfaces"]:304 if s["name"] == r["interface"]:305 self.si = s306 break307 if self.si:308 t = misc[i]309 break310 if not self.si:311 self.logger.debug("Error: Interface name not found!!!")312 continue313 afi = "IPv6" if ":" in r["address"] else "IPv4"314 if afi not in 
self.si["enabled_afi"]:315 self.si["enabled_afi"] += [afi]316 if afi == "IPv4":317 a = self.si.get("ipv4_addresses", [])318 a += [r["address"]]319 self.si["ipv4_addresses"] = a320 else:321 a = self.si.get("ipv6_addresses", [])322 a += [r["address"]]323 self.si["ipv6_addresses"] = a324 # Tunnel types325 # XXX /ip address print detail do not print tunnels !!!326 # Need reworks !!!327 if t["type"].startswith("ppp-"):328 self.get_tunnel("PPP", f, afi, r)329 if t["type"].startswith("pppoe-"):330 self.get_tunnel("PPPOE", f, afi, r)331 if t["type"].startswith("ovpn-"):332 self.si["tunnel"] = {}333 if t["type"].startswith("l2tp-"):334 self.get_tunnel("L2TP", f, afi, r)335 if t["type"].startswith("pptp-"):336 self.get_tunnel("PPTP", f, afi, r)337 if t["type"].startswith("ovpn-"):338 self.get_tunnel("PPP", f, afi, r)339 if t["type"].startswith("sstp-"):340 self.get_tunnel("SSTP", f, afi, r)341 # bridge342 for n, f, r in self.cli_detail("/interface bridge print detail without-paging"):343 self.si = {}344 if r["name"] in ifaces:345 i = ifaces[r["name"]]346 if not i["subinterfaces"]:347 self.si = {348 "name": r["name"],349 "enabled_afi": ["BRIDGE"],350 # XXX Workaround351 "admin_status": i["admin_status"],352 "oper_status": i["oper_status"],353 "mac": r["mac-address"],354 "enabled_protocols": [],355 }356 if self.get_mtu(i) is not None:357 self.si["mtu"] = self.get_mtu(i)358 i["subinterfaces"] += [self.si]359 else:360 i["subinterfaces"][0]["enabled_afi"] += ["BRIDGE"]361 if r["protocol-mode"] in ["stp", "rstp"]:362 i["enabled_protocols"] += ["STP"]363 # bonding364 for n, f, r in self.cli_detail("/interface bonding print detail without-paging"):365 self.si = {}366 if r["name"] in ifaces:367 i = ifaces[r["name"]]368 if not i["subinterfaces"]:369 self.si = {370 "name": r["name"],371 "enabled_afi": [],372 # XXX Workaround373 "admin_status": i["admin_status"],374 "oper_status": i["oper_status"],375 "mac": r["mac-address"],376 "enabled_protocols": [],377 }378 if self.get_mtu(i) is not 
None:379 self.si["mtu"] = self.get_mtu(i)380 i["subinterfaces"] += [self.si]381 if r["mode"] in ["802.3ad"]:382 i["enabled_protocols"] += ["LACP"]383 if r["slaves"]:384 slaves = r["slaves"].split(",")385 for s in slaves:386 if s in ifaces:387 ifaces[s]["aggregated_interface"] = r["name"]388 # OSPF389 try:390 ospf_result = self.cli_detail("/routing ospf interface print detail without-paging")391 for n, f, r in ospf_result:392 self.si = {}393 if r["interface"] in ifaces:...
wifi_tx_udp_file.py
Source:wifi_tx_udp_file.py
1from scapy.all import IP,UDP,LLC,SNAP,repr_hex2import socket3import time4import sys5import os6#################################################7############ Default FISSURE Header #############8#################################################9def getArguments():10 udp_source_ip = "192.168.1.200"11 udp_dest_ip = "192.168.1.5"12 udp_source_port = "14321"13 udp_dest_port = "55555"14 filepath = ''15 interval = .116 mtu = 100017 wifi_tx_udp_port = 5200118 wifi_tx_ip_address = "127.0.0.1"19 notes = 'Reads in bytes of data and puts it in a UDP payload. Messages are placed in UDP data and sent to a wifi_tx.py UDP port.'20 arg_names = ['udp_source_ip','udp_dest_ip','udp_source_port','udp_dest_port','filepath','interval','mtu','wifi_tx_udp_port','wifi_tx_ip_address','notes']21 arg_values = [udp_source_ip, udp_dest_ip, udp_source_port, udp_dest_port, filepath, interval, mtu, wifi_tx_udp_port, wifi_tx_ip_address, notes]22 return (arg_names,arg_values)23if __name__ == "__main__":24 # Default Values25 get_udp_source_ip = "192.168.1.200"26 get_udp_dest_ip = "192.168.1.5"27 get_udp_source_port = "14321"28 get_udp_dest_port = "55555"29 get_filepath = ''30 get_interval = .131 get_mtu = 100032 get_wifi_tx_udp_port = 5200133 get_wifi_tx_ip_address = "127.0.0.1"34 # Accept Command Line Arguments35 try:36 get_udp_source_ip = sys.argv[1]37 get_udp_dest_ip = sys.argv[2]38 get_udp_source_port = sys.argv[3]39 get_udp_dest_port = sys.argv[4]40 get_filepath = sys.argv[5]41 get_interval = float(sys.argv[6])42 get_mtu = int(sys.argv[7])43 get_wifi_tx_udp_port = int(sys.argv[8])44 get_wifi_tx_ip_address = sys.argv[9]45 except:46 pass47#################################################48 if len(get_filepath) > 0:49 # LLC/SNAP/IP/UDP Message in UDP Data -> "wifi_tx.py" Port50 udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)51 # LLC/SNAP/IP/UDP52 pip_bytes = LLC()/SNAP()/IP()/UDP()53 pip_bytes[IP].src = get_udp_source_ip54 pip_bytes[IP].dst = get_udp_dest_ip55 pip_bytes[IP].id = 
0x000056 pip_bytes[IP].flags = 257 pip_bytes[UDP].sport = int(get_udp_source_port)58 pip_bytes[UDP].dport = int(get_udp_dest_port)59 60 number_of_bytes = os.path.getsize(get_filepath)61 file = open(get_filepath,"rb") # Open the file 62 63 # Send File Data64 for n in range(0, number_of_bytes, get_mtu):65 file.seek(n)66 transfer_data = file.read(get_mtu) 67 final_bytes = str(pip_bytes/transfer_data) 68 udp_socket.sendto(final_bytes,(get_wifi_tx_ip_address, get_wifi_tx_udp_port))69 time.sleep(get_interval)70 # Send Last File Read71 file_remainder = number_of_bytes % get_mtu72 transfer_data = file.read(file_remainder)73 final_bytes = str(pip_bytes/transfer_data)74 udp_socket.sendto(final_bytes,(get_wifi_tx_ip_address, get_wifi_tx_udp_port))75 file.close()76 print "File Relay Complete"77 78 else:...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!