Best Python code snippet using autotest_python
OvsContainer.py
Source:OvsContainer.py
# ... (excerpt begins inside the container class of OvsContainer.py; the class
# header and its __init__ are not shown, so the methods are written flush-left)
def init_get_br(self):
    """Cache every bridge reported by the remote OVS that is not known yet.

    Runs ``list_br`` against ``self.ip`` / ``self.port`` and wraps each
    returned name that is missing from ``self.br_list`` in an ``OvsBr``.
    Existing cache entries are left untouched; nothing is removed here.
    """
    ovs = get_ctl()
    for name in ovs.list_br(self.ip, self.port).execute():
        if not self.br_exist(name):
            self.br_list[name] = OvsBr(name, self)


def get_br(self, br_name):
    """Return the cached bridge called *br_name*, or ``None`` if unknown."""
    return self.br_list[br_name] if self.br_exist(br_name) else None


def get_all(self):
    """Return the bridge cache itself (the ``self.br_list`` mapping)."""
    return self.br_list


def br_exist(self, br_name):
    """Return ``True`` when *br_name* is a key of ``self.br_list``."""
    return br_name in self.br_list


def add_br(self, br_name):
    '''
    do
    ovs-vsctl add-br [br-name]

    :param br_name: name of the bridge to create
    :return: the new ``OvsBr`` wrapper, which is also stored in ``self.br_list``
    :raises NameError: if *br_name* is already present in ``self.br_list``
    '''
    if self.br_exist(br_name):
        raise NameError('bridge %r already exists' % (br_name,))
    ovs = get_ctl()
    try:
        ovs.add_br(br_name, remote_ovs=self.ip, remote_port=self.port).execute()
    except Exception as e:
        # Best effort, as before: the failure is reported but not re-raised,
        # and the bridge is still registered below because callers use the
        # returned object directly.  The cause is now part of the message.
        # NOTE(review): the cache can diverge from the switch after a failure.
        print('add_br failed: %s' % e)
    br = OvsBr(br_name, self)
    self.br_list[br_name] = br
    return br


def del_br(self, br_name):
    '''
    do
    ovs-vsctl del-br [br-name]

    :param br_name: name of the bridge to delete
    :return: None
    :raises NameError: if *br_name* is unknown even after refreshing the cache
    '''
    # Pick up bridges that exist remotely but are not cached yet.
    self.init_get_br()
    if not self.br_exist(br_name):
        raise NameError('bridge %r does not exist' % (br_name,))
    ovs = get_ctl()
    try:
        ovs.del_br(br_name, remote_ovs=self.ip, remote_port=self.port).execute()
    except Exception as e:
        # Best effort, as before: report the failure and still drop the entry.
        print('del_br failed: %s' % e)
    self.br_list.pop(br_name)


def __str__(self, level=0):
    """Return ``ip:port`` indented by *level* tabs, followed by every bridge.

    Each cached bridge contributes the result of its own ``__str__()``.
    ``ip`` and ``port`` are passed through ``str()`` so that a numeric port
    does not break the join.
    """
    header = '\t' * level + ':'.join([str(self.ip), str(self.port)]) + '\n'
    return header + ''.join(self.br_list[name].__str__() for name in self.br_list)


# (excerpt truncated here -- the OvsBr definition continues in the full file)
class OvsBr(object):...
global_statics.py
Source:global_statics.py
# ... (excerpt begins inside a class of global_statics.py; the class header is
# not shown, so the method is written flush-left)
def clean_self(self):
    """Remove the OVS bridges configured for this case, then its database rows.

    For every entry of ``self.ovs_configure`` the access bridge is deleted
    from the container at ``self.entity_entry.entity_gate``, the matching
    ``patch_<access_br_name>_master`` port is removed from ``br_base``, and
    the agent bridge is deleted from the container at ``container_ip``.
    Any exception stops the OVS clean-up and is logged, not propagated;
    ``clean_data(self.case_id)`` runs in either case.
    """
    try:
        for ovs_br in self.ovs_configure:
            access_container = _ovs_manager.get_container(self.entity_entry.entity_gate)
            # Test the container before dereferencing it; the original order
            # called br_exist() on a possibly missing container.
            if not access_container or not access_container.br_exist(ovs_br['access_br_name']):
                raise NameError('access bridge %r is not available' % (ovs_br['access_br_name'],))
            access_container.del_br(str(ovs_br['access_br_name']))
            base_br = access_container.get_br('br_base')
            # get_br() yields None for an unknown bridge; without br_base
            # there is no patch port to remove.
            if base_br is not None:
                base_br.del_port(''.join(['patch_', ovs_br['access_br_name'], '_master']))
            container = _ovs_manager.get_container(ovs_br['container_ip'])
            if not container or not container.br_exist(ovs_br['agent_br_name']):
                raise NameError('agent bridge %r is not available' % (ovs_br['agent_br_name'],))
            container.del_br(str(ovs_br['agent_br_name']))
    except Exception as e:
        self.LOG.exception(e)
    # NOTE(review): this message is emitted even when the block above failed.
    self.LOG.info("case {0} clean ovs ok".format(self.case_id))
    clean_data(self.case_id)
    self.LOG.info("case {0} clean database ok".format(self.case_id))
# ... (excerpt truncated here)
EntityAccess.py
Source:EntityAccess.py
from apps.parabola.ovs_manager import OvsManager
import apps.parabola.static as data

BASE_BR_NAME = 'br_base'
ovs_manager = OvsManager.OvsManager.get_instance()


class EntityAccess(object):
    """Set up the base bridge and trunk port on the entity gateway container."""

    def __init__(self, entity_gate, interface=data.OVS.EntityAccess.INTERFACE, db_port=data.OVS.MANAGERPORT):
        """Store the connection parameters and prepare the base container.

        :param entity_gate: key under which the container is registered with
            the OVS manager
        :param interface: name of the outgoing port on the base bridge
        :param db_port: port passed to ``add_container`` when the container
            has to be created
        """
        self.entity_gate = entity_gate
        self.db_port = db_port
        self.out_port_name = interface
        self.base_container = None
        self.init_base_container()

    def init_base_container(self):
        """Resolve the container, the ``br_base`` bridge and its outgoing port.

        Anything missing is created.  A container already registered with the
        manager is reused; previously ``self.base_container`` stayed ``None``
        in that case and the ``br_list`` lookup below failed.
        """
        container = ovs_manager.get_container(self.entity_gate)
        if not container:
            container = ovs_manager.add_container(self.entity_gate, self.db_port)[0]
        self.base_container = container
        self.out_br = self.base_container.br_list.get(BASE_BR_NAME, None)
        if not self.out_br:
            self.out_br = self.base_container.add_br(BASE_BR_NAME)
        self.out_port = self.out_br.port_list.get(self.out_port_name, None)
        if not self.out_port:
            self.out_port = self.out_br.add_port(self.out_port_name, port_type=data.TRUNK,
                                                 trunk_permit=data.OVS.EntityAccess.TRUNK)

    def add_agent_access_br(self, br_name=None, patch_port_name=None, remote_port_name=None,
                            agent_ip=None, vxlan_key=None, vlan_id=None):
        """Ensure an access bridge with a patch port and a VXLAN port exists.

        The bridge *br_name* is looked up in the base container and created
        when absent.  Missing ports are added to it, and the peer patch port
        ``<patch_port_name>_master`` is added to the base bridge.

        An already existing bridge is now reused; previously that path
        raised ``UnboundLocalError`` because ``access_br`` was never bound.
        """
        access_br = self.base_container.br_list.get(br_name, None)
        out_br_patch_name = patch_port_name + '_master'
        br_name = str(br_name)
        if not access_br:
            access_br = self.base_container.add_br(br_name=br_name)
        if access_br:
            if not access_br.port_exist(port_name=patch_port_name):
                access_br.add_port(patch_port_name, port_type=data.PATCH, patch_peer=out_br_patch_name)
            if not access_br.port_exist(port_name=remote_port_name):
                access_br.add_port(remote_port_name, port_type=data.VXLAN,
                                   vxlan_remote_ip=agent_ip, vxlan_key=vxlan_key)
            # NOTE(review): added unconditionally, as in the original.
            self.out_br.add_port(out_br_patch_name, vlan_vid=vlan_id, port_type=data.PATCH,
                                 patch_peer=patch_port_name)


class AgentConfig(object):
    """Register an agent's OVS container with the shared manager."""

    def __init__(self, agent_ip, agent_port=data.OVS.MANAGERPORT):
        self.agent_ip = agent_ip
        self.agent_port = agent_port
        self.ovs_manager = ovs_manager
        ovs_manager.add_container(agent_ip, agent_port)

    def access_port(self, access_br=None, entity_gate=None, access_port=None, vxlan_key=None):
        container = self.ovs_manager.get_container(self.agent_ip)
        # NOTE(review): str(None) is 'None', which is truthy, so the check
        # below also passes when access_br is omitted -- confirm intent.
        access_br = str(access_br)
        if access_br:
            access_datapath = container.get_br(access_br)
            if not access_datapath:
                access_datapath = container.add_br(access_br)
        # ... (excerpt truncated here; the rest of access_port is not shown)
Learn to run automation tests from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!