Best Python code snippet using slash
core.py
Source:core.py
...21def check_pro(ps_cmd):22 ex_result = ex_cmd(ps_cmd)23 status = ex_result[2]24 return status25def get_last_file(model):26 position_file = position_dir + model + '_position.json'27 with open(position_file,'r') as f:28 data = json.load(f,encoding='utf-8')29 return [data[-1]["inode"],data[-1]["pos"],data[-1]["file"]] #{u'inode': 922009, u'pos': 55574166, u'file': u'/data0/logs/clickstream/staytime.20171221160000'}30def check_size(model):31 size_ngixn_cmd = "du -sb " + get_last_file(model)[2] + "|awk '{print $1}'"32 size_flume_pos,size_nginx_log = int(get_last_file(model)[1]),int(ex_cmd(size_ngixn_cmd)[0])33 if size_flume_pos and size_nginx_log:34 diff_num = size_nginx_log - size_flume_pos35 get_flag = "YES"36 else:37 get_flag = "NO"38 diff_num = 100000039 return {"size_nginx_log": size_nginx_log, "size_flume_pos": size_flume_pos, "diff_num": diff_num, "get_flag": get_flag,"last_file":get_last_file(model)[2]}40def diff_model_allert(model):41 for i in range(3):42 size_list = check_size(model)43 if size_list["get_flag"] == "NO":44 printLog('''model,"æªè·åå°å¯¹æ¯æ°æ®,å°ä¼éè¯2次,é´é20ç§"''',3)45 time.sleep(20)46 continue47 #print model + ": size_flume_pos :", size_list["size_flume_pos"], " size_nginx_log:", size_list["size_nginx_log"], "diff_num:",size_list["diff_num"]48 if size_list["diff_num"] >= allert_num:49 err_mess = "%s åæ¥å»¶è¿è¶
è¿%sB延è¿å¤§å°(æ¥å¿å®é
大å°-flume读å大å°)为:%sB延è¿è¯»åæ件: %s" % (model,allert_num,size_list["diff_num"],size_list["last_file"])50 printLog(err_mess,3)51 global read_err_model_list52 read_err_model_list.append({model:size_list["diff_num"]})53 global err_mess_list...
file_manager.py
Source:file_manager.py
...24 if pair in s and os.path.isfile(os.path.join(path, s))]25 # a.sort(key=lambda s: os.path.getmtime(os.path.join(path, s)))26 return a27 @staticmethod28 def get_last_file(exchange, pair, stream):29 file_name = '{}_{}_{}'.format(exchange,30 pair.upper(),31 str(datetime.now().date()))32 return os.path.join(os.getcwd(), 'files', exchange, stream, file_name)33 @staticmethod34 def does_file_exist(exchange, stream, pair):35 file_path = FileManager.get_last_file(exchange, pair, stream)36 return os.path.exists(file_path)37 @staticmethod38 def save_data_frame_to_file(exchange, stream, pair, df):39 file_path = FileManager.get_last_file(exchange, pair, stream)40 with open(file_path, 'a') as file:41 df.to_csv(file, sep=',', header=file.tell() == 0, index=False)42 @staticmethod43 def save_data_to_file(exchange, stream, pair, data, columns=None):44 file_path = FileManager.get_last_file(exchange, pair, stream)45 with open(file_path, 'a') as file:46 if file.tell() == 0 and columns:47 file.write(columns+"\n")48 file.write(data)49 @staticmethod50 def save_trades_to_file(exchange, stream, pair, trade, columns=None):51 file_path = FileManager.get_last_file(exchange, pair, stream)52 if isinstance(trade, dict):53 with open(file_path, 'a') as file:54 w = csv.DictWriter(file, trade.keys())55 if file.tell() == 0:56 w.writeheader()57 w.writerow(trade)58 elif isinstance(trade, str):59 with open(file_path, 'a') as file:60 if file.tell() == 0 and columns:61 file.write(columns + "\n")...
Efine_RT.py
Source:Efine_RT.py
...39decode_istance=deode_TL.reader("raw_data/")40def decode_data(filename,gemroc):41 out_name=("root_files/")+filename.split("raw_data/")[1].split(".")[0]+".root"42 decode_istance.write_root(filename,gemroc, 0, 0, out_name)43def get_last_file(mypath):44 onlyfiles = [f for f in os.listdir(mypath) if (os.path.isfile(os.path.join(mypath, f)) and 'missing_frames' not in f and "efine" in f)]45 onlyfiles.sort()46 return (os.path.join(mypath,onlyfiles[-1]))47def run_acq():48 while True:49 acquire_data(gemroc,acq_time)50 raw_f=get_last_file("raw_data/")51 decode_data(get_last_file("raw_data/"),gemroc)52 root_f=get_last_file("root_files/")53 f=R.TFile(get_last_file("root_files/"))54 f.tree.Draw("efine","")55 c.Update()56 os.remove(raw_f)57 os.remove(root_f)58 #root.update()59def status_run():60 running = True61 62def status_stop():63 running = False64 65#root = tk.Tk()66#frame = tk.Frame(root)67#frame.pack()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!