Best Python code snippet using Contexts
_NodeFileTransferMethods.py
Source:_NodeFileTransferMethods.py
"""File- and folder-transfer methods for a network node.

Each function takes an explicit ``self``: they are attached to a node class
defined elsewhere (they rely on ``self._request``, ``self._recv_response``,
``self._respond_ok``, ``self._make_signal`` etc.).  Transfers run a simple
request/response protocol over per-session channels; file contents are
streamed in 8 MiB chunks, and a size+md5 comparison lets either side skip
files that are already identical (``same_file``).

NOTE(review): reconstructed from a whitespace-mangled dump (original line
numbers were fused into the text).  Logic is unchanged except for the
fixes noted inline.
"""
import threading
import os
import sys

# Make the sibling helper modules importable when this file is loaded
# stand-alone.
sys.path.append(os.path.dirname(os.path.realpath(__file__)))
from _utils import file_size, md5, eprint
from _NodeInternalClasses import Future, Thread


def _write_to_file(self, data, filename):
    """Send one chunk of *data* to the peer to append to *filename*.

    Session id -2 appears reserved for raw file writes — confirm against
    the protocol definition.
    """
    self._request(-2, data=data, file_name=filename)


def _process__write_to_file(self, request):
    """Handle a ``_write_to_file`` request.

    Appends the chunk to the node's currently open output file, first
    switching (and creating parent directories for) a new file whenever
    the target name changes.  Best-effort by design: errors are ignored.
    """
    try:
        filename = os.path.abspath(request["data"]["file_name"])
        # A different target closes the previously open file first.
        if self._out_file is not None and self._out_file.name != filename:
            self._out_file.close()
            self._out_file = None
        if self._out_file is None:
            # exist_ok avoids the isdir/makedirs race of the original.
            os.makedirs(os.path.dirname(filename), exist_ok=True)
            self._out_file = open(filename, "wb")
        self._out_file.write(request["data"]["data"])
        self._out_file.flush()
    except Exception:
        # Narrowed from a bare ``except`` so KeyboardInterrupt/SystemExit
        # are no longer swallowed; still best-effort for ordinary errors.
        pass


def _close_file(self):
    """Ask the peer to close the file opened via ``_write_to_file``."""
    self._request(-2)


def _process__close_file(self, request):
    """Handle a ``_close_file`` request: close the open output file."""
    try:
        if self._out_file is not None:
            self._out_file.close()
            self._out_file = None
    except Exception:
        pass  # best-effort by design


def get_file(self, src_filename, dest_filename=None, block=True):
    """Download *src_filename* from the peer into *dest_filename*.

    *dest_filename* defaults to the basename of the source.  With
    ``block=True`` the call returns when the transfer completes; with
    ``block=False`` it runs in a background thread and returns a
    ``Future`` keyed on the session id.

    The local file's size and md5 are sent up-front so the peer can skip
    the transfer when the destination is already identical.
    NOTE(review): ``file_size``/``md5`` are called on a possibly missing
    local file — assumes the ``_utils`` helpers tolerate that; confirm.
    """
    session_id = self._get_session_id()
    if dest_filename is None:
        dest_filename = os.path.basename(src_filename)
    self._request(session_id, src_filename=src_filename,
                  file_size=file_size(dest_filename),
                  md5=md5(dest_filename), block=block)

    def session():
        response = self._recv_response(session_id)
        if response["cancel"] or response["data"]["same_file"]:
            return
        if not response["success"]:
            eprint(response["traceback"])
            raise response["exception"]

        # exist_ok avoids the isdir/makedirs race of the original.
        os.makedirs(os.path.dirname(os.path.abspath(dest_filename)),
                    exist_ok=True)
        try:
            # Probe writability before telling the peer to start sending.
            file = open(dest_filename, "wb")
            self._respond_ok(session_id)
            file.close()
        except BaseException as e:
            self._respond_exception(session_id, e)
            if block:
                raise  # bare raise preserves the original traceback
            else:
                return
        recved_size = 0
        full_size = response["data"]["file_size"]
        with open(dest_filename, "wb") as file:
            while recved_size < full_size:
                response = self._recv_response(session_id)
                if response["cancel"]:
                    break
                file.write(response["data"]["data"])
                file.flush()
                recved_size += len(response["data"]["data"])

    if block:
        session()
    else:
        thread = threading.Thread(target=session)
        thread.start()
        return Future(self, session_id)


def _process_get_file(self, request):
    """Serve a ``get_file`` request.

    Streams the file in 8 MiB chunks on a killable worker thread, while
    the main thread waits for a possible cancel signal from the requester.
    """
    session_id = request["session_id"]

    def session():
        src_filename = request["data"]["src_filename"]
        src_file_size = file_size(src_filename)
        # Identical size + md5 (and non-empty) means nothing to send.
        same_file = (src_file_size != 0 and
                     request["data"]["file_size"] == src_file_size and
                     request["data"]["md5"] == md5(src_filename))
        try:
            # Probe readability before promising the transfer.
            file = open(src_filename, "rb")
            file.close()
            self._respond_ok(session_id, same_file=same_file,
                             file_size=src_file_size)
        except BaseException as e:
            self._respond_exception(session_id, e, same_file=same_file,
                                    block=request["block"])
            self._make_signal(session_id)
            return
        if same_file:
            if not request["block"]:
                self._put_result(session_id)
            self._make_signal(session_id)
            return
        response = self._recv_response(session_id)
        if not response["success"]:
            if not request["block"]:
                self._put_exception(session_id, response["exception"])
            self._make_signal(session_id)
            return

        block_size = 8192 * 1024
        sent_size = 0
        with open(src_filename, "rb") as file:
            while sent_size < src_file_size:
                data = file.read(block_size)
                self._respond_ok(session_id, data=data)
                sent_size += len(data)
        if not request["block"]:
            self._put_result(session_id)
        self._make_signal(session_id)

    thread = Thread(target=session)
    thread.start()
    signal = self._recv_signal(session_id)
    if signal["cancel"] and thread.is_alive():
        thread.kill()
        thread.join()
        self._respond_ok(session_id, cancel=True)
        self._put_result(session_id, cancelled=True)
    else:
        thread.join()


def put_file(self, src_filename, dest_filename=None, block=True):
    """Upload *src_filename* to the peer as *dest_filename*.

    *dest_filename* defaults to the source basename.  Returns a ``Future``
    when ``block=False``.  Raises immediately (via the probing ``open``)
    if the source file cannot be read.  ``self._stop_send`` cancels an
    in-flight upload cooperatively.
    """
    file = open(src_filename, "rb")  # fail fast on an unreadable source
    file.close()
    session_id = self._get_session_id()
    if dest_filename is None:
        dest_filename = os.path.basename(src_filename)
    src_file_size = file_size(src_filename)
    self._stop_send = False  # cooperative cancel flag checked in the loop
    self._request(session_id, md5=md5(src_filename),
                  file_size=src_file_size, dest_filename=dest_filename,
                  block=block)

    def session():
        response = self._recv_response(session_id)
        if response["cancel"] or response["data"]["same_file"]:
            return
        if not response["success"]:
            eprint(response["traceback"])
            raise response["exception"]
        block_size = 8192 * 1024
        sent_size = 0
        with open(src_filename, "rb") as file:
            while sent_size < src_file_size:
                if self._stop_send:
                    break
                data = file.read(block_size)
                self._respond_ok(session_id, data=data)
                sent_size += len(data)

    if block:
        session()
    else:
        thread = Thread(target=session)
        thread.start()
        return Future(self, session_id)


def _process_put_file(self, request):
    """Serve a ``put_file`` request.

    Receives chunks into the destination file on a killable worker thread,
    honouring a cancel signal from the sender.
    NOTE(review): unlike ``_process_get_file`` there is no non-empty guard
    here, so an empty destination matching size+md5 counts as identical.
    """
    session_id = request["session_id"]

    def session():
        dest_filename = request["data"]["dest_filename"]
        full_size = request["data"]["file_size"]
        if (full_size == file_size(dest_filename) and
                request["data"]["md5"] == md5(dest_filename)):
            self._respond_ok(session_id, same_file=True,
                             block=request["block"])
            self._make_signal(session_id)
            return

        # exist_ok avoids the isdir/makedirs race of the original.
        os.makedirs(os.path.dirname(os.path.abspath(dest_filename)),
                    exist_ok=True)

        try:
            file = open(dest_filename, "wb")  # probe writability
            file.close()
            self._respond_ok(session_id, same_file=False)
        except BaseException as e:
            self._respond_exception(session_id, e, same_file=False,
                                    block=request["block"])
            self._make_signal(session_id)
            return
        recved_size = 0
        with open(dest_filename, "wb") as file:
            while recved_size < full_size:
                response = self._recv_response(session_id)
                file.write(response["data"]["data"])
                file.flush()
                recved_size += len(response["data"]["data"])
        if not request["block"]:
            self._put_result(session_id)
        self._make_signal(session_id)

    thread = Thread(target=session)
    thread.start()
    signal = self._recv_signal(session_id)
    if signal["cancel"] and thread.is_alive():
        thread.kill()
        thread.join()
        self._respond_ok(session_id, cancel=True)
        self._put_result(session_id, cancelled=True)
    else:
        thread.join()


def get_folder(self, src_foldername, dest_foldername=None, block=True):
    """Recursively download *src_foldername* from the peer.

    The peer first sends its relative folder list (recreated locally),
    then offers each file with size+md5; files already identical locally
    are skipped.  Returns a ``Future`` when ``block=False``.
    """
    session_id = self._get_session_id()
    if dest_foldername is None:
        dest_foldername = os.path.basename(src_foldername)
    self._request(session_id, src_foldername=src_foldername, block=block)

    def session():
        response = self._recv_response(session_id)
        if response["cancel"]:
            return
        if not response["success"]:
            eprint(response["traceback"])
            raise response["exception"]
        # Recreate the remote directory tree (best effort, logged).
        for folder in response["data"]["folders"]:
            if not os.path.isdir(dest_foldername + "/" + folder):
                try:
                    os.makedirs(dest_foldername + "/" + folder)
                except Exception:
                    eprint(self._traceback(False))
        self._respond_ok(session_id)
        while True:
            response = self._recv_response(session_id)
            if response["cancel"] or response["data"]["finished"]:
                return
            if not response["success"]:
                eprint(response["traceback"])
                continue
            dest_filename = dest_foldername + "/" + response["data"]["file_name"]
            same_file = (response["data"]["file_size"] == file_size(dest_filename) and
                         response["data"]["md5"] == md5(dest_filename))
            if same_file:
                # Identical copy already present -- tell the peer to skip.
                self._respond_ok(session_id, same_file=same_file)
                continue
            try:
                file = open(dest_filename, "wb")  # probe writability
                self._respond_ok(session_id, same_file=same_file)
                file.close()
            except BaseException as e:
                eprint(self._traceback())
                self._respond_exception(session_id, e)
                continue
            recved_size = 0
            full_size = response["data"]["file_size"]
            with open(dest_filename, "wb") as file:
                while recved_size < full_size:
                    response = self._recv_response(session_id)
                    if response["cancel"]:
                        return
                    file.write(response["data"]["data"])
                    file.flush()
                    recved_size += len(response["data"]["data"])

    if block:
        session()
    else:
        thread = threading.Thread(target=session)
        thread.start()
        return Future(self, session_id)


def _process_get_folder(self, request):
    """Serve a ``get_folder`` request.

    Sends the relative folder list, then offers and streams each file on
    a killable worker thread, while the main thread waits for a possible
    cancel signal.
    """
    session_id = request["session_id"]

    def session():
        src_foldername = request["data"]["src_foldername"]
        if not os.path.isdir(src_foldername):
            self._respond_exception(
                session_id,
                FileNotFoundError(src_foldername + " is not a folder."),
                block=request["block"])
            self._make_signal(session_id)
            return
        # Index just past the folder name with trailing slashes stripped;
        # used to cut absolute paths down to peer-relative names.
        i = len(src_foldername) - 1
        while src_foldername[i] in ['/', '\\']:
            i -= 1
        folders = []
        for root, dirs, files in os.walk(src_foldername):
            for name in dirs:
                folders.append(os.path.join(root, name)[i + 2:])
        self._respond_ok(session_id, folders=folders)
        response = self._recv_response(session_id)
        for root, dirs, files in os.walk(src_foldername):
            for name in files:
                src_filename = os.path.join(root, name)
                src_file_size = file_size(src_filename)
                relative_file_name = src_filename[i + 2:]

                try:
                    file = open(src_filename, "rb")  # probe readability
                    file.close()
                    self._respond_ok(session_id, finished=False,
                                     file_name=relative_file_name,
                                     file_size=src_file_size,
                                     md5=md5(src_filename))
                except BaseException as e:
                    self._respond_exception(session_id, e)
                    continue
                response = self._recv_response(session_id)
                if not response["success"] or response["data"]["same_file"]:
                    continue
                block_size = 8192 * 1024
                sent_size = 0
                with open(src_filename, "rb") as file:
                    while sent_size < src_file_size:
                        data = file.read(block_size)
                        self._respond_ok(session_id, data=data)
                        sent_size += len(data)
        self._respond_ok(session_id, finished=True)
        if not request["block"]:
            self._put_result(session_id)
        self._make_signal(session_id)

    thread = Thread(target=session)
    thread.start()
    signal = self._recv_signal(session_id)
    if signal["cancel"] and thread.is_alive():
        thread.kill()
        thread.join()
        self._respond_ok(session_id, cancel=True)
        self._put_result(session_id, cancelled=True)
    else:
        thread.join()


def put_folder(self, src_foldername, dest_foldername=None, block=True):
    """Recursively upload *src_foldername* to the peer.

    Mirror image of ``get_folder``: the peer recreates the folder tree,
    each file is offered with size+md5 so identical copies are skipped,
    and ``self._stop_send`` cancels mid-stream.  Raises
    ``FileNotFoundError`` if the source is not a directory.
    """
    if not os.path.isdir(src_foldername):
        raise FileNotFoundError(src_foldername + " is not a folder.")
    session_id = self._get_session_id()
    if dest_foldername is None:
        dest_foldername = os.path.basename(src_foldername)
    self._stop_send = False
    self._request(session_id, dest_foldername=dest_foldername, block=block)

    def session():
        response = self._recv_response(session_id)
        if response["cancel"]:
            return
        if not response["success"]:
            eprint(response["traceback"])
            raise response["exception"]
        # Index just past the folder name with trailing slashes stripped.
        i = len(src_foldername) - 1
        while src_foldername[i] in ['/', '\\']:
            i -= 1
        folders = []
        for root, dirs, files in os.walk(src_foldername):
            for name in dirs:
                folders.append(os.path.join(root, name)[i + 2:])
        self._respond_ok(session_id, folders=folders)
        response = self._recv_response(session_id)
        if response["cancel"]:
            return
        for root, dirs, files in os.walk(src_foldername):
            for name in files:
                src_filename = os.path.join(root, name)
                src_file_size = file_size(src_filename)
                relative_file_name = src_filename[i + 2:]

                try:
                    file = open(src_filename, "rb")  # probe readability
                    file.close()
                    self._respond_ok(session_id, finished=False,
                                     file_name=relative_file_name,
                                     file_size=src_file_size,
                                     md5=md5(src_filename))
                except BaseException as e:
                    self._respond_exception(session_id, e)
                    continue
                response = self._recv_response(session_id)
                if response["cancel"]:
                    return
                if not response["success"] or response["data"]["same_file"]:
                    continue
                block_size = 8192 * 1024
                sent_size = 0
                with open(src_filename, "rb") as file:
                    while sent_size < src_file_size:
                        if self._stop_send:
                            return
                        data = file.read(block_size)
                        self._respond_ok(session_id, data=data)
                        sent_size += len(data)
        self._respond_ok(session_id, finished=True)

    if block:
        session()
    else:
        thread = threading.Thread(target=session)
        thread.start()
        return Future(self, session_id)


def _process_put_folder(self, request):
    """Serve a ``put_folder`` request.

    Creates the destination tree, then accepts each offered file
    (skipping identical ones) on a killable worker thread.
    """
    session_id = request["session_id"]

    def session():
        dest_foldername = request["data"]["dest_foldername"]
        try:
            # exist_ok avoids the isdir/makedirs race of the original.
            os.makedirs(dest_foldername, exist_ok=True)
            self._respond_ok(session_id)
        except BaseException as e:
            self._respond_exception(session_id, e, block=request["block"])
            self._make_signal(session_id)
            return
        response = self._recv_response(session_id)
        for folder in response["data"]["folders"]:
            if not os.path.isdir(dest_foldername + "/" + folder):
                try:
                    os.makedirs(dest_foldername + "/" + folder)
                except Exception:
                    pass  # best-effort; per-file errors are reported later
        self._respond_ok(session_id)
        while True:
            response = self._recv_response(session_id)
            if not response["success"]:
                continue
            if response["data"]["finished"]:
                break
            dest_filename = dest_foldername + "/" + response["data"]["file_name"]
            same_file = (response["data"]["file_size"] == file_size(dest_filename) and
                         response["data"]["md5"] == md5(dest_filename))
            if same_file:
                self._respond_ok(session_id, same_file=same_file)
                continue
            try:
                file = open(dest_filename, "wb")  # probe writability
                self._respond_ok(session_id, same_file=same_file)
                file.close()
            except BaseException as e:
                self._respond_exception(session_id, e)
                continue
            recved_size = 0
            full_size = response["data"]["file_size"]
            with open(dest_filename, "wb") as file:
                while recved_size < full_size:
                    response = self._recv_response(session_id)
                    file.write(response["data"]["data"])
                    file.flush()
                    recved_size += len(response["data"]["data"])

        if not request["block"]:
            self._put_result(session_id)
        self._make_signal(session_id)

    thread = Thread(target=session)
    thread.start()
    signal = self._recv_signal(session_id)
    if signal["cancel"] and thread.is_alive():
        thread.kill()
        thread.join()
        self._respond_ok(session_id, cancel=True)
        self._put_result(session_id, cancelled=True)
    else:
        # NOTE(review): the source dump was truncated at this point; this
        # branch mirrors the three sibling _process_* handlers above.
        thread.join()
reqNewItem.py
Source:reqNewItem.py
"""Poll MDVUDIITEMREQUEST for pending item-recommendation requests.

For each pending row ('N' state, with an attached request file in
ICOMATCH) the file is fed to ``itemSearch.item_search``; the produced
result file(s) are registered back into ICOMATCH and the request row is
marked done ('Y'), in-progress ('P') or errored ('E').

NOTE(review): reconstructed from a mangled dump whose Korean literals
were mojibake (UTF-8 displayed as Latin-1); they were decoded back --
confirm the exact wording against the original file.
"""
from itemRec.newRec import itemSearch
# from newRec import itemSearch
import os

import pandas as pd

DB_user = 'mdmig'
# The Oracle instant client must be on PATH *before* cx_Oracle is imported.
LOCATION = r"C:/Users/ezmedicom/PycharmProjects/pythonProject/instantclient_19_11"
os.environ["PATH"] = LOCATION + ";" + os.environ["PATH"]  # register env var
import cx_Oracle

con = cx_Oracle.connect(DB_user, 'medilinx00', '10.29.22.8:1522/mdvans', encoding="UTF-8")
file_DB = 'ICOMATCH'
req_DB = 'MDVUDIITEMREQUEST'

# Pending ('N') requests joined to their uploaded request file.
sql = """SELECT
    seq
    ,reqDate
    ,reqFileId
    ,ATC.UUID_SEQ AS reqFileNm
    ,ATC.FILE_PATH AS reqFilePath
    ,reqDiv
    ,reqState
    ,sameFileId
    ,accFileId
    ,reqRemark
    ,regUserId
    ,regDate
    ,modDate
    ,addcolumn
    FROM MDVUDIITEMREQUEST REQ
    LEFT JOIN ICOMATCH ATC ON ATC.UUID = REQ.REQFILEID AND ATC.UUID_SEQ = REQ.reqFileIdSeq
    WHERE REQSTATE = 'N'
    AND ATC.FILE_NAME IS NOT NULL
    AND ATC.FILE_PATH IS NOT NULL
    ORDER BY SEQ"""

# Do nothing while another run is still processing ('P') a request.
swtich_df = pd.read_sql("SELECT * FROM MDVUDIITEMREQUEST WHERE REQSTATE = 'P' ORDER BY SEQ DESC", con)
if swtich_df.empty:
    file_df = pd.read_sql(sql, con)
else:
    file_df = pd.DataFrame(columns=['SEQ', 'REQFILEID', 'REQFILENM', 'REQFILEPATH', 'REQDIV', 'ADDCOLUMN'])
file_df = file_df[file_df['REQFILEID'].notnull()]
file_df = file_df.reset_index(drop=True)
# FIX: .copy() so the fillna below writes to a real frame, not a view
# (avoids pandas' SettingWithCopyWarning / silently lost assignment).
update_df = file_df[['SEQ', 'REQFILEID', 'REQFILENM', 'REQFILEPATH', 'REQDIV', 'ADDCOLUMN']].copy()
update_df['ADDCOLUMN'] = update_df['ADDCOLUMN'].fillna('')
# update_df['REQFILEPATH'] = update_df['REQFILEPATH'].str.replace('Z:', '/z') # Linux

# Mark a request as in-progress.
ing_sql = "UPDATE " + req_DB
ing_sql += " SET REQSTATE = 'P', MODDATE = SYSDATE, MODUSERID = 'pyadmin', RESULTMSG = ''"
ing_sql += " WHERE SEQ = :SEQ"

# Register a produced result file in ICOMATCH.
file_sql = " INSERT INTO " + file_DB + " ("
file_sql += " HOUSE_CODE, UUID, UUID_SEQ, FILE_NAME, FILE_PATH, FILE_SIZE, FILE_EXTENSION,"
file_sql += " REAL_FILE_NAME, BIZ_TYPE, ADD_DATE, ADD_TIME, ADD_USER_ID )"
file_sql += " SELECT "
file_sql += " '100', SEQ_ATTACHID.NEXTVAL, :UUID_SEQ, :FILE_NAME, :FILE_PATH, :FILE_SIZE, :FILE_EXTENSION,"
file_sql += " :REAL_FILE_NAME, 'UDI', TO_CHAR(SYSDATE,'YYYYMMDD'), TO_CHAR(SYSDATE,'HH24MISS'), 'pyadmin'"
file_sql += " FROM DUAL "

# Completion updates: link the 'same' file, the 'acc' file, or both.
end1_sql = " UPDATE " + req_DB + " SET (REQSTATE, SAMEFILEID, MODDATE, MODUSERID) ="
end1_sql += " (SELECT DISTINCT 'Y' AS REQSTATE, SAME.UUID AS SAMEFILEID, SYSDATE AS MODDATE, 'pyadmin' AS MODUSERID FROM ICOMATCH"
end1_sql += " LEFT JOIN ICOMATCH SAME ON SAME.FILE_NAME = :SAME_FILE_NAME)"
end1_sql += " WHERE SEQ = :SEQ"
end2_sql = " UPDATE " + req_DB + " SET (REQSTATE, ACCFILEID, MODDATE, MODUSERID) ="
end2_sql += " (SELECT DISTINCT 'Y' AS REQSTATE, ACC.UUID AS ACCFILEID, SYSDATE AS MODDATE, 'pyadmin' AS MODUSERID FROM ICOMATCH"
end2_sql += " LEFT JOIN ICOMATCH ACC ON ACC.FILE_NAME = :ACC_FILE_NAME)"
end2_sql += " WHERE SEQ = :SEQ"
end3_sql = " UPDATE " + req_DB + " SET (REQSTATE, SAMEFILEID, ACCFILEID, MODDATE, MODUSERID) ="
end3_sql += " (SELECT DISTINCT 'Y' AS REQSTATE, SAME.UUID AS SAMEFILEID, ACC.UUID AS ACCFILEID, SYSDATE AS MODDATE, 'pyadmin' AS MODUSERID FROM ICOMATCH"
end3_sql += " LEFT JOIN ICOMATCH SAME ON SAME.FILE_NAME = :SAME_FILE_NAME"
end3_sql += " LEFT JOIN ICOMATCH ACC ON ACC.FILE_NAME = :ACC_FILE_NAME)"
end3_sql += " WHERE SEQ = :SEQ"

# Mark a request as errored with the failure message.
error_sql = "UPDATE " + req_DB
error_sql += " SET REQSTATE = 'E', MODDATE = SYSDATE, MODUSERID = 'pyadmin', RESULTMSG = :RESULTMSG"
error_sql += " WHERE SEQ = :SEQ"

try:
    cur = con.cursor()
    for file_data in update_df.to_dict('index').values():
        seq = file_data['SEQ']
        try:
            cur.prepare(ing_sql)
            print('파일 생성 시작')
            cur.execute(ing_sql, {'SEQ': seq})
            print('파일 생성 중')
        except Exception as e1:
            errMsg = ">>>> :: " + str(e1)
            con.rollback()
            print(errMsg)
        finally:
            try:
                con.commit()
            except Exception as e3:
                print('commit err' + str(e3))
        # Item recommendation
        file_path = file_data['REQFILEPATH']
        file_nm = file_data['REQFILENM']
        div = file_data['REQDIV']
        if file_data['ADDCOLUMN'] == '':
            sel_cols = []
        else:
            sel_cols = file_data['ADDCOLUMN'].split(',')
        try:
            same_file, acc_file = itemSearch.item_search(file_path, file_nm, div, seq, sel_cols)
            print('파일 생성 완료')
        except Exception as e1:
            print('파일 입력 오류')
            errMsg = str(e1)
            cur.prepare(error_sql)
            cur.execute(error_sql, {'SEQ': seq, 'RESULTMSG': errMsg})
            con.commit()
            # FIX: originally execution fell through and used
            # same_file/acc_file that were never assigned (or were stale
            # from a previous row); skip to the next request instead.
            continue
        try:
            # NOTE(review): split('.')[1] breaks on filenames containing
            # more than one dot -- confirm whether [-1] was intended.
            if file_data['REQDIV'] == '1':
                cur.prepare(file_sql)
                cur.execute(file_sql, {'UUID_SEQ': same_file,
                                       'FILE_NAME': same_file,
                                       'FILE_PATH': file_path,
                                       'FILE_SIZE': os.path.getsize(file_path + same_file),
                                       'FILE_EXTENSION': same_file.split('.')[1],
                                       'REAL_FILE_NAME': same_file})
                cur.prepare(end1_sql)
                cur.execute(end1_sql, {'SAME_FILE_NAME': same_file,
                                       'SEQ': seq})
                print('상태 업데이트 완료')
            elif file_data['REQDIV'] == '2':
                cur.prepare(file_sql)
                cur.execute(file_sql, {'UUID_SEQ': acc_file,
                                       'FILE_NAME': acc_file,
                                       'FILE_PATH': file_path,
                                       'FILE_SIZE': os.path.getsize(file_path + acc_file),
                                       'FILE_EXTENSION': acc_file.split('.')[1],
                                       'REAL_FILE_NAME': acc_file})
                cur.prepare(end2_sql)
                cur.execute(end2_sql, {'ACC_FILE_NAME': acc_file,
                                       'SEQ': seq})
                print('상태 업데이트 완료')
            elif file_data['REQDIV'] == '3':
                cur.prepare(file_sql)
                cur.execute(file_sql, {'UUID_SEQ': same_file,
                                       'FILE_NAME': same_file,
                                       'FILE_PATH': file_path,
                                       'FILE_SIZE': os.path.getsize(file_path + same_file),
                                       'FILE_EXTENSION': same_file.split('.')[1],
                                       'REAL_FILE_NAME': same_file})
                cur.prepare(file_sql)
                cur.execute(file_sql, {'UUID_SEQ': acc_file,
                                       'FILE_NAME': acc_file,
                                       'FILE_PATH': file_path,
                                       'FILE_SIZE': os.path.getsize(file_path + acc_file),
                                       'FILE_EXTENSION': acc_file.split('.')[1],
                                       'REAL_FILE_NAME': acc_file})
                cur.prepare(end3_sql)
                cur.execute(end3_sql, {'SAME_FILE_NAME': same_file,
                                       'ACC_FILE_NAME': acc_file,
                                       'SEQ': seq})
                print('상태 업데이트 완료')
            else:
                print('REQDIV error')
        except Exception as e:
            print('update err : ' + str(e))
        finally:
            try:
                con.commit()
            except Exception as e3:
                print('commit err : ' + str(e3))
except Exception as e1:
    # NOTE(review): if the failure happened before the loop assigned
    # ``seq`` this handler raises NameError -- pre-existing behavior kept.
    print('파일 입력 오류')
    errMsg = str(e1)
    cur.prepare(error_sql)
    cur.execute(error_sql, {'SEQ': seq, 'RESULTMSG': errMsg})
    con.commit()
finally:
    # NOTE(review): the dump was truncated here; cursor close was visible,
    # the connection close was added for symmetry -- confirm.
    try:
        cur.close()
    except Exception as e3:
        print('cursor close err : ' + str(e3))
    try:
        con.close()
    except Exception as e3:
        print('con close err : ' + str(e3))
test.py
Source:test.py
import unittest
from os import listdir, path, remove
from os.path import isfile, join
import filecmp
import netCDF4
import numpy as np
from radarconverter.radar_grib2netcdf.radar_grib2netcdf import radar_grib2netcdf
from radarconverter.radar_netcdf2grib.radar_netcdf2grib import radar_netcdf2grib

path_grib1 = "dataset/grib1/"
path_grib2 = "dataset/grib2/"
path_netcdf = "dataset/netcdf/"
path_temps = "dataset/temps/"


def clear_dir(dir_name):
    """Remove every regular file inside *dir_name* (subdirs untouched)."""
    print("CLEANING DIR {} ...".format(dir_name))
    for entry in listdir(dir_name):
        entry_path = join(dir_name, entry)
        if isfile(entry_path):
            remove(entry_path)
    print("CLEAN DIR {} DONE".format(dir_name))


def read_netcdf_data(name_nc):
    """Return the (lat, lon, cumulated rain) 2-D arrays from *name_nc*."""
    ncid = netCDF4.Dataset(name_nc)
    for key in ncid.variables.keys():
        if key == 'cum_pr_mm':  # cumulated precipitation field
            cum_pr_mm = np.array(ncid.variables['cum_pr_mm'][:])[0]
        if key == 'lat':
            lats = np.array(ncid.variables['lat'][:])[0]
        if key == 'lon':
            lons = np.array(ncid.variables['lon'][:])[0]
    return lats, lons, cum_pr_mm


class TestMethods(unittest.TestCase):
    """Round-trip conversion tests between grib and netCDF radar files."""

    def test_convertion_grib1(self):
        """grib1 -> netcdf -> grib1 must reproduce each file byte-for-byte."""
        print("TEST GRIB1")
        all_equal = True
        for f in (name for name in listdir(path_grib1)
                  if isfile(join(path_grib1, name))):
            grib_filename_orig = path_grib1 + path.basename(f)
            grib_filename_new = path_temps + path.basename(f)
            nc_filename = path_temps + path.basename(f).replace("grib1", "nc")
            radar_grib2netcdf(grib_filename_orig, nc_filename)
            radar_netcdf2grib(nc_filename, grib_filename_new, 1)
            if not filecmp.cmp(grib_filename_orig, grib_filename_new):
                all_equal = False
                print("Errore su file: {}".format(grib_filename_orig))
        self.assertTrue(all_equal)
        clear_dir(path_temps)

    '''
    def test_convertion_grib2(self):
        print("TEST GRIB2")
        same_file = True
        files = [f for f in listdir(path_grib2) if isfile(join(path_grib2, f))]
        for f in files:
            grib_filename_orig = path_grib2 + path.basename(f)
            grib_filename_new = path_temps + path.basename(f)
            nc_filename = path_temps + path.basename(f).replace("grib2", "nc")
            radar_grib2netcdf(grib_filename_orig, nc_filename)
            radar_netcdf2grib(nc_filename, grib_filename_new, 2)
            if not filecmp.cmp(grib_filename_orig, grib_filename_new):
                same_file = False
                print("Errore su file: {}".format(grib_filename_orig))
        self.assertTrue(same_file)
        clear_dir(path_temps)
    '''

    def test_convertion_netcdf(self):
        """netcdf -> grib1 -> netcdf must agree within 0.005 absolute error."""
        print("TEST NETCDF")
        all_equal = True
        for f in (name for name in listdir(path_netcdf)
                  if isfile(join(path_netcdf, name))):
            nc_filename_orig = path_netcdf + path.basename(f)
            nc_filename_new = path_temps + path.basename(f)
            grib_filename = path_temps + path.basename(f).replace("nc", "grib1")
            radar_netcdf2grib(nc_filename_orig, grib_filename, 1)
            radar_grib2netcdf(grib_filename, nc_filename_new)
            if not filecmp.cmp(nc_filename_orig, nc_filename_new):
                # Bytes differ: fall back to a numeric tolerance check.
                lats_new, lons_new, cum_new = read_netcdf_data(nc_filename_new)
                lats_orig, lons_orig, cum_orig = read_netcdf_data(nc_filename_orig)
                err_lat = np.max(np.abs(lats_new - lats_orig))
                err_lon = np.max(np.abs(lons_new - lons_orig))
                err_cum = np.max(np.abs(cum_new - cum_orig))
                worst = err_lat
                if worst < err_lon:
                    worst = err_lon
                if worst < err_cum:
                    worst = err_cum
                if worst >= 0.005:
                    print("errore massmo {}".format(str(worst)))
                    all_equal = False
                    print("Errore su file: {}".format(nc_filename_orig))
        self.assertTrue(all_equal)
        clear_dir(path_temps)


if __name__ == '__main__':
    # NOTE(review): the source dump was truncated after the guard; the
    # standard unittest runner call is assumed here.
    unittest.main()
test_cli.py
Source:test_cli.py
1import unittest2from hashlib import md53from tempfile import gettempdir4class Test(unittest.TestCase):5 def test_link(self):6 import string7 from os.path import join, split, exists8 from os import makedirs9 from shutil import rmtree10 tmp = join(gettempdir(), "dupln")11 data = dict(12 filter(13 (lambda _: isinstance(_[1], str)),14 ((k, getattr(string, k)) for k in dir(string) if k[0].isalpha()),15 )16 )17 # pprint.pprint(data)18 def test(sepins=[2, 5, 8]):19 exists(tmp) and rmtree(tmp)20 same_file = {}21 def put_bytes(b, path):22 h = md5()23 h.update(b)24 q = (len(b), h.hexdigest())25 if q in same_file:26 same_file[q] += 127 else:28 same_file[q] = 129 dir = split(path)[0]30 exists(dir) or makedirs(dir)31 with open(path, "wb") as o:32 o.write(b)33 for k, v in data.items():34 b = v.encode("UTF-8")35 put_bytes(b[::-1], join(tmp, k)) # reversed36 for i in sepins:37 if i < len(k):38 k = k[:i] + "/" + k[i:]39 put_bytes(b, join(tmp, k))40 return dict(41 same_file=sum(1 for count in same_file.values() if count > 1),42 unique_files=len(same_file),43 disk_size=sum(size_hash[0] for size_hash in same_file.keys()),44 size=sum(45 size_hash[0] * count for size_hash, count in same_file.items()46 ),47 files=sum(same_file.values()),48 same_size=len(set(size_hash[0] for size_hash in same_file.keys())),49 linked=sum(count - 1 for count in same_file.values() if count > 1),50 )51 v = test()52 from .__main__ import App53 # Total disk_size 836b; files 26; inodes 26; same_size 8; size 836b;54 total = App().main(["cmd", "stat", tmp])55 self.assertEqual(total.disk_size, v["size"])56 self.assertEqual(total.files, v["files"])57 self.assertEqual(total.inodes, v["files"])58 self.assertEqual(total.same_size, v["same_size"])59 self.assertEqual(total.size, v["size"])60 # Total devices 1; disk_size 836b; files 26; inodes 26; size 836b; unique_size 8;61 total = App().main(["cmd", "unique_files", tmp])62 self.assertEqual(total.disk_size, v["size"])63 self.assertEqual(total.files, v["files"])64 
self.assertEqual(total.inodes, v["files"])65 self.assertEqual(total.size, v["size"])66 self.assertEqual(total.unique_size, v["same_size"])67 # disk_size ; files ; inodes ; linked ; same_hash ; same_size ; size ;68 total = App().main(["cmd", "link", tmp])69 self.assertEqual(total.disk_size, v["disk_size"])70 self.assertEqual(total.files, v["files"])71 self.assertEqual(total.inodes, v["files"])72 self.assertEqual(total.linked, v["linked"])73 self.assertEqual(total.same_hash, v["same_file"])74 self.assertEqual(total.same_size, v["same_size"])75 self.assertEqual(total.size, v["size"])76 #77 total = App().main(["cmd", "unique_files", tmp])78 self.assertEqual(total.disk_size, v["disk_size"])79 self.assertEqual(total.files, v["files"])80 self.assertEqual(total.inodes, v["unique_files"])81 self.assertEqual(total.size, v["size"])82 self.assertEqual(total.unique_size, v["same_size"])83 #84 total = App().main(["cmd", "stat", tmp])85 self.assertEqual(total.disk_size, v["disk_size"])86 self.assertEqual(total.files, v["files"])87 self.assertEqual(total.same_size, v["same_size"])88 self.assertEqual(total.inodes, v["unique_files"])...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub — right from setting up the prerequisites for running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, i.e., Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!