Best Python code snippet using autotest_python
utils_cgroup.py
Source:utils_cgroup.py
...48 self.root = modules.get_pwd(self.module)49 if not self.root:50 raise error.TestError("cg.initialize(): Module %s not found"51 % self.module)52 def __get_cgroup_pwd(self, cgroup):53 """54 Get cgroup's full path55 :param: cgroup: cgroup name56 :return: cgroup's full path57 """58 if not isinstance(cgroup, str):59 raise error.TestError("cgroup type isn't string!")60 return os.path.join(self.root, cgroup) + '/'61 def get_cgroup_name(self, pwd=None):62 """63 Get cgroup's name64 :param: pwd: cgroup name65 :return: cgroup's name66 """67 if pwd is None:68 # root cgroup69 return None70 if isinstance(pwd, int):71 pwd = self.cgroups[pwd]72 # self.root is "/cgroup/blkio," not "/cgroup/blkio/"73 # cgroup is "/cgroup/blkio/test" or "/cgroup/blkio/test/test"74 # expected cgroup name is test/ or test/test/75 if pwd.startswith(self.root + '/'):76 return pwd[len(self.root) + 1:]77 return None78 def get_cgroup_index(self, cgroup):79 """80 Get cgroup's index in cgroups81 :param: cgroup: cgroup name82 :return: index of cgroup83 """84 try:85 if self.__get_cgroup_pwd(cgroup) not in self.cgroups:86 raise error.TestFail("%s not exists!" % cgroup)87 cgroup_pwd = self.__get_cgroup_pwd(cgroup)88 return self.cgroups.index(cgroup_pwd)89 except error.CmdError:90 raise error.TestFail("Find index failed!")91 def mk_cgroup_cgcreate(self, pwd=None, cgroup=None):92 """93 Make a cgroup by cgcreate command94 :params: cgroup: Maked cgroup name95 :return: last cgroup index96 """97 try:98 parent_cgroup = self.get_cgroup_name(pwd)99 if cgroup is None:100 range = "abcdefghijklmnopqrstuvwxyz0123456789"101 sub_cgroup = "cgroup-" + "".join(random.sample(range +102 range.upper(), 6))103 else:104 sub_cgroup = cgroup105 if parent_cgroup is None:106 cgroup = sub_cgroup107 else:108 # Parent cgroup:test. Created cgroup:test1.109 # Whole cgroup name is "test/test1"110 cgroup = os.path.join(parent_cgroup, sub_cgroup)111 if self.__get_cgroup_pwd(cgroup) in self.cgroups:112 raise error.TestFail("%s exists!" % cgroup)113 cgcreate_cmd = "cgcreate -g %s:%s" % (self.module, cgroup)114 utils.run(cgcreate_cmd, ignore_status=False)115 pwd = self.__get_cgroup_pwd(cgroup)116 self.cgroups.append(pwd)117 return len(self.cgroups) - 1118 except error.CmdError:119 raise error.TestFail("Make cgroup by cgcreate failed!")120 def mk_cgroup(self, pwd=None, cgroup=None):121 """122 Creates new temporary cgroup123 :param pwd: where to create this cgroup (default: self.root)124 :param cgroup: desired cgroup name125 :return: last cgroup index126 """127 if pwd is None:128 pwd = self.root129 if isinstance(pwd, int):130 pwd = self.cgroups[pwd]131 try:132 if cgroup and self.__get_cgroup_pwd(cgroup) in self.cgroups:133 raise error.TestFail("%s exists!" % cgroup)134 if not cgroup:135 pwd = mkdtemp(prefix='cgroup-', dir=pwd) + '/'136 else:137 pwd = os.path.join(pwd, cgroup) + '/'138 if not os.path.exists(pwd):139 os.mkdir(pwd)140 except Exception, inst:141 raise error.TestError("cg.mk_cgroup(): %s" % inst)142 self.cgroups.append(pwd)143 return len(self.cgroups) - 1144 def cgexec(self, cgroup, cmd, args=""):145 """146 Execute command in desired cgroup147 :param: cgroup: Desired cgroup148 :param: cmd: Executed command149 :param: args: Executed command's parameters150 """151 try:152 args_str = ""153 if len(args):154 args_str = " ".join(args)155 cgexec_cmd = ("cgexec -g %s:%s %s %s" %156 (self.module, cgroup, cmd, args_str))157 status, output = commands.getstatusoutput(cgexec_cmd)158 return status, output159 except error.CmdError, detail:160 raise error.TestFail("Execute %s in cgroup failed!\n%s" %161 (cmd, detail))162 def rm_cgroup(self, pwd):163 """164 Removes cgroup.165 :param pwd: cgroup directory.166 """167 if isinstance(pwd, int):168 pwd = self.cgroups[pwd]169 try:170 os.rmdir(pwd)171 self.cgroups.remove(pwd)172 except ValueError:173 logging.warn("cg.rm_cgroup(): Removed cgroup which wasn't created"174 "using this Cgroup")175 except Exception, inst:176 raise error.TestError("cg.rm_cgroup(): %s" % inst)177 def cgdelete_all_cgroups(self):178 """179 Delete all cgroups in the module180 """181 try:182 for cgroup_pwd in self.cgroups:183 # Ignore sub cgroup184 cgroup = self.get_cgroup_name(cgroup_pwd)185 if cgroup.count("/") > 0:186 continue187 self.cgdelete_cgroup(cgroup, True)188 except error.CmdError:189 raise error.TestFail("cgdelete all cgroups in %s failed!"190 % self.module)191 def cgdelete_cgroup(self, cgroup, recursive=False):192 """193 Delete desired cgroup.194 :params cgroup: desired cgroup195 :params force:If true, sub cgroup can be deleted with parent cgroup196 """197 try:198 cgroup_pwd = self.__get_cgroup_pwd(cgroup)199 if cgroup_pwd not in self.cgroups:200 raise error.TestError("%s doesn't exist!" % cgroup)201 cmd = "cgdelete %s:%s" % (self.module, cgroup)202 if recursive:203 cmd += " -r"204 utils.run(cmd, ignore_status=False)205 self.cgroups.remove(cgroup_pwd)206 except error.CmdError, detail:207 raise error.TestFail("cgdelete %s failed!\n%s" %208 (cgroup, detail))209 def cgclassify_cgroup(self, pid, cgroup):210 """211 Classify pid into cgroup212 :param pid: pid of the process213 :param cgroup: cgroup name214 """215 try:216 cgroup_pwd = self.__get_cgroup_pwd(cgroup)217 if cgroup_pwd not in self.cgroups:218 raise error.TestError("%s doesn't exist!" % cgroup)219 cgclassify_cmd = ("cgclassify -g %s:%s %d" %220 (self.module, cgroup, pid))221 utils.run(cgclassify_cmd, ignore_status=False)222 except error.CmdError, detail:223 raise error.TestFail("Classify process to tasks file failed!:%s" %224 detail)225 def get_pids(self, pwd=None):226 """227 Get all pids in cgroup228 :params: pwd: cgroup directory229 :return: all pids(list)230 """231 if pwd is None:232 pwd = self.root233 if isinstance(pwd, int):234 pwd = self.cgroups[pwd]235 try:236 return [_.strip() for _ in open(os.path.join(pwd, 'tasks'), 'r')]237 except Exception, inst:238 raise error.TestError("cg.get_pids(): %s" % inst)239 def test(self, cmd):240 """241 Executes cgroup_client.py with cmd parameter.242 :param cmd: command to be executed243 :return: subprocess.Popen() process244 """245 logging.debug("cg.test(): executing parallel process '%s'", cmd)246 cmd = self._client + ' ' + cmd247 process = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,248 stdout=subprocess.PIPE,249 stderr=subprocess.PIPE, close_fds=True)250 return process251 def is_cgroup(self, pid, pwd):252 """253 Checks if the 'pid' process is in 'pwd' cgroup254 :param pid: pid of the process255 :param pwd: cgroup directory256 :return: 0 when is 'pwd' member257 """258 if isinstance(pwd, int):259 pwd = self.cgroups[pwd]260 if open(os.path.join(pwd, 'tasks')).readlines().count("%d\n" % pid) > 0:261 return 0262 else:263 return -1264 def is_root_cgroup(self, pid):265 """266 Checks if the 'pid' process is in root cgroup (WO cgroup)267 :param pid: pid of the process268 :return: 0 when is 'root' member269 """270 return self.is_cgroup(pid, self.root)271 def set_cgroup(self, pid, pwd=None):272 """273 Sets cgroup membership274 :param pid: pid of the process275 :param pwd: cgroup directory276 """277 if pwd is None:278 pwd = self.root279 if isinstance(pwd, int):280 pwd = self.cgroups[pwd]281 try:282 open(os.path.join(pwd, 'tasks'), 'w').write(str(pid))283 except Exception, inst:284 raise error.TestError("cg.set_cgroup(): %s" % inst)285 if self.is_cgroup(pid, pwd):286 raise error.TestError("cg.set_cgroup(): Setting %d pid into %s "287 "cgroup failed" % (pid, pwd))288 def refresh_cgroups(self):289 """290 Refresh all cgroups path.291 """292 try:293 cgroups = utils.run("lscgroup").stdout.strip()294 cgroup_list = []295 for line in cgroups.splitlines():296 controllers = line.split(":")[0]297 if set(self.module.split(",")) != set(controllers.split(",")):298 continue299 cgroup_name = line.split(":")[-1]300 if cgroup_name != "/":301 cgroup_list.append(cgroup_name[1:])302 except error.CmdError:303 raise error.TestFail("Get cgroup in %s failed!" % self.module)304 self.cgroups = []305 for cgroup in cgroup_list:306 pwd = self.__get_cgroup_pwd(cgroup)307 self.cgroups.append(pwd)308 def set_root_cgroup(self, pid):309 """310 Resets the cgroup membership (sets to root)311 :param pid: pid of the process312 :return: 0 when PASSED313 """314 return self.set_cgroup(pid, self.root)315 def get_property(self, prop, pwd=None):316 """317 Gets the property value318 :param prop: property name (file)319 :param pwd: cgroup directory320 :return: [] values or None when FAILED...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!