Best Python code snippet using autotest_python
client.py
Source:client.py
...37 res = urlparse.SplitResult('', address, '', '', '')38 self.host = res.hostname39 self.port = res.port40 self.url_prefix = '/' + self.version + '/'41 def _do_request(self, method, action, body=None):42 conn = httplib.HTTPConnection(self.host, self.port)43 url = self.url_prefix + action44 headers = {}45 if body is not None:46 body = json.dumps(body)47 headers['Content-Type'] = 'application/json'48 conn.request(method, url, body, headers)49 res = conn.getresponse()50 if res.status in (httplib.OK,51 httplib.CREATED,52 httplib.ACCEPTED,53 httplib.NO_CONTENT):54 return res55 raise httplib.HTTPException(56 res, 'code %d reason %s' % (res.status, res.reason),57 res.getheaders(), res.read())58 def _do_request_read(self, method, action):59 res = self._do_request(method, action)60 return res.read()61class OFPClientV1_0(RyuClientBase):62 version = 'v1.0'63 # /networks/{network_id}/{dpid}_{port}/macs/{mac_address}64 path_networks = 'networks'65 path_network = path_networks + '/%s'66 path_port = path_network + '/%s_%s'67 path_macs = path_port + '/macs'68 path_mac = path_macs + '/%s'69 def __init__(self, address):70 super(OFPClientV1_0, self).__init__(OFPClientV1_0.version, address)71 def get_networks(self):72 return self._do_request_read('GET', self.path_networks)73 def create_network(self, network_id):74 self._do_request('POST', self.path_network % network_id)75 def update_network(self, network_id):76 self._do_request('PUT', self.path_network % network_id)77 def delete_network(self, network_id):78 self._do_request('DELETE', self.path_network % network_id)79 def get_ports(self, network_id):80 return self._do_request_read('GET', self.path_network % network_id)81 def create_port(self, network_id, dpid, port):82 self._do_request('POST', self.path_port % (network_id, dpid, port))83 def update_port(self, network_id, dpid, port):84 self._do_request('PUT', self.path_port % (network_id, dpid, port))85 def delete_port(self, network_id, dpid, port):86 self._do_request('DELETE', 
self.path_port % (network_id, dpid, port))87 def list_macs(self, network_id, dpid, port):88 return self._do_request_read('GET',89 self.path_macs % (network_id, dpid, port))90 def create_mac(self, network_id, dpid, port, mac_address):91 self._do_request('POST', self.path_mac % (network_id, dpid, port,92 mac_address))93 def update_mac(self, network_id, dpid, port, mac_address):94 self._do_request('PUT', self.path_mac % (network_id, dpid, port,95 mac_address))96OFPClient = OFPClientV1_097class TunnelClientV1_0(RyuClientBase):98 version = 'v1.0'99 # /tunnels/networks/{network-id}/key/{tunnel_key}100 # /tunnels/switches/{dpid}/ports/{port-id}/{remote_dpip}101 path_tunnels = 'tunnels'102 path_key = path_tunnels + '/networks/%(network_id)s/key'103 path_tunnel_key = path_key + '/%(tunnel_key)s'104 path_ports = path_tunnels + '/switches/%(dpid)s/ports'105 path_port = path_ports + '/%(port_no)s'106 path_remote_dpid = path_port + '/%(remote_dpid)s'107 def __init__(self, address):108 super(TunnelClientV1_0, self).__init__(self.version, address)109 def get_tunnel_key(self, network_id):110 return self._do_request_read('GET', self.path_key % locals())111 def delete_tunnel_key(self, network_id):112 return self._do_request_read('DELETE', self.path_key % locals())113 def create_tunnel_key(self, network_id, tunnel_key):114 self._do_request('POST', self.path_tunnel_key % locals())115 def update_tunnel_key(self, network_id, tunnel_key):116 self._do_request('PUT', self.path_tunnel_key % locals())117 def list_ports(self, dpid):118 return self._do_request_read('GET', self.path_ports % locals())119 def delete_port(self, dpid, port_no):120 return self._do_request_read('DELETE', self.path_port % locals())121 def get_remote_dpid(self, dpid, port_no):122 return self._do_request_read('GET', self.path_port % locals())123 def create_remote_dpid(self, dpid, port_no, remote_dpid):124 self._do_request('POST', self.path_remote_dpid % locals())125 def update_remote_dpid(self, dpid, port_no, 
remote_dpid):126 self._do_request('PUT', self.path_remote_dpid % locals())127TunnelClient = TunnelClientV1_0128class SwitchConfClientV1_0(RyuClientBase):129 version = 'v1.0'130 # /conf/switches131 # /conf/switches/<dpid>132 # /conf/switches/<dpid>/<key>133 path_conf_switches = 'conf/switches'134 path_switch = path_conf_switches + '/%(dpid)s'135 path_key = path_switch + '/%(key)s'136 def __init__(self, address):137 super(SwitchConfClientV1_0, self).__init__(self.version, address)138 def list_switches(self):139 return self._do_request_read('GET', self.path_conf_switches)140 def delete_switch(self, dpid):141 self._do_request('DELETE', self.path_switch % locals())142 def list_keys(self, dpid):143 return self._do_request_read('GET', self.path_switch % locals())144 def set_key(self, dpid, key, value):145 self._do_request('PUT', self.path_key % locals(), value)146 def get_key(self, dpid, key):147 return self._do_request_read('GET', self.path_key % locals())148 def delete_key(self, dpid, key):149 self._do_request('DELETE', self.path_key % locals())150SwitchConfClient = SwitchConfClientV1_0151class QuantumIfaceClientV1_0(RyuClientBase):152 version = 'v1.0'153 # /quantum/ports154 # /quantum/ports/{iface_id}155 # /quantum/ports/{iface_id}/keys/156 # /quantum/ports/{iface_id}/keys/{key}/{value}157 path_quantum_ports = 'quantum/ports'158 path_iface_id = path_quantum_ports + '/%(iface_id)s'159 path_keys = path_iface_id + '/keys'160 path_key = path_keys + '/%(key)s'161 path_value = path_key + '/%(value)s'162 def __init__(self, address):163 super(QuantumIfaceClientV1_0, self).__init__(self.version, address)164 def list_ifaces(self):165 return self._do_request_read('GET', self.path_quantum_ports)166 def delete_iface(self, iface_id):167 self._do_request('DELETE', self.path_iface_id % locals())168 def list_keys(self, iface_id):169 return self._do_request_read('GET', self.path_keys % locals())170 def get_key(self, iface_id, key):171 return self._do_request_read('GET', self.path_key % 
locals())172 def create_key(self, iface_id, key, value):173 self._do_request('POST', self.path_value % locals())174 def update_key(self, iface_id, key, value):175 self._do_request('PUT', self.path_value % locals())176 # for convenience177 def get_network_id(self, iface_id):178 return self.get_key(iface_id, 'network_id')179 def create_network_id(self, iface_id, network_id):180 self.create_key(iface_id, 'network_id', network_id)181 def update_network_id(self, iface_id, network_id):182 self.update_key(iface_id, 'network_id', network_id)183QuantumIfaceClient = QuantumIfaceClientV1_0184NeutronIfaceClient = QuantumIfaceClient # project rename quantum -> neutron185class TopologyClientV1_0(RyuClientBase):186 version = 'v1.0'187 # /topology/switches188 # /topology/switches/{dpid}189 # /topology/links190 # /topology/links/{dpid}191 _path_switches = 'topology/switches'192 _path_links = 'topology/links'193 def __init__(self, address):194 super(TopologyClientV1_0, self).__init__(self.version, address)195 # dpid: string representation (see ryu.lib.dpid)196 # if None, get all197 def list_switches(self, dpid=None):198 uri = self._path_switches199 if dpid:200 uri += '/%s' % (dpid)201 return self._do_request('GET', uri)202 # dpid: string representation (see ryu.lib.dpid)203 # if None, get all204 def list_links(self, dpid=None):205 uri = self._path_links206 if dpid:207 uri += '/%s' % (dpid)208 return self._do_request('GET', uri)...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!