Best Python code snippet using autotest_python
test_ovs.py
Source:test_ovs.py
...253 exists.return_value = True254 ovs.full_restart()255 service.assert_called_with('start', 'openvswitch-force-reload-kmod')256 @patch('subprocess.check_output')257 def test_port_to_br(self, check_output):258 check_output.return_value = b'br-ex'259 self.assertEqual(ovs.port_to_br('br-lb'),260 'br-ex')261 @patch('subprocess.check_output')262 def test_port_to_br_not_found(self, check_output):263 check_output.side_effect = subprocess.CalledProcessError(1, 'not found')264 self.assertEqual(ovs.port_to_br('br-lb'), None)265 @patch('subprocess.check_call')266 def test_enable_ipfix_defaults(self, check_call):267 ovs.enable_ipfix('br-int',268 '10.5.0.10:4739')269 check_call.assert_called_once_with([270 'ovs-vsctl', 'set', 'Bridge', 'br-int', 'ipfix=@i', '--',271 '--id=@i', 'create', 'IPFIX',272 'targets="10.5.0.10:4739"',273 'sampling=64',274 'cache_active_timeout=60',275 'cache_max_flows=128',276 ])277 @patch('subprocess.check_call')278 def test_enable_ipfix_values(self, check_call):...
__init__.py
Source:__init__.py
...79 import netifaces80 # NOTE(jamespage):81 # Older code supported addition of a linuxbridge directly82 # to an OVS bridge; ensure we don't break uses on upgrade83 existing_ovs_bridge = port_to_br(bridge)84 if existing_ovs_bridge is not None:85 log('Linuxbridge {} is already directly in use'86 ' by OVS bridge {}'.format(bridge, existing_ovs_bridge),87 level=INFO)88 return89 # NOTE(jamespage):90 # preserve existing naming because interfaces may already exist.91 ovsbridge_port = "veth-" + name92 linuxbridge_port = "veth-" + bridge93 if (len(ovsbridge_port) > MAX_KERNEL_INTERFACE_NAME_LEN or94 len(linuxbridge_port) > MAX_KERNEL_INTERFACE_NAME_LEN):95 # NOTE(jamespage):96 # use parts of hashed bridgename (openstack style) when97 # a bridge name exceeds 15 chars98 hashed_bridge = hashlib.sha256(bridge.encode('UTF-8')).hexdigest()99 base = '{}-{}'.format(hashed_bridge[:8], hashed_bridge[-2:])100 ovsbridge_port = "cvo{}".format(base)101 linuxbridge_port = "cvb{}".format(base)102 interfaces = netifaces.interfaces()103 for interface in interfaces:104 if interface == ovsbridge_port or interface == linuxbridge_port:105 log('Interface {} already exists'.format(interface), level=INFO)106 return107 log('Adding linuxbridge {} to ovsbridge {}'.format(bridge, name),108 level=INFO)109 check_for_eni_source()110 with open('/etc/network/interfaces.d/{}.cfg'.format(111 linuxbridge_port), 'w') as config:112 config.write(BRIDGE_TEMPLATE.format(linuxbridge_port=linuxbridge_port,113 ovsbridge_port=ovsbridge_port,114 bridge=bridge))115 subprocess.check_call(["ifup", linuxbridge_port])116 add_bridge_port(name, linuxbridge_port)117def is_linuxbridge_interface(port):118 ''' Check if the interface is a linuxbridge bridge119 :param port: Name of an interface to check whether it is a Linux bridge120 :returns: True if port is a Linux bridge'''121 if os.path.exists('/sys/class/net/' + port + '/bridge'):122 log('Interface {} is a Linux bridge'.format(port), level=DEBUG)123 return True124 else:125 log('Interface {} is not a Linux bridge'.format(port), level=DEBUG)126 return False127def set_manager(manager):128 ''' Set the controller for the local openvswitch '''129 log('Setting manager for local ovs to {}'.format(manager))130 subprocess.check_call(['ovs-vsctl', 'set-manager',131 'ssl:{}'.format(manager)])132def set_Open_vSwitch_column_value(column_value):133 """134 Calls ovs-vsctl and sets the 'column_value' in the Open_vSwitch table.135 :param column_value:136 See http://www.openvswitch.org//ovs-vswitchd.conf.db.5.pdf for137 details of the relevant values.138 :type str139 :raises CalledProcessException: possibly ovsdb-server is not running140 """141 log('Setting {} in the Open_vSwitch table'.format(column_value))142 subprocess.check_call(['ovs-vsctl', 'set', 'Open_vSwitch', '.', column_value])143CERT_PATH = '/etc/openvswitch/ovsclient-cert.pem'144def get_certificate():145 ''' Read openvswitch certificate from disk '''146 if os.path.exists(CERT_PATH):147 log('Reading ovs certificate from {}'.format(CERT_PATH))148 with open(CERT_PATH, 'r') as cert:149 full_cert = cert.read()150 begin_marker = "-----BEGIN CERTIFICATE-----"151 end_marker = "-----END CERTIFICATE-----"152 begin_index = full_cert.find(begin_marker)153 end_index = full_cert.rfind(end_marker)154 if end_index == -1 or begin_index == -1:155 raise RuntimeError("Certificate does not contain valid begin"156 " and end markers.")157 full_cert = full_cert[begin_index:(end_index + len(end_marker))]158 return full_cert159 else:160 log('Certificate not found', level=WARNING)161 return None162def check_for_eni_source():163 ''' Juju removes the source line when setting up interfaces,164 replace if missing '''165 with open('/etc/network/interfaces', 'r') as eni:166 for line in eni:167 if line == 'source /etc/network/interfaces.d/*':168 return169 with open('/etc/network/interfaces', 'a') as eni:170 eni.write('\nsource /etc/network/interfaces.d/*')171def full_restart():172 ''' Full restart and reload of openvswitch '''173 if os.path.exists('/etc/init/openvswitch-force-reload-kmod.conf'):174 service('start', 'openvswitch-force-reload-kmod')175 else:176 service('force-reload-kmod', 'openvswitch-switch')177def enable_ipfix(bridge, target):178 '''Enable IPfix on bridge to target.179 :param bridge: Bridge to monitor180 :param target: IPfix remote endpoint181 '''182 cmd = ['ovs-vsctl', 'set', 'Bridge', bridge, 'ipfix=@i', '--',183 '--id=@i', 'create', 'IPFIX', 'targets="{}"'.format(target)]184 log('Enabling IPfix on {}.'.format(bridge))185 subprocess.check_call(cmd)186def disable_ipfix(bridge):187 '''Diable IPfix on target bridge.188 :param bridge: Bridge to modify189 '''190 cmd = ['ovs-vsctl', 'clear', 'Bridge', bridge, 'ipfix']191 subprocess.check_call(cmd)192def port_to_br(port):193 '''Determine the bridge that contains a port194 :param port: Name of port to check for195 :returns str: OVS bridge containing port or None if not found196 '''197 try:198 return subprocess.check_output(199 ['ovs-vsctl', 'port-to-br', port]200 ).decode('UTF-8').strip()201 except subprocess.CalledProcessError:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!