Best Python code snippet using autotest_python
utils_cgroup.py
Source:utils_cgroup.py
...30 self.cgroups.sort(reverse=True)31 for pwd in self.cgroups[:]:32 for task in self.get_property("tasks", pwd):33 if task:34 self.set_root_cgroup(int(task))35 self.rm_cgroup(pwd)36 def initialize(self, modules):37 """38 Initializes object for use.39 @param modules: Array of all available cgroup modules.40 """41 self.root = modules.get_pwd(self.module)42 if not self.root:43 raise error.TestError("cg.initialize(): Module %s not found"44 % self.module)45 def mk_cgroup(self, pwd=None):46 """47 Creates new temporary cgroup48 @param pwd: where to create this cgroup (default: self.root)49 @return: 0 when PASSED50 """51 if pwd == None:52 pwd = self.root53 if isinstance(pwd, int):54 pwd = self.cgroups[pwd]55 try:56 pwd = mkdtemp(prefix='cgroup-', dir=pwd) + '/'57 except Exception, inst:58 raise error.TestError("cg.mk_cgroup(): %s" % inst)59 self.cgroups.append(pwd)60 return pwd61 def rm_cgroup(self, pwd):62 """63 Removes cgroup.64 @param pwd: cgroup directory.65 """66 if isinstance(pwd, int):67 pwd = self.cgroups[pwd]68 try:69 os.rmdir(pwd)70 self.cgroups.remove(pwd)71 except ValueError:72 logging.warn("cg.rm_cgroup(): Removed cgroup which wasn't created"73 "using this Cgroup")74 except Exception, inst:75 raise error.TestError("cg.rm_cgroup(): %s" % inst)76 def test(self, cmd):77 """78 Executes cgroup_client.py with cmd parameter.79 @param cmd: command to be executed80 @return: subprocess.Popen() process81 """82 logging.debug("cg.test(): executing paralel process '%s'", cmd)83 cmd = self._client + ' ' + cmd84 process = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,85 stdout=subprocess.PIPE,86 stderr=subprocess.PIPE, close_fds=True)87 return process88 def is_cgroup(self, pid, pwd):89 """90 Checks if the 'pid' process is in 'pwd' cgroup91 @param pid: pid of the process92 @param pwd: cgroup directory93 @return: 0 when is 'pwd' member94 """95 if isinstance(pwd, int):96 pwd = self.cgroups[pwd]97 if open(pwd + '/tasks').readlines().count("%d\n" % pid) > 0:98 return 
099 else:100 return -1101 def is_root_cgroup(self, pid):102 """103 Checks if the 'pid' process is in root cgroup (WO cgroup)104 @param pid: pid of the process105 @return: 0 when is 'root' member106 """107 return self.is_cgroup(pid, self.root)108 def set_cgroup(self, pid, pwd=None):109 """110 Sets cgroup membership111 @param pid: pid of the process112 @param pwd: cgroup directory113 """114 if pwd == None:115 pwd = self.root116 if isinstance(pwd, int):117 pwd = self.cgroups[pwd]118 try:119 open(pwd+'/tasks', 'w').write(str(pid))120 except Exception, inst:121 raise error.TestError("cg.set_cgroup(): %s" % inst)122 if self.is_cgroup(pid, pwd):123 raise error.TestError("cg.set_cgroup(): Setting %d pid into %s "124 "cgroup failed" % (pid, pwd))125 def set_root_cgroup(self, pid):126 """127 Resets the cgroup membership (sets to root)128 @param pid: pid of the process129 @return: 0 when PASSED130 """131 return self.set_cgroup(pid, self.root)132 def get_property(self, prop, pwd=None):133 """134 Gets the property value135 @param prop: property name (file)136 @param pwd: cgroup directory137 @return: [] values or None when FAILED138 """139 if pwd == None:140 pwd = self.root141 if isinstance(pwd, int):142 pwd = self.cgroups[pwd]143 try:144 # Remove tailing '\n' from each line145 ret = [_[:-1] for _ in open(pwd+prop, 'r').readlines()]146 if ret:147 return ret148 else:149 return [""]150 except Exception, inst:151 raise error.TestError("cg.get_property(): %s" % inst)152 def set_property_h(self, prop, value, pwd=None, check=True, checkprop=None):153 """154 Sets the one-line property value concerning the K,M,G postfix155 @param prop: property name (file)156 @param value: desired value157 @param pwd: cgroup directory158 @param check: check the value after setup / override checking value159 @param checkprop: override prop when checking the value160 """161 _value = value162 try:163 value = str(value)164 human = {'B': 1,165 'K': 1024,166 'M': 1048576,167 'G': 1073741824,168 'T': 
1099511627776169 }170 if human.has_key(value[-1]):171 value = int(value[:-1]) * human[value[-1]]172 except Exception:173 logging.warn("cg.set_prop() fallback into cg.set_property.")174 value = _value175 self.set_property(prop, value, pwd, check, checkprop)176 def set_property(self, prop, value, pwd=None, check=True, checkprop=None):177 """178 Sets the property value179 @param prop: property name (file)180 @param value: desired value181 @param pwd: cgroup directory182 @param check: check the value after setup / override checking value183 @param checkprop: override prop when checking the value184 """185 value = str(value)186 if pwd == None:187 pwd = self.root188 if isinstance(pwd, int):189 pwd = self.cgroups[pwd]190 try:191 open(os.path.join(pwd, prop), 'w').write(value)192 except Exception, inst:193 raise error.TestError("cg.set_property(): %s" % inst)194 if check is not False:195 if check is True:196 check = value197 if checkprop is None:198 checkprop = prop199 _values = self.get_property(checkprop, pwd)200 # Sanitize non printable characters before check201 check = " ".join(check.split())202 if check not in _values:203 raise error.TestError("cg.set_property(): Setting failed: "204 "desired = %s, real values = %s"205 % (repr(check), repr(_values)))206 def smoke_test(self):207 """208 Smoke test209 Module independent basic tests210 """211 pwd = self.mk_cgroup()212 ps = self.test("smoke")213 if ps == None:214 raise error.TestError("cg.smoke_test: Couldn't create process")215 if (ps.poll() != None):216 raise error.TestError("cg.smoke_test: Process died unexpectidly")217 # New process should be a root member218 if self.is_root_cgroup(ps.pid):219 raise error.TestError("cg.smoke_test: Process is not a root member")220 # Change the cgroup221 self.set_cgroup(ps.pid, pwd)222 # Try to remove used cgroup223 try:224 self.rm_cgroup(pwd)225 except error.TestError:226 pass227 else:228 raise error.TestError("cg.smoke_test: Unexpected successful deletion"229 " of the used 
cgroup")230 # Return the process into the root cgroup231 self.set_root_cgroup(ps.pid)232 # It should be safe to remove the cgroup now233 self.rm_cgroup(pwd)234 # Finish the process235 ps.stdin.write('\n')236 time.sleep(2)237 if (ps.poll() == None):238 raise error.TestError("cg.smoke_test: Process is not finished")239class CgroupModules(object):240 """241 Handles the list of different cgroup filesystems.242 """243 def __init__(self):244 self.modules = []245 self.modules.append([])...
cgroup_common.py
Source:cgroup_common.py
...30 self.cgroups.sort(reverse=True)31 for pwd in self.cgroups[:]:32 for task in self.get_property("tasks", pwd):33 if task:34 self.set_root_cgroup(int(task))35 self.rm_cgroup(pwd)36 def initialize(self, modules):37 """38 Initializes object for use.39 @param modules: Array of all available cgroup modules.40 """41 self.root = modules.get_pwd(self.module)42 if not self.root:43 raise error.TestError("cg.initialize(): Module %s not found"44 % self.module)45 def mk_cgroup(self, root=None):46 """47 Creates new temporary cgroup48 @param root: where to create this cgroup (default: self.root)49 @return: 0 when PASSED50 """51 try:52 if root:53 pwd = mkdtemp(prefix='cgroup-', dir=root) + '/'54 else:55 pwd = mkdtemp(prefix='cgroup-', dir=self.root) + '/'56 except Exception, inst:57 raise error.TestError("cg.mk_cgroup(): %s" % inst)58 self.cgroups.append(pwd)59 return pwd60 def rm_cgroup(self, pwd):61 """62 Removes cgroup.63 @param pwd: cgroup directory.64 """65 try:66 os.rmdir(pwd)67 self.cgroups.remove(pwd)68 except ValueError:69 logging.warn("cg.rm_cgroup(): Removed cgroup which wasn't created"70 "using this Cgroup")71 except Exception, inst:72 raise error.TestError("cg.rm_cgroup(): %s" % inst)73 def test(self, cmd):74 """75 Executes cgroup_client.py with cmd parameter.76 @param cmd: command to be executed77 @return: subprocess.Popen() process78 """79 logging.debug("cg.test(): executing paralel process '%s'", cmd)80 cmd = self._client + ' ' + cmd81 process = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,82 stdout=subprocess.PIPE,83 stderr=subprocess.PIPE, close_fds=True)84 return process85 def is_cgroup(self, pid, pwd):86 """87 Checks if the 'pid' process is in 'pwd' cgroup88 @param pid: pid of the process89 @param pwd: cgroup directory90 @return: 0 when is 'pwd' member91 """92 if isinstance(pwd, int):93 pwd = self.cgroups[pwd]94 if open(pwd + '/tasks').readlines().count("%d\n" % pid) > 0:95 return 096 else:97 return -198 def is_root_cgroup(self, pid):99 """100 
Checks if the 'pid' process is in root cgroup (WO cgroup)101 @param pid: pid of the process102 @return: 0 when is 'root' member103 """104 return self.is_cgroup(pid, self.root)105 def set_cgroup(self, pid, pwd):106 """107 Sets cgroup membership108 @param pid: pid of the process109 @param pwd: cgroup directory110 """111 if isinstance(pwd, int):112 pwd = self.cgroups[pwd]113 try:114 open(pwd+'/tasks', 'w').write(str(pid))115 except Exception, inst:116 raise error.TestError("cg.set_cgroup(): %s" % inst)117 if self.is_cgroup(pid, pwd):118 raise error.TestError("cg.set_cgroup(): Setting %d pid into %s "119 "cgroup failed" % (pid, pwd))120 def set_root_cgroup(self, pid):121 """122 Resets the cgroup membership (sets to root)123 @param pid: pid of the process124 @return: 0 when PASSED125 """126 return self.set_cgroup(pid, self.root)127 def get_property(self, prop, pwd=None):128 """129 Gets the property value130 @param prop: property name (file)131 @param pwd: cgroup directory132 @return: [] values or None when FAILED133 """134 if pwd == None:135 pwd = self.root136 if isinstance(pwd, int):137 pwd = self.cgroups[pwd]138 try:139 # Remove tailing '\n' from each line140 ret = [_[:-1] for _ in open(pwd+prop, 'r').readlines()]141 if ret:142 return ret143 else:144 return [""]145 except Exception, inst:146 raise error.TestError("cg.get_property(): %s" % inst)147 def set_property_h(self, prop, value, pwd=None, check=True, checkprop=None):148 """149 Sets the one-line property value concerning the K,M,G postfix150 @param prop: property name (file)151 @param value: desired value152 @param pwd: cgroup directory153 @param check: check the value after setup / override checking value154 @param checkprop: override prop when checking the value155 """156 _value = value157 try:158 value = str(value)159 human = {'B': 1,160 'K': 1024,161 'M': 1048576,162 'G': 1073741824,163 'T': 1099511627776164 }165 if human.has_key(value[-1]):166 value = int(value[:-1]) * human[value[-1]]167 except 
Exception:168 logging.warn("cg.set_prop() fallback into cg.set_property.")169 value = _value170 self.set_property(prop, value, pwd, check, checkprop)171 def set_property(self, prop, value, pwd=None, check=True, checkprop=None):172 """173 Sets the property value174 @param prop: property name (file)175 @param value: desired value176 @param pwd: cgroup directory177 @param check: check the value after setup / override checking value178 @param checkprop: override prop when checking the value179 """180 value = str(value)181 if pwd == None:182 pwd = self.root183 if isinstance(pwd, int):184 pwd = self.cgroups[pwd]185 try:186 open(pwd+prop, 'w').write(value)187 except Exception, inst:188 raise error.TestError("cg.set_property(): %s" % inst)189 if check != False:190 if check == True:191 check = value192 if checkprop == None:193 checkprop = prop194 _values = self.get_property(checkprop, pwd)195 if check not in _values:196 raise error.TestError("cg.set_property(): Setting failed: "197 "desired = %s, real values = %s"198 % (repr(check), repr(_values)))199 def smoke_test(self):200 """201 Smoke test202 Module independent basic tests203 """204 pwd = self.mk_cgroup()205 ps = self.test("smoke")206 if ps == None:207 raise error.TestError("cg.smoke_test: Couldn't create process")208 if (ps.poll() != None):209 raise error.TestError("cg.smoke_test: Process died unexpectidly")210 # New process should be a root member211 if self.is_root_cgroup(ps.pid):212 raise error.TestError("cg.smoke_test: Process is not a root member")213 # Change the cgroup214 self.set_cgroup(ps.pid, pwd)215 # Try to remove used cgroup216 try:217 self.rm_cgroup(pwd)218 except error.TestError:219 pass220 else:221 raise error.TestError("cg.smoke_test: Unexpected successful deletion"222 " of the used cgroup")223 # Return the process into the root cgroup224 self.set_root_cgroup(ps.pid)225 # It should be safe to remove the cgroup now226 self.rm_cgroup(pwd)227 # Finish the process228 ps.stdin.write('\n')229 
time.sleep(2)230 if (ps.poll() == None):231 raise error.TestError("cg.smoke_test: Process is not finished")232class CgroupModules(object):233 """234 Handles the list of different cgroup filesystems.235 """236 def __init__(self):237 self.modules = []238 self.modules.append([])...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!