Best Python code snippet using tempest_python
unifi.py
Source:unifi.py
1import re2from typing import Dict, NoReturn, Union, cast3from .monitor import Monitor, register4try:5 from paramiko.client import SSHClient, RejectPolicy6 from paramiko.ssh_exception import SSHException7 ssh2_available = True8except ImportError:9 ssh2_available = False10@register11class MonitorUnifiFailover(Monitor):12 monitor_type = "unifi_failover"13 def __init__(self, name: str, config_options: dict) -> None:14 if "gap" not in config_options:15 config_options["gap"] = 300 # 5 mins16 super().__init__(name, config_options)17 self._router_address = cast(18 str, self.get_config_option("router_address", required=True)19 )20 self._username = cast(21 str, self.get_config_option("router_username", required=True)22 )23 self._password = self.get_config_option("router_password", required=False)24 self._ssh_key = self.get_config_option("ssh_key", required=False)25 if self._ssh_key is None and self._password is None:26 raise ValueError("must specify only one of router_password or ssh_key")27 self._check_interface = cast(28 str, self.get_config_option("check_interface", default="eth2")29 )30 def run_test(self) -> Union[NoReturn, bool]:31 if not ssh2_available:32 return self.record_fail("ssh2_python library is not installed")33 try:34 with SSHClient() as client:35 client.set_missing_host_key_policy(RejectPolicy)36 client.load_system_host_keys()37 client.connect(38 hostname=self._router_address,39 username=self._username,40 password=self._password,41 key_filename=self._ssh_key,42 )43 _, stdout, _ = client.exec_command( # nosec44 "sudo /usr/sbin/ubnt-hal wlbGetStatus"45 )46 data = {} # type: Dict[str, Dict[str, str]]47 data_block = {} # type: Dict[str, str]48 interface = ""49 for _line in stdout.readlines():50 line = _line.strip()51 matches = re.match(r"interface +: (\w+)", line)52 if matches:53 if interface != "" and len(data_block) > 0:54 data[interface] = data_block55 data_block = {}56 interface = matches.group(1)57 data_block = {}58 continue59 matches = re.match(r"carrier +: (\w+)", line)60 if matches:61 data_block["carrier"] = matches.group(1)62 continue63 matches = re.match(r"status +: (\w+)", line)64 if matches:65 data_block["status"] = matches.group(1)66 continue67 matches = re.match(r"gateway +: (\w+)", line)68 if matches:69 data_block["gateway"] = matches.group(1)70 if interface != "" and len(data_block) > 0:71 data[interface] = data_block72 except SSHException as error:73 self.monitor_logger.exception("Failed to ssh to USG")74 return self.record_fail("Failed to ssh to USG: {}".format(error))75 if self._check_interface not in data:76 self.monitor_logger.debug("processed data was %s", data)77 return self.record_fail(78 "Could not get status for interface {}".format(self._check_interface)79 )80 if data[self._check_interface]["carrier"] != "up":81 return self.record_fail(82 "Interface {} carrier is in status {} (wanted 'up')".format(83 self._check_interface, data[self._check_interface]["carrier"]84 )85 )86 if data[self._check_interface]["status"] != "failover":87 return self.record_fail(88 "Interface {} is in status {} (wanted 'failover')".format(89 self._check_interface, data[self._check_interface]["status"]90 )91 )92 if data[self._check_interface]["gateway"] == "unknown":93 return self.record_fail(94 "Interface {} has gateway {}".format(95 self._check_interface, data[self._check_interface]["gateway"]96 )97 )98 return self.record_success(99 "Interface {} is {} with status {}".format(100 self._check_interface,101 data[self._check_interface]["carrier"],102 data[self._check_interface]["status"],103 )104 )105 def describe(self) -> str:106 return "Checking USG at {} has interface {} up and not failed over".format(107 self._router_address, self._check_interface108 )109@register110class MonitorUnifiFailoverWatchdog(Monitor):111 monitor_type = "unifi_watchdog"112 def __init__(self, name: str, config_options: dict) -> None:113 if "gap" not in config_options:114 config_options["gap"] = 300 # 5 mins115 super().__init__(name, config_options)116 self._router_address = cast(117 str, self.get_config_option("router_address", required=True)118 )119 self._username = cast(120 str, self.get_config_option("router_username", required=True)121 )122 self._password = self.get_config_option("router_password", required=False)123 self._ssh_key = self.get_config_option("ssh_key", required=False)124 if self._ssh_key is None and self._password is None:125 raise ValueError("must specify only one of router_password or ssh_key")126 self._primary_interface = cast(127 str, self.get_config_option("primary_interface", default="pppoe0")128 )129 self._secondary_interface = cast(130 str, self.get_config_option("secondary_interface", default="eth2")131 )132 def run_test(self) -> Union[NoReturn, bool]:133 if not ssh2_available:134 return self.record_fail("ssh2_python library is not installed")135 try:136 with SSHClient() as client:137 client.set_missing_host_key_policy(RejectPolicy)138 client.load_system_host_keys()139 client.connect(140 hostname=self._router_address,141 username=self._username,142 password=self._password,143 key_filename=self._ssh_key,144 )145 _, stdout, _ = client.exec_command( # nosec146 "/usr/sbin/ubnt-hal wlbGetWdStatus"147 )148 data = {} # type: Dict[str, Dict[str, str]]149 data_block = {} # type: Dict[str, str]150 interface = ""151 for _line in stdout.readlines():152 line = _line.strip()153 matches = re.match(154 r"([a-z]+[0-9])", line155 ) # two spaces and an if name156 if matches:157 if interface != "" and len(data_block) > 0:158 data[interface] = data_block159 data_block = {}160 interface = matches.group(1)161 data_block = {}162 continue163 matches = re.match(r"status: (\w+)", line)164 if matches:165 data_block["status"] = matches.group(1)166 continue167 matches = re.match(r"ping gateway: ([^ ]+) - (\w+)", line)168 if matches:169 data_block["gateway"] = matches.group(1)170 data_block["ping_status"] = matches.group(2)171 continue172 if interface != "" and len(data_block) > 0:173 data[interface] = data_block174 except SSHException as error:175 self.monitor_logger.exception("Failed to ssh to USG")176 return self.record_fail("Failed to ssh to USG: {}".format(error))177 for interface in [self._primary_interface, self._secondary_interface]:178 if interface not in data:179 self.monitor_logger.debug("processed data was %s", data)180 return self.record_fail(181 "Could not get status for interface {}".format(interface)182 )183 if data[interface]["status"] != "Running":184 return self.record_fail(185 "Interface {} in status {} (wanted 'Running')".format(186 interface, data[interface]["status"]187 )188 )189 if data[interface]["ping_status"] != "REACHABLE":190 return self.record_fail(191 "Interface {} ping ({}) is {} (wanted 'REACHABLE')".format(192 interface,193 data[interface]["gateway"],194 data[interface]["ping_status"],195 )196 )197 return self.record_success(198 "Interfaces {} and {} both running and pinging".format(199 self._primary_interface, self._secondary_interface200 )201 )202 def describe(self) -> str:203 return "Checking USG at {} has interface {} and {} running and pinging".format(204 self._router_address, self._primary_interface, self._secondary_interface...
test_ovsdb_lib.py
Source:test_ovsdb_lib.py
...45 # Make sure exceptions pass through by calling do_post_commit directly46 mock.patch.object(47 impl_idl.OvsVsctlTransaction, 'post_commit',48 side_effect=impl_idl.OvsVsctlTransaction.do_post_commit).start()49 def _check_interface(self, port, parameter, expected_value):50 def check_value():51 return (self._ovsdb.db_get(52 'Interface', port, parameter).execute() == expected_value)53 self.assertTrue(base.wait_until_true(check_value, timeout=2,54 sleep=0.5))55 def _add_port(self, bridge, port, may_exist=True):56 with self._ovsdb.transaction() as txn:57 txn.add(self._ovsdb.add_port(bridge, port, may_exist=may_exist))58 txn.add(self._ovsdb.db_set('Interface', port,59 ('type', 'internal')))60 self.assertIn(port, self._list_ports_in_bridge(bridge))61 def _list_ports_in_bridge(self, bridge):62 return self._ovsdb.list_ports(bridge).execute()63 def _check_bridge(self, name):64 return self._ovsdb.br_exists(name).execute()65 def _add_bridge(self, name, may_exist=True, datapath_type=None):66 self._ovsdb.add_br(name, may_exist=may_exist,67 datapath_type=datapath_type).execute()68 self.assertTrue(self._check_bridge(name))69 def _del_bridge(self, name):70 self._ovsdb.del_br(name).execute()71 def test__set_mtu_request(self):72 port_name = 'port1-' + self.interface73 self._add_bridge(self.brname)74 self.addCleanup(self._del_bridge, self.brname)75 self._add_port(self.brname, port_name)76 if self.ovs._ovs_supports_mtu_requests():77 self.ovs._set_mtu_request(port_name, 1000)78 self._check_interface(port_name, 'mtu', 1000)79 self.ovs._set_mtu_request(port_name, 1500)80 self._check_interface(port_name, 'mtu', 1500)81 else:82 self.skipTest('Current version of Open vSwitch does not support '83 '"mtu_request" parameter')84 def test_create_ovs_vif_port(self):85 port_name = 'port2-' + self.interface86 iface_id = 'iface_id'87 mac = 'ca:fe:ca:fe:ca:fe'88 instance_id = uuidutils.generate_uuid()89 interface_type = constants.OVS_VHOSTUSER_INTERFACE_TYPE90 vhost_server_path = '/fake/path'91 mtu = 150092 self._add_bridge(self.brname)93 self.addCleanup(self._del_bridge, self.brname)94 self.ovs.create_ovs_vif_port(self.brname, port_name, iface_id, mac,95 instance_id, mtu=mtu,96 interface_type=interface_type,97 vhost_server_path=vhost_server_path)98 expected_external_ids = {'iface-status': 'active',99 'iface-id': iface_id,100 'attached-mac': mac,101 'vm-uuid': instance_id}102 self._check_interface(port_name, 'external_ids', expected_external_ids)103 self._check_interface(port_name, 'type', interface_type)104 expected_vhost_server_path = {'vhost-server-path': vhost_server_path}105 self._check_interface(port_name, 'options', expected_vhost_server_path)106 @mock.patch.object(linux_net, 'delete_net_dev')107 def test_delete_ovs_vif_port(self, *mock):108 port_name = 'port3-' + self.interface109 self._add_bridge(self.brname)110 self.addCleanup(self._del_bridge, self.brname)111 self._add_port(self.brname, port_name)112 self.ovs.delete_ovs_vif_port(self.brname, port_name)113 self.assertNotIn(port_name, self._list_ports_in_bridge(self.brname))114 def test_ensure_ovs_bridge(self):115 bridge_name = 'bridge2-' + self.interface116 self.ovs.ensure_ovs_bridge(bridge_name, constants.OVS_DATAPATH_SYSTEM)117 self.assertTrue(self._check_bridge(bridge_name))...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!