Best Python code snippet using autotest_python
qos_tc_impl.py
Source:qos_tc_impl.py
"""
Copyright 2020 The Magma Authors.
This source code is licensed under the BSD-style license found in the
LICENSE file in the root directory of this source tree.
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from typing import List, Optional  # noqa
import os
import shlex
import subprocess
import logging
from lte.protos.policydb_pb2 import FlowMatch
from .types import QosInfo
from .utils import IdManager

LOG = logging.getLogger('pipelined.qos.qos_tc_impl')
# LOG.setLevel(logging.DEBUG)


# this code can run in either a docker container(CWAG) or as a native
# python process(AG). When we are running as a root there is no need for
# using sudo. (related to task T63499189 where tc commands failed since
# sudo wasn't available in the docker container)
def argSplit(cmd: str) -> List[str]:
    """Split a command string into an argv list.

    The list is prefixed with ``sudo`` unless the effective uid is 0.
    """
    args = [] if os.geteuid() == 0 else ["sudo"]
    args.extend(shlex.split(cmd))
    return args


def run_cmd(cmd_list, show_error=True) -> int:
    """Run every command in ``cmd_list`` and report failures.

    All commands are attempted even if an earlier one fails.

    Args:
        cmd_list: iterable of command strings.
        show_error: log an error for each failing command when True.

    Returns:
        0 if every command succeeded, otherwise the return code of the
        last command that failed.
    """
    err = 0
    for cmd in cmd_list:
        LOG.debug("running %s", cmd)
        try:
            args = argSplit(cmd)
            subprocess.check_call(args)
        except subprocess.CalledProcessError as e:
            err = e.returncode
            if show_error:
                LOG.error("%s error running %s ", str(e), cmd)
    return err


# TODO - replace this implementation with pyroute2 tc
ROOT_QID = 65534
DEFAULT_RATE = '12Kbit'
DEFAULT_INTF_SPEED = '1000'


class TrafficClass:
    """
    Creates/Deletes queues in linux. Using Qdiscs for flow based
    rate limiting(traffic shaping) of user traffic.
    """

    @staticmethod
    def delete_class(intf: str, qid: int, show_error=True) -> int:
        """Delete the fw filter and the htb class ``1:<qid>`` on ``intf``.

        Returns the result of ``run_cmd`` for the two tc commands.
        """
        qid_hex = hex(qid)
        # delete filter if this is a leaf class
        filter_cmd = "tc filter del dev {intf} protocol ip parent 1: prio 1 "
        filter_cmd += "handle {qid} fw flowid 1:{qid}"
        filter_cmd = filter_cmd.format(intf=intf, qid=qid_hex)
        tc_cmd = "tc class del dev {intf} classid 1:{qid}".format(intf=intf,
                                                                  qid=qid_hex)
        return run_cmd([filter_cmd, tc_cmd], show_error)

    @staticmethod
    def create_class(intf: str, qid: int, max_bw: int, rate=None,
                     parent_qid=None, show_error=True) -> int:
        """(Re)create htb class ``1:<qid>`` on ``intf`` plus its fw filter.

        Args:
            intf: interface name.
            qid: class id; also used as the fw filter handle.
            max_bw: value passed to tc as the class ``ceil``.
            rate: value passed as the class ``rate``; DEFAULT_RATE if falsy.
            parent_qid: parent class id; ROOT_QID if falsy or equal to qid.
            show_error: log failures of the add commands when True.

        Returns:
            The ``run_cmd`` result of the filter-add command only; the
            result of the class-add command is not part of the return value.
        """
        if not rate:
            rate = DEFAULT_RATE
        if not parent_qid:
            parent_qid = ROOT_QID
        if parent_qid == qid:
            # parent qid should only be self for root case, everything else
            # should be the child of root class
            LOG.error('parent and self qid equal, setting parent_qid to root')
            parent_qid = ROOT_QID
        qid_hex = hex(qid)
        parent_qid_hex = hex(parent_qid)
        tc_cmd = "tc class add dev {intf} parent 1:{parent_qid} "
        tc_cmd += "classid 1:{qid} htb rate {rate} ceil {maxbw} prio 2"
        tc_cmd = tc_cmd.format(intf=intf, parent_qid=parent_qid_hex,
                               qid=qid_hex, rate=rate, maxbw=max_bw)
        # delete if exists
        TrafficClass.delete_class(intf, qid, show_error=False)
        run_cmd([tc_cmd], show_error=show_error)
        # add filter
        filter_cmd = "tc filter add dev {intf} protocol ip parent 1: prio 1 "
        filter_cmd += "handle {qid} fw flowid 1:{qid}"
        filter_cmd = filter_cmd.format(intf=intf, qid=qid_hex)
        # add qdisc and filter
        return run_cmd([filter_cmd], show_error)

    @staticmethod
    def init_qdisc(intf: str, show_error=False) -> int:
        """Create the root htb qdisc, the root class and class ``1:1``.

        The interface speed is read from ``/sys/class/net/<intf>/speed``;
        DEFAULT_INTF_SPEED is used when that file cannot be read.
        """
        speed = DEFAULT_INTF_SPEED
        qid_hex = hex(ROOT_QID)
        fn = "/sys/class/net/{intf}/speed".format(intf=intf)
        try:
            with open(fn) as f:
                speed = f.read().strip()
        except OSError:
            LOG.error('unable to read speed from %s defaulting to %s', fn, speed)
        qdisc_cmd = "tc qdisc add dev {intf} root handle 1: htb".format(intf=intf)
        parent_q_cmd = "tc class add dev {intf} parent 1: classid 1:{root_qid} htb "
        parent_q_cmd += "rate {speed}Mbit ceil {speed}Mbit"
        parent_q_cmd = parent_q_cmd.format(intf=intf, root_qid=qid_hex, speed=speed)
        tc_cmd = "tc class add dev {intf} parent 1:{root_qid} classid 1:1 htb "
        tc_cmd += "rate {rate} ceil {speed}Mbit"
        tc_cmd = tc_cmd.format(intf=intf, root_qid=qid_hex, rate=DEFAULT_RATE,
                               speed=speed)
        return run_cmd([qdisc_cmd, parent_q_cmd, tc_cmd], show_error)

    @staticmethod
    def read_all_classes(intf: str):
        """Return ``(qid, parent_qid)`` tuples for non-root htb classes.

        An empty or partial list is returned if the tc command fails.
        """
        qid_list = []
        # example output of this command
        # b'class htb 1:1 parent 1:fffe prio 0 rate 12Kbit ceil 1Gbit burst \
        # 1599b cburst 1375b \nclass htb 1:fffe root rate 1Gbit ceil 1Gbit \
        # burst 1375b cburst 1375b \n'
        # we need to parse this output and extract class ids from here
        tc_cmd = "tc class show dev {}".format(intf)
        args = argSplit(tc_cmd)
        try:
            output = subprocess.check_output(args)
            for ln in output.decode('utf-8').split("\n"):
                ln = ln.strip()
                if not ln:
                    continue
                tok = ln.split()
                if len(tok) < 5:
                    continue
                if tok[1] != "htb":
                    continue
                if tok[3] == 'root':
                    continue
                qid_str = tok[2].split(':')[1]
                qid = int(qid_str, 16)
                pqid_str = tok[4].split(':')[1]
                pqid = int(pqid_str, 16)
                qid_list.append((qid, pqid))
        except subprocess.CalledProcessError as e:
            LOG.error('failed extracting classids from tc %s', str(e))
        return qid_list

    @staticmethod
    def dump_class_state(intf: str, qid: int):
        """Print the detailed tc statistics of class ``1:<qid>`` to stdout."""
        qid_hex = hex(qid)
        tc_cmd = "tc -s -d class show dev {} classid 1:{}".format(intf,
                                                                  qid_hex)
        args = argSplit(tc_cmd)
        try:
            output = subprocess.check_output(args)
            print(output.decode())
        except subprocess.CalledProcessError:
            # print() does not interpolate %s itself; format explicitly
            print("Exception dumping Qos State for %s" % intf)

    @staticmethod
    def get_class_rate(intf: str, qid: int) -> Optional[str]:
        """Return the tc configuration text that follows the word 'rate'.

        Returns:
            The text after the first 'rate' in the tc output for class
            ``1:<qid>``, or None if the tc command fails or its output
            contains no 'rate'.
        """
        qid_hex = hex(qid)
        tc_cmd = "tc class show dev {} classid 1:{}".format(intf, qid_hex)
        args = argSplit(tc_cmd)
        try:
            # output: class htb 1:3 parent 1:2 prio 2 rate 250Kbit ceil 500Kbit burst 1600b cburst 1600b
            raw_output = subprocess.check_output(args)
        except subprocess.CalledProcessError:
            LOG.error("Exception dumping Qos State for %s", intf)
            return None
        # check_output returns bytes; decode before splitting on a str
        output = raw_output.decode('utf-8')
        # return all config from 'rate' onwards
        config = output.split("rate")
        if len(config) < 2:
            # e.g. empty output when the class does not exist
            LOG.error("could not find rate in tc output for %s: %s",
                      tc_cmd, output)
            return None
        return config[1]


class TCManager(object):
    """
    Creates/Deletes queues in linux. Using Qdiscs for flow based
    rate limiting(traffic shaping) of user traffic.
    Queues are created on an egress interface and flows
    in OVS are programmed with qid to filter traffic to the queue.
    Traffic matching a specific flow is filtered to a queue and is
    rate limited based on configured value.
    Traffic to flows with no QoS configuration are sent to a
    default queue and are not rate limited.
    """

    def __init__(self,
                 datapath,
                 loop,
                 config) -> None:
        """Read interface names and the qid range from ``config``.

        Keys read: ``nat_iface``, ``enodeb_iface``, ``qos.max_rate``,
        ``qos.linux_tc.min_idx`` and ``qos.linux_tc.max_idx``.
        """
        self._datapath = datapath
        self._loop = loop
        self._uplink = config['nat_iface']
        self._downlink = config['enodeb_iface']
        self._max_rate = config["qos"]["max_rate"]
        self._start_idx, self._max_idx = (config['qos']['linux_tc']['min_idx'],
                                          config['qos']['linux_tc']['max_idx'])
        self._id_manager = IdManager(self._start_idx, self._max_idx)
        self._initialized = True
        LOG.info("Init LinuxTC module uplink:%s downlink:%s",
                 config['nat_iface'], config['enodeb_iface'])

    def destroy(self, ):
        """Delete the in-range classes found on both interfaces."""
        LOG.info("destroying existing qos classes")
        # ensure ordering during deletion of classes, children should be deleted
        # prior to the parent class ids
        # NOTE(review): the upper bound here excludes max_idx - 1, while
        # get_action_instruction/remove_qos/read_all_state treat
        # max_idx - 1 as a valid qid - confirm which is intended.
        for intf in [self._uplink, self._downlink]:
            qid_list = TrafficClass.read_all_classes(intf)
            for qid_tuple in qid_list:
                (qid, pqid) = qid_tuple
                if self._start_idx <= qid < (self._max_idx - 1):
                    LOG.info("Attemting to delete class idx %d", qid)
                    TrafficClass.delete_class(intf, qid, show_error=False)
                if self._start_idx <= pqid < (self._max_idx - 1):
                    LOG.info("Attemting to delete parent class idx %d", pqid)
                    TrafficClass.delete_class(intf, pqid, show_error=False)

    def setup(self, ):
        """Initialize the htb qdisc on the uplink and downlink interfaces."""
        # initialize new qdisc
        TrafficClass.init_qdisc(self._uplink)
        TrafficClass.init_qdisc(self._downlink)

    def get_action_instruction(self, qid: int):
        """Return ``(action, None)`` marking packets with ``qid``.

        Returns None (after logging an error) when ``qid`` is out of range.
        """
        # return an action and an instruction corresponding to this qid
        if qid < self._start_idx or qid > (self._max_idx - 1):
            LOG.error("invalid qid %d, no action/inst returned", qid)
            return
        parser = self._datapath.ofproto_parser
        return parser.OFPActionSetField(pkt_mark=qid), None

    def add_qos(self, d: FlowMatch.Direction, qos_info: QosInfo,
                parent=None) -> int:
        """Allocate a qid and create its class on the interface for ``d``.

        The return value of ``create_class`` is not checked.
        """
        LOG.debug("add QoS: %s", qos_info)
        qid = self._id_manager.allocate_idx()
        intf = self._uplink if d == FlowMatch.UPLINK else self._downlink
        TrafficClass.create_class(intf, qid, qos_info.mbr,
                                  rate=qos_info.gbr,
                                  parent_qid=parent)
        LOG.debug("assigned qid: %d", qid)
        return qid

    def remove_qos(self, qid: int, d: FlowMatch.Direction,
                   recovery_mode=False):
        """Delete the class for ``qid``; release the idx only on success."""
        if not self._initialized and not recovery_mode:
            return
        if qid < self._start_idx or qid > (self._max_idx - 1):
            LOG.error("invalid qid %d, removal failed", qid)
            return
        LOG.debug("deleting qos_handle %s", qid)
        intf = self._uplink if d == FlowMatch.UPLINK else self._downlink
        err = TrafficClass.delete_class(intf, qid)
        if err == 0:
            self._id_manager.release_idx(qid)
        else:
            LOG.error('error deleting class %d, not releasing idx', qid)
        return

    def read_all_state(self, ):
        """Rebuild qid state from tc and return it in a completed future.

        The result maps each in-range qid to a dict with its ``direction``
        and ``ambr_qid`` (the parent qid, or 0 when the parent equals
        ``max_idx``). The id manager is restored from the same mapping.
        """
        LOG.debug("read_all_state")
        st = {}
        ul_qid_list = TrafficClass.read_all_classes(self._uplink)
        dl_qid_list = TrafficClass.read_all_classes(self._downlink)
        for (d, qid_list) in ((FlowMatch.UPLINK, ul_qid_list),
                              (FlowMatch.DOWNLINK, dl_qid_list)):
            for qid_tuple in qid_list:
                qid, pqid = qid_tuple
                if qid < self._start_idx or qid > (self._max_idx - 1):
                    continue
                st[qid] = {
                    'direction': d,
                    'ambr_qid': pqid if pqid != self._max_idx else 0,
                }
        self._id_manager.restore_state(st)
        fut = self._loop.create_future()
        LOG.debug("map -> %s", st)
        fut.set_result(st)
        return fut

    def same_qos_config(self, d: FlowMatch.Direction,
                        qid1: int, qid2: int) -> bool:
        intf = self._uplink if d == FlowMatch.UPLINK else self._downlink
        config1 = TrafficClass.get_class_rate(intf, qid1)
        config2 = TrafficClass.get_class_rate(intf, qid2)
        # ... (the listing is truncated here; the rest of this method is
        # not visible in the source page)
set_netem_only.py
Source:set_netem_only.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
# vim:softtabstop=4:shiftwidth=4:expandtab

# python imports
from logging import info, debug, warn, error
from twisted.internet import defer, reactor
import os
import sys
import time

# umic-mesh imports
from um_measurement import measurement, tests
from um_functions import call


class DumbbellEvaluationMeasurement(measurement.Measurement):
    """This Measurement will run tests of several scenarios:
       - Each scenario is defined by it's flowgrind options.
       - One test of a scenario consists of parallel runs (flows)
         between all pairs defined in the pairs file.
       - One measurement-iteration will run one test of each scenario.
       - The number of iterations is determined by the "iterations" variable.
    """

    def __init__(self):
        """Constructor of the object"""
        self.logprefix = ""
        measurement.Measurement.__init__(self)
        self.parser.set_defaults(pairfile="vmesh_dumbbell_flowgrind_measurement_pairs.lst",
                                 offset=None, log_dir="out")
        self.parser.add_option('-f', '--pairfile', metavar="PAIRFILE",
                               action='store', type='string', dest='pairfile',
                               help='Set file to load node pairs from [default: %default]')
        self.parser.add_option('-o', '--offset', metavar="OFFSET",
                               action='store', type='string', dest='offset',
                               help='Offset for routers [default: %default]')
        self.later_args_list = []

    def set_option(self):
        """Set options; exit with status 1 when no offset was given."""
        measurement.Measurement.set_option(self)
        if self.options.offset is None:
            error("Please give an offset!")
            sys.exit(1)

    @defer.inlineCallbacks
    def run_netem(self, reorder, ackreor, rdelay, delay, ackloss, limit, bottleneckbw, mode):
        """Apply netem/htb settings on the routers via remote_execute.

        Router names are ``vmrouter<offset + k>``. ``mode`` is inserted
        into each tc command as the operation (run() passes "add" and
        "change"). ``limit`` and ``bottleneckbw`` are skipped when None.
        ``delay`` is currently only referenced by commented-out commands.
        """
        fdnode = "vmrouter%s" % (int(self.options.offset) + 9)   # forward path delay
        frnode = "vmrouter%s" % (int(self.options.offset) + 11)  # forward path reordering
        qlnode = "vmrouter%s" % (int(self.options.offset) + 12)  # queue limit
        rrnode = "vmrouter%s" % (int(self.options.offset) + 13)  # reverse path reordering
        rdnode = "vmrouter%s" % (int(self.options.offset) + 15)  # reverse path delay
        alnode = "vmrouter%s" % (int(self.options.offset) + 14)  # ack loss
        info("Setting netem..")
        #forward path delay
        #if not delay == None:
        #    tc_cmd = "sudo tc qdisc %s dev eth0 parent 1:5 handle 50: netem delay %ums %ums 20%%" %(mode, delay, (int)(delay * 0.1))
        #    rc = yield self.remote_execute(fdnode, tc_cmd)
        #forward path reordering
        if reorder == 0:
            tc_cmd = "sudo tc-enhanced-netem qdisc %s dev eth0 parent 1:2 handle 10: netem reorder 0%%" % (mode)
        else:
            tc_cmd = "sudo tc-enhanced-netem qdisc %s dev eth0 parent 1:2 handle 10: netem reorder %u%% \
                      reorderdelay %ums %ums 20%%" % (mode, reorder, rdelay, int(rdelay * 0.1))
        yield self.remote_execute(frnode, tc_cmd, log_file=sys.stdout)
        #queue limit
        if limit is not None:
            tc_cmd = "sudo tc qdisc %s dev eth0 parent 1:1 handle 10: netem limit %u; \
                      sudo tc qdisc %s dev eth0 parent 1:2 handle 20: netem limit %u" % (mode, limit, mode, limit)
            yield self.remote_execute(qlnode, tc_cmd, log_file=sys.stdout)
        #bottleneck bandwidth
        if bottleneckbw is not None:
            tc_cmd = "sudo tc class %s dev eth0 parent 1: classid 1:1 htb rate %umbit; \
                      sudo tc class %s dev eth0 parent 1: classid 1:2 htb rate %umbit" % (mode, bottleneckbw, mode, bottleneckbw)
            yield self.remote_execute(qlnode, tc_cmd, log_file=sys.stdout)
        #reverse path reordering
        if ackreor == 0:
            tc_cmd = "sudo tc-enhanced-netem qdisc %s dev eth0 parent 1:1 handle 10: netem reorder 0%%" % (mode)
        else:
            tc_cmd = "sudo tc-enhanced-netem qdisc %s dev eth0 parent 1:1 handle 10: netem reorder %u%% \
                      reorderdelay %ums %ums 20%%" % (mode, ackreor, rdelay, int(rdelay * 0.1))
        yield self.remote_execute(rrnode, tc_cmd, log_file=sys.stdout)
        #reverse path delay
        #if not delay == None:
        #    tc_cmd = "sudo tc qdisc %s dev eth0 parent 1:1 handle 50: netem delay %ums %ums 20%%" %(mode, delay, (int)(delay * 0.1))
        #    rc = yield self.remote_execute(rdnode, tc_cmd, log_file=sys.stdout)
        #ack loss
        if ackloss == 0:
            tc_cmd = "sudo tc-enhanced-netem qdisc %s dev eth0 parent 1:1 handle 10: netem drop 0%%" % (mode)
        else:
            tc_cmd = "sudo tc-enhanced-netem qdisc %s dev eth0 parent 1:1 handle 10: netem drop %u%%" % (mode, ackloss)
        yield self.remote_execute(alnode, tc_cmd, log_file=sys.stdout)

    @defer.inlineCallbacks
    def run(self):
        """Main method"""
        delay = 20
        rate = 20
        qlen = int((2*delay*rate)/11.44)+1
        rrate = 0
        rdelay = delay
        #initial settings for netem
        yield self.run_netem(0, 0, 0, 0, 0, 1000, 100, "add")
        yield self.run_netem(rrate, 0, rdelay, delay, 0, qlen, rate, "change")
        #reorder, ackreor, rdelay, delay, ackloss, limit, bottleneckbw
        reactor.stop()

    def main(self):
        """Parse and validate options, start run() and enter the reactor."""
        self.parse_option()
        self.set_option()
        self.run()
        reactor.run()


if __name__ == "__main__":
    ...  # the listing is truncated here in the source page
tc_controller.py
Source:tc_controller.py
from flask import Blueprint, request, jsonify
from tc_rules import get, post, delete
from config import if_name
import logging
import os
import shlex

logger = logging.getLogger(__name__)
# NOTE(review): the second argument is the literal string '__name__', not the
# module's __name__ - confirm this is intended.
tc_controller_bp = Blueprint('ai_app', '__name__')


@tc_controller_bp.route('/tc_rules', methods=['GET', 'POST', 'DELETE'])
def handle_uav_data():
    """Handle GET/POST/DELETE on /tc_rules.

    GET returns the stored rules, POST applies and stores the posted rules
    and returns the stored rules, DELETE removes the rules.
    """
    if request.method == 'GET':
        return jsonify(get())
    if request.method == 'POST':
        # parse the request body once and reuse it
        rules = request.get_json()
        create_rules(rules)
        post(rules)
        return jsonify(get())
    if request.method == 'DELETE':
        delete_rules()
        delete()
        return jsonify({'result': 'Rules deleted.'})


def _shell_arg(value):
    """Return ``value`` as a string quoted for safe use in a shell command."""
    return shlex.quote(str(value))


def create_rules(rules):
    """Build and execute the tcset commands described by ``rules``.

    Nothing is executed when ``rules`` is empty/None or its 'apply' entry
    is missing or falsy. Optional keys: 'delay', 'rate', 'loss'.
    """
    if not rules or not rules.get('apply'):
        return
    tc_cmd = f"tcset {if_name}"
    tc_cmd_ai = f"tcset {if_name}"
    # The values come from the HTTP request body, so they are shell-quoted
    # before being placed into a command string. Plain values such as
    # "100ms" are left unchanged by shlex.quote.
    if 'delay' in rules:
        tc_cmd += f" --delay {_shell_arg(rules['delay'])} "
    if 'rate' in rules:
        tc_cmd += f" --rate {_shell_arg(rules['rate'])}"
        # presumably parsed by the local shell and again by the remote
        # shell behind ssh, hence quoted twice - verify against
        # exec_and_log, whose body is not visible in this listing
        tc_cmd_ai += f" --rate {_shell_arg(_shell_arg(rules['rate']))}"
    if 'loss' in rules:
        tc_cmd += f" --loss {_shell_arg(rules['loss'])}"
    tc_cmd += " --overwrite"
    tc_cmd_ai += " --direction incoming --overwrite"
    # delete_rules()
    # exec_and_log(f"{tc_cmd} --direction incoming")
    # exec_and_log(f"{tc_cmd} --direction outgoing")
    exec_and_log(f"{tc_cmd}")
    # NOTE(review): hard-coded root password and host on the command line;
    # consider key-based ssh authentication and configuration values.
    exec_and_log(f"sshpass -p 'ffmpeg' ssh root@10.10.21.11 {tc_cmd_ai}")


def delete_rules():
    """Execute ``tcdel <if_name> --all``."""
    tc_cmd = f"tcdel {if_name} --all"
    exec_and_log(tc_cmd)


def exec_and_log(cmd):
    logger.info(f"Executing command: \t'{cmd}'")
    # ... (the listing is truncated here; the rest of this function is not
    # visible in the source page)
Learn to execute automation testing from scratch with LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!