Best Python code snippet using autotest_python
bc_record.py
Source:bc_record.py
# Excerpt of bc_record.py, reflowed from a listing in which every statement had
# been collapsed onto one line with the source line numbers fused into the code.
# The functions take `self`, so they are presumably methods of a class whose
# header is not part of this excerpt.
#
# ... (the excerpt starts inside a method whose `def` line is not shown)
#     up_to = datetime.datetime.now() - datetime.timedelta(seconds=retention_seconds)
#     return int(time.mktime(up_to.timetuple())*1000)


def __get_ts(self, event_date):
    """Convert ``event_date`` to an integer number of milliseconds.

    The value is ``time.mktime(event_date.timetuple())`` scaled by 1000 and
    truncated with ``int``; ``time.mktime`` interprets the tuple as local time.

    @param event_date: object providing ``timetuple()`` (e.g. a datetime)
    @return: timestamp in milliseconds as an int
    """
    return int(time.mktime(event_date.timetuple()) * 1000)


def is_cgroup(self, data_cgroup):
    """Tell whether ``data_cgroup`` looks like a cgroup path.

    @param data_cgroup: string taken from an event record
    @return: True when the string starts with '/', False otherwise
    """
    return data_cgroup.startswith('/')


def is_docker(self, data_cpuset):
    """Tell whether ``data_cpuset`` refers to a docker cgroup.

    @param data_cpuset: cpuset/cgroup path string
    @return: True when the substring '/docker' occurs in the path
    """
    return '/docker' in data_cpuset


def is_slurm(self, data_cpuset):
    """Extract the slurm job component from a cpuset path.

    @param data_cpuset: cpuset/cgroup path string
    @return: ``(True, component)`` for the first '/'-separated component that
             starts with 'job_' when the path contains '/slurm';
             ``(False, None)`` in every other case
    """
    if '/slurm' in data_cpuset:
        for part in data_cpuset.split('/'):
            if part.startswith('job_'):
                return (True, part)
    return (False, None)


# The listing is cut off in the middle of callback_record (inside a dict
# literal), so the visible part cannot be parsed on its own. It is reproduced
# below without code changes; the nesting was lost in the original listing and
# is reconstructed from the syntax -- verify against the real file.
#
# def callback_record(self, ch, method, properties, body):
#     try:
#         rt = json.loads(body)
#         content = rt['event']
#         logging.debug('Message: %s' % (content))
#         if content is None or 'evt_type' not in content:
#             ch.basic_ack(delivery_tag=method.delivery_tag)
#             return
#         if content['evt_type'] == 'fd':
#             for data in content['data']:
#                 is_cgroup = self.is_cgroup(data[2])
#                 if is_cgroup:
#                     if self.is_docker(data[2]):
#                         continue
#                     (is_slurm, cgroup) = self.is_slurm(data[2])
#                     if is_slurm:
#                         data[2] = cgroup
#                 # event['in'], event['out'], event['proc'], event['name'], event['container']
#                 event = {
#                     'proc': int(data[0]),
#                     'container': data[2],
#                     'name': data[3],
#                     'in': 0,
#                     'out': 0,
#                     'in_out': 0,
#                     'start': long(content['ts'])/1000000,
#                     'ts': long(content['ts'])
#                 }
#                 if data[4] == 'in':
#                     event['in'] = data[5]
#                 else:
#                     event['out'] = data[5]
#                 event['in_out'] += data[5]
#                 self.__add_fd(event)
#         elif content['evt_type'] == 'cpu':
#             for data in content['data']:
#                 is_cgroup = self.is_cgroup(data[5])
#                 if is_cgroup:
#                     if self.is_docker(data[5]):
#                         continue
#                     (is_slurm, cgroup) = self.is_slurm(data[5])
#                     if is_slurm:
#                         data[5] = cgroup
#                 is_root = 0
#                 if is_cgroup and is_slurm and 'slurm_script' in data[7]:
#                     is_root = 1
#                 if int(data[3]) == 1:
#                     is_root = 1
#                 event = {
#                     'cpu': int(data[0]),
#                     'proc_name': data[1],
# ... (the excerpt ends here, mid-statement)
cgroup_common.py
Source:cgroup_common.py
# Excerpt of cgroup_common.py, reflowed from a listing in which every statement
# had been collapsed onto one line with the source line numbers fused into the
# code. The functions take `self` and read `self.root`, so they are presumably
# methods of a class whose header is not part of this excerpt.
#
# ... (the excerpt starts inside a method whose `def` line is not shown)
#     process = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,
#                                stdout=subprocess.PIPE,
#                                stderr=subprocess.PIPE, close_fds=True)
#     return process


def is_cgroup(self, pid, pwd):
    """
    Checks if the 'pid' process is in 'pwd' cgroup
    @param pid: pid of the process
    @param pwd: cgroup directory
    @return: 0 when is 'pwd' member, -1 otherwise
    """
    wanted = "%d" % pid
    # The context manager closes the tasks file deterministically instead of
    # leaving that to the garbage collector.
    with open(pwd + '/tasks') as tasks:
        for line in tasks:
            # Compare without the line terminator so that a final line lacking
            # a trailing newline is still recognised.
            if line.rstrip("\n") == wanted:
                return 0
    return -1


def is_root_cgroup(self, pid):
    """
    Checks if the 'pid' process is in root cgroup (WO cgroup)
    @param pid: pid of the process
    @return: 0 when is 'root' member
    """
    return self.is_cgroup(pid, self.root)


def set_cgroup(self, pid, pwd):
    """
    Sets cgroup membership
    @param pid: pid of the process
    @param pwd: cgroup directory
    @return: 0 when PASSED, -1 when the write or the verification failed
    """
    try:
        # Closing the file before verifying guarantees the write has been
        # flushed by the time is_cgroup() re-reads the tasks file.
        with open(pwd + '/tasks', 'w') as tasks:
            tasks.write(str(pid))
    except Exception as inst:
        logging.error("cg.set_cgroup(): %s", inst)
        return -1
    # is_cgroup() returns 0 on success, so any non-zero value means the pid
    # did not show up in the tasks file.
    if self.is_cgroup(pid, pwd):
        logging.error("cg.set_cgroup(): Setting %d pid into %s cgroup "
                      "failed", pid, pwd)
        return -1
    return 0


def set_root_cgroup(self, pid):
    """
    Resets the cgroup membership (sets to root)
    @param pid: pid of the process
    @return: 0 when PASSED
    """
    return self.set_cgroup(pid, self.root)


# The listing is cut off right after the header of the next method; it is kept
# here as it appeared.
#
# def get_prop(self, prop, pwd=None, supress=False):
#     """
# ... (the excerpt ends here)
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!