Best Python code snippet using autotest_python
utils_cgroup.py
Source:utils_cgroup.py
...61 """62 if not isinstance(cgroup, str):63 raise error.TestError("cgroup type isn't string!")64 return os.path.join(self.root, cgroup) + '/'65 def get_cgroup_name(self, pwd=None):66 """67 Get cgroup's name68 :param pwd: cgroup name69 :return: cgroup's name70 """71 if pwd is None:72 # root cgroup73 return None74 if isinstance(pwd, int):75 pwd = self.cgroups[pwd]76 # self.root is "/cgroup/blkio," not "/cgroup/blkio/"77 # cgroup is "/cgroup/blkio/test" or "/cgroup/blkio/test/test"78 # expected cgroup name is test/ or test/test/79 if pwd.startswith(self.root + '/'):80 return pwd[len(self.root) + 1:]81 return None82 def get_cgroup_index(self, cgroup):83 """84 Get cgroup's index in cgroups85 :param cgroup: cgroup name86 :return: index of cgroup87 """88 try:89 if self.__get_cgroup_pwd(cgroup) not in self.cgroups:90 raise error.TestFail("%s not exists!" % cgroup)91 cgroup_pwd = self.__get_cgroup_pwd(cgroup)92 return self.cgroups.index(cgroup_pwd)93 except error.CmdError:94 raise error.TestFail("Find index failed!")95 def mk_cgroup_cgcreate(self, pwd=None, cgroup=None):96 """97 Make a cgroup by executing the cgcreate command98 :params: cgroup: name of the cgroup to be created99 :return: last cgroup index100 """101 try:102 parent_cgroup = self.get_cgroup_name(pwd)103 if cgroup is None:104 range = "abcdefghijklmnopqrstuvwxyz0123456789"105 sub_cgroup = "cgroup-" + "".join(random.sample(range +106 range.upper(), 6))107 else:108 sub_cgroup = cgroup109 if parent_cgroup is None:110 cgroup = sub_cgroup111 else:112 # Parent cgroup:test. Created cgroup:test1.113 # Whole cgroup name is "test/test1"114 cgroup = os.path.join(parent_cgroup, sub_cgroup)115 if self.__get_cgroup_pwd(cgroup) in self.cgroups:116 raise error.TestFail("%s exists!" 
% cgroup)117 cgcreate_cmd = "cgcreate -g %s:%s" % (self.module, cgroup)118 utils.run(cgcreate_cmd, ignore_status=False)119 pwd = self.__get_cgroup_pwd(cgroup)120 self.cgroups.append(pwd)121 return len(self.cgroups) - 1122 except error.CmdError:123 raise error.TestFail("Make cgroup by cgcreate failed!")124 def mk_cgroup(self, pwd=None, cgroup=None):125 """126 Creates new temporary cgroup127 :param pwd: where to create this cgroup (default: self.root)128 :param cgroup: desired cgroup name129 :return: last cgroup index130 """131 if pwd is None:132 pwd = self.root133 if isinstance(pwd, int):134 pwd = self.cgroups[pwd]135 try:136 if cgroup and self.__get_cgroup_pwd(cgroup) in self.cgroups:137 raise error.TestFail("%s exists!" % cgroup)138 if not cgroup:139 pwd = mkdtemp(prefix='cgroup-', dir=pwd) + '/'140 else:141 pwd = os.path.join(pwd, cgroup) + '/'142 if not os.path.exists(pwd):143 os.mkdir(pwd)144 except Exception, inst:145 raise error.TestError("cg.mk_cgroup(): %s" % inst)146 self.cgroups.append(pwd)147 return len(self.cgroups) - 1148 def cgexec(self, cgroup, cmd, args=""):149 """150 Execute command in desired cgroup151 :param cgroup: Desired cgroup152 :param cmd: Executed command153 :param args: Executed command's parameters154 """155 try:156 args_str = ""157 if len(args):158 args_str = " ".join(args)159 cgexec_cmd = ("cgexec -g %s:%s %s %s" %160 (self.module, cgroup, cmd, args_str))161 status, output = commands.getstatusoutput(cgexec_cmd)162 return status, output163 except error.CmdError, detail:164 raise error.TestFail("Execute %s in cgroup failed!\n%s" %165 (cmd, detail))166 def rm_cgroup(self, pwd):167 """168 Removes cgroup.169 :param pwd: cgroup directory.170 """171 if isinstance(pwd, int):172 pwd = self.cgroups[pwd]173 try:174 os.rmdir(pwd)175 self.cgroups.remove(pwd)176 except ValueError:177 logging.warn("cg.rm_cgroup(): Removed cgroup which wasn't created"178 "using this Cgroup")179 except Exception, inst:180 raise error.TestError("cg.rm_cgroup(): %s" % 
inst)181 def cgdelete_all_cgroups(self):182 """183 Delete all cgroups in the module184 """185 try:186 for cgroup_pwd in self.cgroups:187 # Ignore sub cgroup188 cgroup = self.get_cgroup_name(cgroup_pwd)189 if cgroup.count("/") > 0:190 continue191 self.cgdelete_cgroup(cgroup, True)192 except error.CmdError:193 raise error.TestFail("cgdelete all cgroups in %s failed!"194 % self.module)195 def cgdelete_cgroup(self, cgroup, recursive=False):196 """197 Delete desired cgroup.198 :params cgroup: desired cgroup199 :params force:If true, sub cgroup can be deleted with parent cgroup200 """201 try:202 cgroup_pwd = self.__get_cgroup_pwd(cgroup)203 if cgroup_pwd not in self.cgroups:204 raise error.TestError("%s doesn't exist!" % cgroup)205 cmd = "cgdelete %s:%s" % (self.module, cgroup)206 if recursive:207 cmd += " -r"208 utils.run(cmd, ignore_status=False)209 self.cgroups.remove(cgroup_pwd)210 except error.CmdError, detail:211 raise error.TestFail("cgdelete %s failed!\n%s" %212 (cgroup, detail))213 def cgclassify_cgroup(self, pid, cgroup):214 """215 Classify pid into cgroup216 :param pid: pid of the process217 :param cgroup: cgroup name218 """219 try:220 cgroup_pwd = self.__get_cgroup_pwd(cgroup)221 if cgroup_pwd not in self.cgroups:222 raise error.TestError("%s doesn't exist!" 
% cgroup)223 cgclassify_cmd = ("cgclassify -g %s:%s %d" %224 (self.module, cgroup, pid))225 utils.run(cgclassify_cmd, ignore_status=False)226 except error.CmdError, detail:227 raise error.TestFail("Classify process to tasks file failed!:%s" %228 detail)229 def get_pids(self, pwd=None):230 """231 Get all pids in cgroup232 :params: pwd: cgroup directory233 :return: all pids(list)234 """235 if pwd is None:236 pwd = self.root237 if isinstance(pwd, int):238 pwd = self.cgroups[pwd]239 try:240 return [_.strip() for _ in open(os.path.join(pwd, 'tasks'), 'r')]241 except Exception, inst:242 raise error.TestError("cg.get_pids(): %s" % inst)243 def test(self, cmd):244 """245 Executes cgroup_client.py with cmd parameter.246 :param cmd: command to be executed247 :return: subprocess.Popen() process248 """249 logging.debug("cg.test(): executing parallel process '%s'", cmd)250 cmd = self._client + ' ' + cmd251 process = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,252 stdout=subprocess.PIPE,253 stderr=subprocess.PIPE, close_fds=True)254 return process255 def is_cgroup(self, pid, pwd):256 """257 Checks if the 'pid' process is in 'pwd' cgroup258 :param pid: pid of the process259 :param pwd: cgroup directory260 :return: 0 when is 'pwd' member261 """262 if isinstance(pwd, int):263 pwd = self.cgroups[pwd]264 if open(os.path.join(pwd, 'tasks')).readlines().count("%d\n" % pid) > 0:265 return 0266 else:267 return -1268 def is_root_cgroup(self, pid):269 """270 Checks if the 'pid' process is in root cgroup (WO cgroup)271 :param pid: pid of the process272 :return: 0 when is 'root' member273 """274 return self.is_cgroup(pid, self.root)275 def set_cgroup(self, pid, pwd=None):276 """277 Sets cgroup membership278 :param pid: pid of the process279 :param pwd: cgroup directory280 """281 if pwd is None:282 pwd = self.root283 if isinstance(pwd, int):284 pwd = self.cgroups[pwd]285 try:286 open(os.path.join(pwd, 'tasks'), 'w').write(str(pid))287 except Exception, inst:288 raise 
error.TestError("cg.set_cgroup(): %s" % inst)289 if self.is_cgroup(pid, pwd):290 raise error.TestError("cg.set_cgroup(): Setting %d pid into %s "291 "cgroup failed" % (pid, pwd))292 def set_root_cgroup(self, pid):293 """294 Resets the cgroup membership (sets to root)295 :param pid: pid of the process296 :return: 0 when PASSED297 """298 return self.set_cgroup(pid, self.root)299 def get_property(self, prop, pwd=None):300 """301 Gets the property value302 :param prop: property name (file)303 :param pwd: cgroup directory304 :return: [] values or None when FAILED305 """306 if pwd is None:307 pwd = self.root308 if isinstance(pwd, int):309 pwd = self.cgroups[pwd]310 try:311 # Remove tailing '\n' from each line312 ret = [_[:-1] for _ in open(os.path.join(pwd, prop), 'r')]313 if ret:314 return ret315 else:316 return [""]317 except Exception, inst:318 raise error.TestError("cg.get_property(): %s" % inst)319 def set_property_h(self, prop, value, pwd=None, check=True, checkprop=None):320 """321 Sets the one-line property value concerning the K,M,G postfix322 :param prop: property name (file)323 :param value: desired value324 :param pwd: cgroup directory325 :param check: check the value after setup / override checking value326 :param checkprop: override prop when checking the value327 """328 _value = value329 try:330 value = str(value)331 human = {'B': 1,332 'K': 1024,333 'M': 1048576,334 'G': 1073741824,335 'T': 1099511627776336 }337 if human.has_key(value[-1]):338 value = int(value[:-1]) * human[value[-1]]339 except Exception:340 logging.warn("cg.set_prop() fallback into cg.set_property.")341 value = _value342 self.set_property(prop, value, pwd, check, checkprop)343 def set_property(self, prop, value, pwd=None, check=True, checkprop=None):344 """345 Sets the property value346 :param prop: property name (file)347 :param value: desired value348 :param pwd: cgroup directory349 :param check: check the value after setup / override checking value350 :param checkprop: override prop 
when checking the value351 """352 value = str(value)353 if pwd is None:354 pwd = self.root355 if isinstance(pwd, int):356 pwd = self.cgroups[pwd]357 try:358 open(os.path.join(pwd, prop), 'w').write(value)359 except Exception, inst:360 raise error.TestError("cg.set_property(): %s" % inst)361 if check is not False:362 if check is True:363 check = value364 if checkprop is None:365 checkprop = prop366 _values = self.get_property(checkprop, pwd)367 # Sanitize non printable characters before check368 check = " ".join(check.split())369 if check not in _values:370 raise error.TestError("cg.set_property(): Setting failed: "371 "desired = %s, real values = %s"372 % (repr(check), repr(_values)))373 def cgset_property(self, prop, value, pwd=None, check=True, checkprop=None):374 """375 Sets the property value by cgset command376 :param prop: property name (file)377 :param value: desired value378 :param pwd: cgroup directory379 :param check: check the value after setup / override checking value380 :param checkprop: override prop when checking the value381 """382 if pwd is None:383 pwd = self.root384 if isinstance(pwd, int):385 pwd = self.cgroups[pwd]386 try:387 cgroup = self.get_cgroup_name(pwd)388 cgset_cmd = "cgset -r %s='%s' %s" % (prop, value, cgroup)389 utils.run(cgset_cmd, ignore_status=False)390 except error.CmdError, detail:391 raise error.TestFail("Modify %s failed!:\n%s" % (prop, detail))392 if check is not False:393 if check is True:394 check = value395 if checkprop is None:396 checkprop = prop397 _values = self.get_property(checkprop,398 self.get_cgroup_index(cgroup))399 # Sanitize non printable characters before check400 check = " ".join(check.split())401 if check not in _values:...
mem_prof.py
Source:mem_prof.py
...21}22CmdGetCgroupMaxMemUsage = {23'start': 'cgget -g {cgroup_name} | grep memory.max_usage_in_bytes',24}25def get_cgroup_name():26 start_string = '/run/user/20001/limitmem_'27 end_string = '_cgroup_closer'28 grep_results = os.popen(CmdGetCgroupID['start']).read()29 # print grep_results30 # find the latest cgroup name31 start_index = grep_results.rfind(start_string) + len(start_string) 32 # this task executes error. 33 if start_index == -1:34 return "Err" 35 end_index = grep_results.find(end_string, start_index)36 if end_index == -1:37 return "Err"38 cgroup_num = grep_results[start_index: end_index]39 return "memory:limitmem_" + cgroup_num40stop_mark = False41mem_usages = list()42max_mem_usage = 043def cgroup_polling():44 global stop_mark45 global mem_usages46 global max_mem_usage47 while 1:48 cgroup_name = get_cgroup_name()49 if cgroup_name == "Err":50 continue51 break52 print "cgroup_name: " + cgroup_name53 while 1 and (not stop_mark):54 time.sleep(0.01)55 memusage_results = os.popen(CmdGetCgroupMemUsage['start'].format(cgroup_name=cgroup_name)).read()56 cur_memusage = int(memusage_results.rstrip("\n").split()[1])57 # print cur_memusage58 mem_usages.append(cur_memusage)59 max_memusage_results = os.popen(CmdGetCgroupMaxMemUsage['start'].format(cgroup_name=cgroup_name)).read()60 max_mem_usage = int(max_memusage_results.rstrip("\n").split()[1])61 # print max_mem_usage62def kill_keyword(task):...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!