Best Python code snippet using lisa_python
nestedperf.py
Source:nestedperf.py
...336 client_nic_name=self._NIC_NAME,337 test_case_name=inspect.stack()[1][3],338 )339 finally:340 self._linux_cleanup_nat(server_l1, self._BR_NAME, log)341 self._linux_cleanup_nat(client_l1, self._BR_NAME, log)342 @TestCaseMetadata(343 description="""344 This script runs ntttcp test on two nested VMs on different L1 guests345 connected with NAT346 """,347 priority=3,348 timeout=_TIME_OUT,349 requirement=simple_requirement(350 min_count=2,351 supported_os=[Windows],352 supported_platform_type=[AZURE, READY],353 ),354 )355 def perf_nested_hyperv_ntttcp_different_l1_nat(356 self,357 result: TestResult,358 variables: Dict[str, Any],359 log: Logger,360 ) -> None:361 environment = result.environment362 assert environment, "fail to get environment from testresult"363 server_l1 = cast(RemoteNode, environment.nodes[0])364 client_l1 = cast(RemoteNode, environment.nodes[1])365 # parse nested image variables366 (367 nested_image_username,368 nested_image_password,369 _,370 nested_image_url,371 ) = parse_nested_image_variables(variables)372 try:373 # setup nested vm on server in NAT configuration374 server_l2 = self._windows_setup_nat(375 node=server_l1,376 nested_vm_name="server_l2",377 guest_username=nested_image_username,378 guest_password=nested_image_password,379 guest_port=self._SERVER_HOST_FWD_PORT,380 guest_image_url=nested_image_url,381 )382 # setup nested vm on client in NAT configuration383 client_l2 = self._windows_setup_nat(384 node=client_l1,385 nested_vm_name="client_l2",386 guest_username=nested_image_username,387 guest_password=nested_image_password,388 guest_port=self._CLIENT_HOST_FWD_PORT,389 guest_image_url=nested_image_url,390 )391 # run ntttcp test392 perf_ntttcp(393 result,394 server_l2,395 client_l2,396 test_case_name=inspect.stack()[1][3],397 )398 finally:399 # cleanup server400 try:401 hyperv_remove_nested_vm(server_l1, "server_l2")402 except Exception as e:403 log.debug(f"Failed to clean up server vm: {e}")404 server_l1.mark_dirty()405 # cleanup 
client406 try:407 hyperv_remove_nested_vm(client_l1, "client_l2")408 except Exception as e:409 log.debug(f"Failed to clean up client vm: {e}")410 client_l1.mark_dirty()411 @TestCaseMetadata(412 description="""413 This script runs netperf test on two nested VMs on different L1 guests414 connected with NAT415 """,416 priority=3,417 timeout=_TIME_OUT,418 requirement=simple_requirement(419 min_count=2,420 network_interface=schema.NetworkInterfaceOptionSettings(421 nic_count=search_space.IntRange(min=2),422 ),423 disk=schema.DiskOptionSettings(424 data_disk_count=search_space.IntRange(min=1),425 data_disk_size=search_space.IntRange(min=12),426 ),427 ),428 )429 def perf_nested_kvm_netperf_pps_nat(430 self,431 result: TestResult,432 variables: Dict[str, Any],433 log: Logger,434 ) -> None:435 environment = result.environment436 assert environment, "fail to get environment from testresult"437 server_l1 = cast(RemoteNode, environment.nodes[0])438 client_l1 = cast(RemoteNode, environment.nodes[1])439 # parse nested image variables440 (441 nested_image_username,442 nested_image_password,443 _,444 nested_image_url,445 ) = parse_nested_image_variables(variables)446 try:447 # setup nested vm on server in NAT configuration448 server_l2 = self._linux_setup_nat(449 node=server_l1,450 nested_vm_name="server_l2",451 guest_username=nested_image_username,452 guest_password=nested_image_password,453 guest_port=self._SERVER_HOST_FWD_PORT,454 guest_image_url=nested_image_url,455 guest_internal_ip=self._SERVER_IP_ADDR,456 guest_default_nic=self._NIC_NAME,457 bridge_name=self._BR_NAME,458 bridge_network=self._BR_NETWORK,459 bridge_cidr=self._BR_CIDR,460 bridge_gateway=self._BR_GATEWAY,461 )462 # setup nested vm on client in NAT configuration463 client_l2 = self._linux_setup_nat(464 node=client_l1,465 nested_vm_name="client_l2",466 guest_username=nested_image_username,467 guest_password=nested_image_password,468 guest_port=self._CLIENT_HOST_FWD_PORT,469 guest_image_url=nested_image_url,470 
guest_internal_ip=self._CLIENT_IP_ADDR,471 guest_default_nic=self._NIC_NAME,472 bridge_name=self._BR_NAME,473 bridge_network=self._BR_NETWORK,474 bridge_cidr=self._BR_CIDR,475 bridge_gateway=self._BR_GATEWAY,476 )477 # run netperf test478 perf_tcp_pps(result, "singlepps", server_l2, client_l2)479 finally:480 self._linux_cleanup_nat(server_l1, self._BR_NAME, log)481 self._linux_cleanup_nat(client_l1, self._BR_NAME, log)482 def _linux_setup_nat(483 self,484 node: RemoteNode,485 nested_vm_name: str,486 guest_username: str,487 guest_password: str,488 guest_port: int,489 guest_image_url: str,490 guest_internal_ip: str,491 guest_default_nic: str,492 bridge_name: str,493 bridge_network: str,494 bridge_cidr: str,495 bridge_gateway: str,496 ) -> RemoteNode:497 """498 Setup NAT on the node with following configurations:499 1. Forward traffic on node's eth0 interface and port `guest_port`500 to the nested VM's port 22.501 2. Forward all traffic on node's eth1 interface to the nested VM.502 """503 # get core count504 core_count = node.tools[Lscpu].get_core_count()505 node_eth1_ip = node.nics.get_nic("eth1").ip_addr506 bridge_dhcp_range = f"{guest_internal_ip},{guest_internal_ip}"507 # enable ip forwarding508 node.tools[Sysctl].write("net.ipv4.ip_forward", "1")509 # setup bridge510 node.tools[Ip].setup_bridge(bridge_name, f"{bridge_gateway}/{bridge_cidr}")511 node.tools[Ip].set_bridge_configuration(bridge_name, "stp_state", "0")512 node.tools[Ip].set_bridge_configuration(bridge_name, "forward_delay", "0")513 # reset bridge lease file to remove old dns leases514 node.execute(515 f"cp /dev/null /var/run/qemu-dnsmasq-{bridge_name}.leases", sudo=True516 )517 # start dnsmasq518 node.tools[Dnsmasq].start(bridge_name, bridge_gateway, bridge_dhcp_range)519 # reset filter table to accept all traffic520 node.tools[Iptables].reset_table()521 # reset nat table and setup nat forwarding522 node.tools[Iptables].reset_table("nat")523 node.tools[Iptables].run(524 f"-t nat -A POSTROUTING -s 
{bridge_network}/{bridge_cidr} -j MASQUERADE",525 sudo=True,526 force_run=True,527 )528 # start nested vm529 nested_vm = qemu_connect_nested_vm(530 node,531 guest_username,532 guest_password,533 guest_port,534 guest_image_url,535 taps=1,536 cores=core_count,537 bridge=bridge_name,538 stop_existing_vm=True,539 name=nested_vm_name,540 )541 # configure rc.local to run dhclient on reboot542 nested_vm.tools[StartConfiguration].add_command("ip link set dev ens4 up")543 nested_vm.tools[StartConfiguration].add_command("dhclient ens4")544 # reboot nested vm and close ssh connection545 nested_vm.execute("reboot", sudo=True)546 nested_vm.close()547 # route traffic on `eth0` and port `guest_port` on l1 vm to548 # port 22 on l2 vm549 node.tools[Iptables].run(550 f"-t nat -A PREROUTING -i eth0 -p tcp --dport {guest_port} "551 f"-j DNAT --to {guest_internal_ip}:22",552 sudo=True,553 force_run=True,554 )555 # route all tcp traffic on `eth1` port on l1 vm to l2 vm556 node.tools[Iptables].run(557 f"-t nat -A PREROUTING -i eth1 -d {node_eth1_ip} "558 f"-p tcp -j DNAT --to {guest_internal_ip}",559 sudo=True,560 force_run=True,561 )562 # wait till nested vm is up563 try_connect(564 schema.ConnectionInfo(565 address=node.public_address,566 port=guest_port,567 username=guest_username,568 password=guest_password,569 )570 )571 # set default nic interfaces on l2 vm572 nested_vm.internal_address = node_eth1_ip573 nested_vm.capability.network_interface = Synthetic()574 return nested_vm575 def _linux_cleanup_nat(576 self,577 node: RemoteNode,578 bridge_name: str,579 log: Logger,580 ) -> None:581 try:582 # stop running QEMU instances583 node.tools[Qemu].delete_vm()584 # clear bridge and taps585 node.tools[Ip].delete_interface(bridge_name)586 # flush ip tables587 node.tools[Iptables].reset_table()588 node.tools[Iptables].reset_table("nat")589 except Exception as e:...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!