Best Python code snippet using lisa_python
osutils.py
Source:osutils.py
1# Copyright 2017 Red Hat, Inc. All rights reserved.2#3# Licensed under the Apache License, Version 2.0 (the "License"); you may4# not use this file except in compliance with the License. You may obtain5# a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the12# License for the specific language governing permissions and limitations13# under the License.14import errno15import ipaddress16import os17import shutil18import stat19import subprocess20import distro21import jinja222from oslo_config import cfg23from oslo_log import log as logging24import six25import webob26from werkzeug import exceptions27from octavia.common import constants as consts28from octavia.common import exceptions as octavia_exceptions29from octavia.common import utils30CONF = cfg.CONF31LOG = logging.getLogger(__name__)32j2_env = jinja2.Environment(autoescape=True, loader=jinja2.FileSystemLoader(33 os.path.dirname(os.path.realpath(__file__)) + consts.AGENT_API_TEMPLATES))34class BaseOS(object):35 PACKAGE_NAME_MAP = {}36 def __init__(self, os_name):37 self.os_name = os_name38 @classmethod39 def _get_subclasses(cls):40 for subclass in cls.__subclasses__():41 for sc in subclass._get_subclasses():42 yield sc43 yield subclass44 @classmethod45 def get_os_util(cls):46 os_name = distro.id()47 for subclass in cls._get_subclasses():48 if subclass.is_os_name(os_name):49 return subclass(os_name)50 raise octavia_exceptions.InvalidAmphoraOperatingSystem(os_name=os_name)51 @classmethod52 def _map_package_name(cls, package_name):53 return cls.PACKAGE_NAME_MAP.get(package_name, package_name)54 def get_network_interface_file(self, interface):55 if CONF.amphora_agent.agent_server_network_file:56 return CONF.amphora_agent.agent_server_network_file57 if CONF.amphora_agent.agent_server_network_dir:58 return os.path.join(CONF.amphora_agent.agent_server_network_dir,59 interface)60 network_dir = consts.UBUNTU_AMP_NET_DIR_TEMPLATE.format(61 netns=consts.AMPHORA_NAMESPACE)62 return os.path.join(network_dir, interface)63 def create_netns_dir(self, network_dir, netns_network_dir, ignore=None):64 # We need to setup the netns network directory so that the ifup65 # commands used here and in the startup scripts "sees" the right66 # interfaces and scripts.67 try:68 os.makedirs('/etc/netns/' + consts.AMPHORA_NAMESPACE)69 shutil.copytree(70 network_dir,71 '/etc/netns/{netns}/{net_dir}'.format(72 netns=consts.AMPHORA_NAMESPACE,73 net_dir=netns_network_dir),74 symlinks=True,75 ignore=ignore)76 except OSError as e:77 # Raise the error if it's not "File exists" otherwise pass78 if e.errno != errno.EEXIST:79 raise80 def write_vip_interface_file(self, interface_file_path,81 primary_interface, vip, ip, broadcast,82 netmask, gateway, mtu, vrrp_ip, vrrp_version,83 render_host_routes, template_vip):84 # write interface file85 mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH86 # If we are using a consolidated interfaces file, just append87 # otherwise clear the per interface file as we are rewriting it88 # TODO(johnsom): We need a way to clean out old interfaces records89 if CONF.amphora_agent.agent_server_network_file:90 flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND91 else:92 flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC93 with os.fdopen(os.open(interface_file_path, flags, mode),94 'w') as text_file:95 text = template_vip.render(96 consts=consts,97 interface=primary_interface,98 vip=vip,99 vip_ipv6=ip.version == 6,100 # For ipv6 the netmask is already the prefix101 prefix=(netmask if ip.version == 6102 else utils.netmask_to_prefix(netmask)),103 broadcast=broadcast,104 netmask=netmask,105 gateway=gateway,106 network=utils.ip_netmask_to_cidr(vip, netmask),107 mtu=mtu,108 vrrp_ip=vrrp_ip,109 vrrp_ipv6=vrrp_version == 6,110 host_routes=render_host_routes,111 topology=CONF.controller_worker.loadbalancer_topology,112 )113 text_file.write(text)114 def write_port_interface_file(self, netns_interface, fixed_ips, mtu,115 interface_file_path, template_port):116 # write interface file117 # If we are using a consolidated interfaces file, just append118 # otherwise clear the per interface file as we are rewriting it119 # TODO(johnsom): We need a way to clean out old interfaces records120 if CONF.amphora_agent.agent_server_network_file:121 flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND122 else:123 flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC124 # mode 00644125 mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH126 with os.fdopen(os.open(interface_file_path, flags, mode),127 'w') as text_file:128 text = self._generate_network_file_text(129 netns_interface, fixed_ips, mtu, template_port)130 text_file.write(text)131 @classmethod132 def _generate_network_file_text(cls, netns_interface, fixed_ips, mtu,133 template_port):134 text = ''135 if fixed_ips is None:136 text = template_port.render(interface=netns_interface)137 else:138 for index, fixed_ip in enumerate(fixed_ips, -1):139 try:140 ip_addr = fixed_ip['ip_address']141 cidr = fixed_ip['subnet_cidr']142 ip = ipaddress.ip_address(ip_addr if isinstance(143 ip_addr, six.text_type) else six.u(ip_addr))144 network = ipaddress.ip_network(145 cidr if isinstance(146 cidr, six.text_type) else six.u(cidr))147 broadcast = network.broadcast_address.exploded148 netmask = (network.prefixlen if ip.version == 6149 else network.netmask.exploded)150 host_routes = cls.get_host_routes(fixed_ip)151 except ValueError:152 return webob.Response(153 json=dict(message="Invalid network IP"), status=400)154 new_text = template_port.render(interface=netns_interface,155 ipv6=ip.version == 6,156 ip_address=ip.exploded,157 broadcast=broadcast,158 netmask=netmask,159 mtu=mtu,160 host_routes=host_routes)161 text = '\n'.join([text, new_text])162 return text163 @classmethod164 def get_host_routes(cls, fixed_ip):165 host_routes = []166 for hr in fixed_ip.get('host_routes', []):167 network = ipaddress.ip_network(168 hr['destination'] if isinstance(169 hr['destination'], six.text_type) else170 six.u(hr['destination']))171 host_routes.append({'network': network, 'gw': hr['nexthop']})172 return host_routes173 @classmethod174 def _bring_if_up(cls, interface, what, flush=True):175 # Note, we are not using pyroute2 for this as it is not /etc/netns176 # aware.177 # Work around for bug:178 # https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=845121179 int_up = "ip netns exec {ns} ip link set {int} up".format(180 ns=consts.AMPHORA_NAMESPACE, int=interface)181 addr_flush = "ip netns exec {ns} ip addr flush {int}".format(182 ns=consts.AMPHORA_NAMESPACE, int=interface)183 cmd = ("ip netns exec {ns} ifup {params}".format(184 ns=consts.AMPHORA_NAMESPACE, params=interface))185 try:186 out = subprocess.check_output(int_up.split(),187 stderr=subprocess.STDOUT)188 LOG.debug(out)189 if flush:190 out = subprocess.check_output(addr_flush.split(),191 stderr=subprocess.STDOUT)192 LOG.debug(out)193 out = subprocess.check_output(cmd.split(),194 stderr=subprocess.STDOUT)195 LOG.debug(out)196 except subprocess.CalledProcessError as e:197 LOG.error('Failed to ifup %s due to error: %s %s', interface, e,198 e.output)199 raise exceptions.HTTPException(200 response=webob.Response(json=dict(201 message='Error plugging {0}'.format(what),202 details=e.output), status=500))203 @classmethod204 def _bring_if_down(cls, interface):205 # Note, we are not using pyroute2 for this as it is not /etc/netns206 # aware.207 cmd = ("ip netns exec {ns} ifdown {params}".format(208 ns=consts.AMPHORA_NAMESPACE, params=interface))209 try:210 subprocess.check_output(cmd.split(), stderr=subprocess.STDOUT)211 except subprocess.CalledProcessError as e:212 LOG.info('Ignoring failure to ifdown %s due to error: %s %s',213 interface, e, e.output)214 @classmethod215 def bring_interfaces_up(cls, ip, primary_interface, secondary_interface):216 cls._bring_if_down(primary_interface)217 if secondary_interface:218 cls._bring_if_down(secondary_interface)219 cls._bring_if_up(primary_interface, 'VIP')220 if secondary_interface:221 cls._bring_if_up(secondary_interface, 'VIP', flush=False)222 def has_ifup_all(self):223 return True224class Ubuntu(BaseOS):225 ETH_X_PORT_CONF = 'plug_port_ethX.conf.j2'226 ETH_X_VIP_CONF = 'plug_vip_ethX.conf.j2'227 @classmethod228 def is_os_name(cls, os_name):229 return os_name in ['ubuntu']230 def cmd_get_version_of_installed_package(self, package_name):231 name = self._map_package_name(package_name)232 return "dpkg-query -W -f=${{Version}} {name}".format(name=name)233 def get_network_interface_file(self, interface):234 if CONF.amphora_agent.agent_server_network_file:235 return CONF.amphora_agent.agent_server_network_file236 if CONF.amphora_agent.agent_server_network_dir:237 return os.path.join(CONF.amphora_agent.agent_server_network_dir,238 interface + '.cfg')239 network_dir = consts.UBUNTU_AMP_NET_DIR_TEMPLATE.format(240 netns=consts.AMPHORA_NAMESPACE)241 return os.path.join(network_dir, interface + '.cfg')242 def get_network_path(self):243 return '/etc/network'244 def get_netns_network_dir(self):245 network_dir = self.get_network_path()246 return os.path.basename(network_dir)247 def create_netns_dir(248 self, network_dir=None, netns_network_dir=None, ignore=None):249 if not netns_network_dir:250 netns_network_dir = self.get_netns_network_dir()251 if not network_dir:252 network_dir = self.get_network_path()253 if not ignore:254 ignore = shutil.ignore_patterns('eth0*', 'openssh*')255 super(Ubuntu, self).create_netns_dir(256 network_dir, netns_network_dir, ignore)257 def write_interfaces_file(self):258 name = '/etc/netns/{}/network/interfaces'.format(259 consts.AMPHORA_NAMESPACE)260 flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC261 # mode 00644262 mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH263 with os.fdopen(os.open(name, flags, mode), 'w') as int_file:264 int_file.write('auto lo\n')265 int_file.write('iface lo inet loopback\n')266 if not CONF.amphora_agent.agent_server_network_file:267 int_file.write('source /etc/netns/{}/network/'268 'interfaces.d/*.cfg\n'.format(269 consts.AMPHORA_NAMESPACE))270 def write_vip_interface_file(self, interface_file_path,271 primary_interface, vip, ip, broadcast,272 netmask, gateway, mtu, vrrp_ip, vrrp_version,273 render_host_routes, template_vip=None):274 if not template_vip:275 template_vip = j2_env.get_template(self.ETH_X_VIP_CONF)276 super(Ubuntu, self).write_vip_interface_file(277 interface_file_path, primary_interface, vip, ip, broadcast,278 netmask, gateway, mtu, vrrp_ip, vrrp_version, render_host_routes,279 template_vip)280 def write_port_interface_file(self, netns_interface, fixed_ips, mtu,281 interface_file_path=None,282 template_port=None):283 if not interface_file_path:284 interface_file_path = self.get_network_interface_file(285 netns_interface)286 if not template_port:287 template_port = j2_env.get_template(self.ETH_X_PORT_CONF)288 super(Ubuntu, self).write_port_interface_file(289 netns_interface, fixed_ips, mtu, interface_file_path,290 template_port)291 def has_ifup_all(self):292 return True293class RH(BaseOS):294 ETH_X_PORT_CONF = 'rh_plug_port_ethX.conf.j2'295 ETH_X_VIP_CONF = 'rh_plug_vip_ethX.conf.j2'296 ETH_X_ALIAS_VIP_CONF = 'rh_plug_vip_ethX_alias.conf.j2'297 ROUTE_ETH_X_CONF = 'rh_route_ethX.conf.j2'298 RULE_ETH_X_CONF = 'rh_rule_ethX.conf.j2'299 # The reason of make them as jinja templates is the current scripts force300 # to add the iptables, so leave it now for future extending if possible.301 ETH_IFUP_LOCAL_SCRIPT = 'rh_plug_port_eth_ifup_local.conf.j2'302 ETH_IFDOWN_LOCAL_SCRIPT = 'rh_plug_port_eth_ifdown_local.conf.j2'303 @classmethod304 def is_os_name(cls, os_name):305 return os_name in ['fedora', 'rhel']306 def cmd_get_version_of_installed_package(self, package_name):307 name = self._map_package_name(package_name)308 return "rpm -q --queryformat %{{VERSION}} {name}".format(name=name)309 @staticmethod310 def _get_network_interface_file(prefix, interface):311 if CONF.amphora_agent.agent_server_network_file:312 return CONF.amphora_agent.agent_server_network_file313 if CONF.amphora_agent.agent_server_network_dir:314 network_dir = CONF.amphora_agent.agent_server_network_dir315 else:316 network_dir = consts.RH_AMP_NET_DIR_TEMPLATE.format(317 netns=consts.AMPHORA_NAMESPACE)318 return os.path.join(network_dir, prefix + interface)319 def get_network_interface_file(self, interface):320 return self._get_network_interface_file('ifcfg-', interface)321 def get_alias_network_interface_file(self, interface):322 return self.get_network_interface_file(interface + ':0')323 def get_static_routes_interface_file(self, interface, version):324 route = 'route6-' if version == 6 else 'route-'325 return self._get_network_interface_file(route, interface)326 def get_route_rules_interface_file(self, interface, version):327 rule = 'rule6-' if version == 6 else 'rule-'328 return self._get_network_interface_file(rule, interface)329 def get_network_path(self):330 return '/etc/sysconfig/network-scripts'331 def get_netns_network_dir(self):332 network_full_path = self.get_network_path()333 network_basename = os.path.basename(network_full_path)334 network_dirname = os.path.dirname(network_full_path)335 network_prefixdir = os.path.basename(network_dirname)336 return os.path.join(network_prefixdir, network_basename)337 def create_netns_dir(338 self, network_dir=None, netns_network_dir=None, ignore=None):339 if not netns_network_dir:340 netns_network_dir = self.get_netns_network_dir()341 if not network_dir:342 network_dir = self.get_network_path()343 if not ignore:344 ignore = shutil.ignore_patterns('ifcfg-eth0*', 'ifcfg-lo*')345 super(RH, self).create_netns_dir(346 network_dir, netns_network_dir, ignore)347 # Copy /etc/sysconfig/network file348 src = '/etc/sysconfig/network'349 dst = '/etc/netns/{netns}/sysconfig'.format(350 netns=consts.AMPHORA_NAMESPACE)351 shutil.copy2(src, dst)352 def write_interfaces_file(self):353 # No interfaces file in RH based flavors354 return355 def write_vip_interface_file(self, interface_file_path,356 primary_interface, vip, ip, broadcast,357 netmask, gateway, mtu, vrrp_ip, vrrp_version,358 render_host_routes, template_vip=None):359 if not template_vip:360 template_vip = j2_env.get_template(self.ETH_X_VIP_CONF)361 super(RH, self).write_vip_interface_file(362 interface_file_path, primary_interface, vip, ip, broadcast,363 netmask, gateway, mtu, vrrp_ip, vrrp_version, render_host_routes,364 template_vip)365 # keepalived will handle the VIP if we are on active/standby366 if (ip.version == 4 and367 CONF.controller_worker.loadbalancer_topology ==368 consts.TOPOLOGY_SINGLE):369 # Create an IPv4 alias interface, needed in RH based flavors370 alias_interface_file_path = self.get_alias_network_interface_file(371 primary_interface)372 template_vip_alias = j2_env.get_template(self.ETH_X_ALIAS_VIP_CONF)373 super(RH, self).write_vip_interface_file(374 alias_interface_file_path, primary_interface, vip, ip,375 broadcast, netmask, gateway, mtu, vrrp_ip, vrrp_version,376 render_host_routes, template_vip_alias)377 routes_interface_file_path = (378 self.get_static_routes_interface_file(primary_interface,379 ip.version))380 template_routes = j2_env.get_template(self.ROUTE_ETH_X_CONF)381 self.write_static_routes_interface_file(382 routes_interface_file_path, primary_interface,383 render_host_routes, template_routes, gateway, vip, netmask)384 # keepalived will handle the rule(s) if we are on actvie/standby385 if (CONF.controller_worker.loadbalancer_topology ==386 consts.TOPOLOGY_SINGLE):387 route_rules_interface_file_path = (388 self.get_route_rules_interface_file(primary_interface,389 ip.version))390 template_rules = j2_env.get_template(self.RULE_ETH_X_CONF)391 self.write_static_routes_interface_file(392 route_rules_interface_file_path, primary_interface,393 render_host_routes, template_rules, gateway, vip, netmask)394 self._write_ifup_ifdown_local_scripts_if_possible()395 def write_static_routes_interface_file(self, interface_file_path,396 interface, host_routes,397 template_routes, gateway,398 vip, netmask):399 # write static routes interface file400 mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH401 # TODO(johnsom): We need a way to clean out old interfaces records402 if CONF.amphora_agent.agent_server_network_file:403 flags = os.O_WRONLY | os.O_CREAT | os.O_APPEND404 else:405 flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC406 with os.fdopen(os.open(interface_file_path, flags, mode),407 'w') as text_file:408 text = template_routes.render(409 consts=consts,410 interface=interface,411 host_routes=host_routes,412 gateway=gateway,413 network=utils.ip_netmask_to_cidr(vip, netmask),414 vip=vip,415 topology=CONF.controller_worker.loadbalancer_topology,416 )417 text_file.write(text)418 def write_port_interface_file(self, netns_interface, fixed_ips, mtu,419 interface_file_path=None,420 template_port=None):421 if not interface_file_path:422 interface_file_path = self.get_network_interface_file(423 netns_interface)424 if not template_port:425 template_port = j2_env.get_template(self.ETH_X_PORT_CONF)426 super(RH, self).write_port_interface_file(427 netns_interface, fixed_ips, mtu, interface_file_path,428 template_port)429 if fixed_ips:430 host_routes = []431 host_routes_ipv6 = []432 for fixed_ip in fixed_ips:433 ip_addr = fixed_ip['ip_address']434 ip = ipaddress.ip_address(ip_addr if isinstance(435 ip_addr, six.text_type) else six.u(ip_addr))436 if ip.version == 6:437 host_routes_ipv6.extend(self.get_host_routes(fixed_ip))438 else:439 host_routes.extend(self.get_host_routes(fixed_ip))440 routes_interface_file_path = (441 self.get_static_routes_interface_file(netns_interface, 4))442 template_routes = j2_env.get_template(self.ROUTE_ETH_X_CONF)443 self.write_static_routes_interface_file(444 routes_interface_file_path, netns_interface,445 host_routes, template_routes, None, None, None)446 routes_interface_file_path_ipv6 = (447 self.get_static_routes_interface_file(netns_interface, 6))448 template_routes = j2_env.get_template(self.ROUTE_ETH_X_CONF)449 self.write_static_routes_interface_file(450 routes_interface_file_path_ipv6, netns_interface,451 host_routes_ipv6, template_routes, None, None, None)452 self._write_ifup_ifdown_local_scripts_if_possible()453 @classmethod454 def bring_interfaces_up(cls, ip, primary_interface, secondary_interface):455 if ip.version == 4:456 super(RH, cls).bring_interfaces_up(457 ip, primary_interface, secondary_interface)458 else:459 # Secondary interface is not present in IPv6 configuration460 cls._bring_if_down(primary_interface)461 cls._bring_if_up(primary_interface, 'VIP')462 def has_ifup_all(self):463 return False464 def _write_ifup_ifdown_local_scripts_if_possible(self):465 if self._check_ifup_ifdown_local_scripts_exists():466 template_ifup_local = j2_env.get_template(467 self.ETH_IFUP_LOCAL_SCRIPT)468 self.write_port_interface_if_local_scripts(template_ifup_local)469 template_ifdown_local = j2_env.get_template(470 self.ETH_IFDOWN_LOCAL_SCRIPT)471 self.write_port_interface_if_local_scripts(template_ifdown_local,472 ifup=False)473 def _check_ifup_ifdown_local_scripts_exists(self):474 file_names = ['ifup-local', 'ifdown-local']475 target_dir = '/sbin/'476 res = []477 for file_name in file_names:478 if os.path.exists(os.path.join(target_dir, file_name)):479 res.append(True)480 else:481 res.append(False)482 # This means we only add the scripts when both of them are non-exists483 return not any(res)484 def write_port_interface_if_local_scripts(485 self, template_script, ifup=True):486 file_name = 'ifup' + '-local'487 mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH488 flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC489 if not ifup:490 file_name = 'ifdown' + '-local'491 with os.fdopen(492 os.open(os.path.join(493 '/sbin/', file_name), flags, mode), 'w') as text_file:494 text = template_script.render()495 text_file.write(text)496 os.chmod(os.path.join('/sbin/', file_name), stat.S_IEXEC)497class CentOS(RH):498 PACKAGE_NAME_MAP = {'haproxy': 'haproxy18'}499 @classmethod500 def is_os_name(cls, os_name):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!