Best Python code snippet using slash
fetcher.py
Source:fetcher.py
# /usr/bin/env python3
"""Financial-statement and market-data fetchers.

Network query interfaces (Tushare Pro):
    1. Per-stock query
        - QA_fetch_get_individual_financial: statements of one stock for a
          range of report periods, a sheet type and a report type
    2. Cross-section query
        - QA_fetch_get_crosssection_financial: statements of every stock for
          one report period, a sheet type and a report type

Local query interfaces (collections reached through ``DATABASE``):
    1. Cross-section query
        - QA_fetch_crosssection_financial
    2. Advanced query
        - QA_fetch_financial_adv

Sheet types:
    "income" (income statement), "balancesheet" (balance sheet),
    "cashflow" (cash-flow statement), "forecast" (earnings forecast),
    "express" (preliminary earnings).

Report types:
     1  consolidated statement, latest published version (default)
     2  single-quarter consolidated statement
     3  adjusted single-quarter consolidated statement (if any)
     4  adjusted consolidated statement: prior-year comparative figures
        published this year; the report period is the prior year
     5  consolidated statement before adjustment: the original figures kept
        after the data changed
     6  parent-company statement
     7  parent-company single-quarter statement
     8  parent-company adjusted single-quarter statement
     9  parent-company adjusted statement (prior-year comparative figures
        published this year)
    10  parent-company statement before adjustment (original figures)
    11  consolidated statement before adjustment (original figures)
    12  parent-company statement before a change (original figures kept)
"""

import datetime
import time
from typing import List, Tuple, Union

import pandas as pd
import pymongo
import tushare as ts
from QUANTAXIS.QAFactor.utils import QA_fmt_code, QA_fmt_code_list
from QUANTAXIS.QAFetch.QAQuery_Advance import QA_fetch_stock_list
from QUANTAXIS.QAFetch.QATushare import get_pro
from QUANTAXIS.QAUtil import (DATABASE, QASETTING, QA_util_date_int2str,
                              QA_util_date_stamp, QA_util_get_pre_trade_date,
                              QA_util_get_real_date, QA_util_log_info,
                              QA_util_to_json_from_pandas)

REPORT_DATE_TAILS = ["0331", "0630", "0930", "1231"]
SHEET_TYPE = ["income", "balancesheet", "cashflow"]
REPORT_TYPE = ['1', '2', '3', '4', '5', '11']

# Columns requested from Tushare together with a single user-supplied field.
_TUSHARE_KEY_FIELDS = ("ts_code", "end_date", "ann_date",
                       "f_ann_date", "report_type", "update_flag")
# Columns the local advanced queries always keep next to the requested fields.
_LOCAL_KEY_FIELDS = ("code", "ann_date", "report_date", "f_ann_date")


def _call_with_retry(func, params, wait_seconds, max_trial):
    """Call ``func(**params)``, retrying up to ``max_trial`` times in total.

    Each failure is printed and followed by a pause of ``wait_seconds``
    (except after the last attempt).

    Raises:
        ValueError: when every attempt failed or ``max_trial`` is not positive.
    """
    last_error = None
    for attempt in range(1, max_trial + 1):
        try:
            return func(**params)
        except Exception as err:  # remote errors are not typed from here
            last_error = err
            print(err)
            if attempt < max_trial:
                time.sleep(wait_seconds)
    raise ValueError("[ERROR]\tEXCEED MAX TRIAL!") from last_error


def _merge_fields(fields, required):
    """Return the requested fields followed by ``required``, de-duplicated.

    Returns None when no fields were requested (meaning "all columns").
    """
    if not fields:
        return None
    requested = [fields] if isinstance(fields, str) else list(fields)
    return list(dict.fromkeys(requested + list(required)))


def QA_fetch_get_individual_financial(
        code: str,
        start: Union[str, datetime.datetime, pd.Timestamp] = None,
        end: Union[str, datetime.datetime, pd.Timestamp] = None,
        report_date: Union[str, datetime.datetime] = None,
        sheet_type: str = "income",
        report_type: Union[int, str] = 1,
        fields: Union[str, Tuple, List] = None,
        wait_seconds: int = 61,
        max_trial: int = 3) -> pd.DataFrame:
    """Fetch one stock's financial statements from Tushare.

    Note that ``start`` and ``end`` select a range of *report periods*
    (quarter ends), not announcement dates.

    Args:
        code: stock code.
        start: first report period to include; required when ``report_date``
            is not given.
        end: last report period to include; defaults to today.
        report_date: a single report period (must be a quarter end). When
            given, ``start`` and ``end`` are ignored.
        sheet_type: one of "income", "balancesheet", "cashflow", "forecast",
            "express" (see the module docstring).
        report_type: integer 1-12 (see the module docstring).
        fields: columns to request; None requests everything. A single
            string is extended with the key columns.
        wait_seconds: pause between retries, in seconds.
        max_trial: maximum number of attempts per report period.

    Returns:
        The statements of the requested periods, with ``ts_code`` and
        ``end_date`` renamed to ``code`` and ``report_date``. Empty when no
        quarter end falls inside ``[start, end]``.

    Raises:
        ValueError: on missing dates, an invalid report period, sheet type or
            report type, or when all attempts for a period failed.
    """
    if (not start) and (not end) and (not report_date):
        raise ValueError(
            "[QRY_DATES ERROR]\tparam 'start', 'end' and 'report_date' should not be none at the same time!")
    report_type = int(report_type)
    # Validate up front so both the single-period and the range path are
    # covered (the sheet type names the Tushare endpoint that gets called).
    if sheet_type not in ["income", "balancesheet", "cashflow", "forecast", "express"]:
        raise ValueError("[SHEET_TYPE ERROR]")
    if report_type not in range(1, 13):
        raise ValueError("[REPORT_TYPE ERROR]")
    if isinstance(fields, str):
        fields = sorted({fields, *_TUSHARE_KEY_FIELDS})

    if report_date:
        report_date = pd.Timestamp(report_date)
        quarter_ends = [pd.Timestamp(str(report_date.year) + tail)
                        for tail in REPORT_DATE_TAILS]
        if report_date not in quarter_ends:
            raise ValueError("[REPORT_DATE ERROR]")
        report_dates = [report_date]
    else:
        if not start:
            raise ValueError(
                "[QRY_DATES ERROR]\tparam 'start' is required when 'report_date' is not given!")
        start = pd.Timestamp(start)
        end = pd.Timestamp(end) if end else pd.Timestamp(datetime.date.today())
        quarter_ends = [pd.Timestamp(str(year) + tail)
                        for year in range(start.year, end.year + 1)
                        for tail in REPORT_DATE_TAILS]
        report_dates = [day for day in quarter_ends if start <= day <= end]

    pro = get_pro()
    endpoint = getattr(pro, sheet_type)
    ts_code = QA_fmt_code(code, "ts")
    frames = []
    for period in report_dates:
        params = {"ts_code": ts_code,
                  "period": period.strftime("%Y%m%d"),
                  "report_type": report_type}
        if fields:
            params["fields"] = fields
        frame = _call_with_retry(endpoint, params, wait_seconds, max_trial)
        frames.append(frame.rename(
            columns={"ts_code": "code", "end_date": "report_date"}))
    if not frames:
        return pd.DataFrame()
    df = pd.concat(frames)
    df["code"] = QA_fmt_code_list(df["code"])
    return df.reset_index(drop=True)


def QA_fetch_get_crosssection_financial(
        report_date: Union[str, datetime.datetime, pd.Timestamp],
        report_type: Union[int, str] = 1,
        sheet_type: str = "income",
        fields: Union[str, Tuple, List] = None,
        wait_seconds: int = 61,
        max_trial: int = 3) -> pd.DataFrame:
    """Fetch one report period's statements for all stocks from Tushare.

    Uses the ``<sheet_type>_vip`` Tushare endpoint.

    Args:
        report_date: report period; must be a quarter end.
        report_type: integer 1-12 (see the module docstring).
        sheet_type: "income", "balancesheet" or "cashflow".
        fields: columns to request; None requests everything. A single
            string is extended with the key columns.
        wait_seconds: pause between retries, in seconds.
        max_trial: maximum number of attempts.

    Returns:
        The statements sorted by ``ann_date`` and ``f_ann_date``, with
        ``ts_code``/``end_date`` renamed to ``code``/``report_date``; the
        raw empty frame when Tushare returns no rows.

    Raises:
        ValueError: on an invalid report period, sheet type or report type,
            or when all attempts failed.
    """
    report_date = pd.Timestamp(report_date)
    report_type = int(report_type)
    period = report_date.strftime("%Y%m%d")
    std_report_dates = [str(report_date.year) + tail
                        for tail in REPORT_DATE_TAILS]
    if period not in std_report_dates:
        raise ValueError("[REPORT_DATE ERROR]")
    if isinstance(fields, str):
        fields = sorted({fields, *_TUSHARE_KEY_FIELDS})
    # Only the three statement sheets are supported here.
    if sheet_type not in SHEET_TYPE:
        raise ValueError("[SHEET_TYPE ERROR]")
    if report_type not in range(1, 13):
        raise ValueError("[REPORT_TYPE ERROR]")

    pro = get_pro()
    params = {"period": period, "report_type": report_type}
    if fields:
        params["fields"] = fields
    df = _call_with_retry(
        getattr(pro, f"{sheet_type}_vip"), params, wait_seconds, max_trial)
    if df.empty:
        return df
    df["ts_code"] = QA_fmt_code_list(df["ts_code"])
    renamed = df.rename(columns={"ts_code": "code", "end_date": "report_date"})
    return renamed.sort_values(by=['ann_date', 'f_ann_date'])


def QA_fetch_get_daily_basic(
        code: Union[str, List, Tuple] = None,
        trade_date: Union[str, pd.Timestamp, datetime.datetime] = None,
        fields: Union[str, List, Tuple] = None,
        wait_seconds: int = 61,
        max_trial: int = 3
) -> pd.DataFrame:
    """Fetch the daily fundamental indicators of one trade date from Tushare.

    Args:
        code: stock code or codes to keep; None keeps the whole market.
        trade_date: trade date; None uses the value returned by
            ``QA_util_get_pre_trade_date(today, 1)``.
        fields: columns to request; None requests everything. ``ts_code`` is
            always requested because the result is indexed by code; a single
            string additionally gets ``trade_date``.
        wait_seconds: pause between retries, in seconds.
        max_trial: maximum number of attempts.

    Returns:
        The indicators indexed by ``code``; codes absent from the response
        are silently skipped. The raw empty frame when nothing is returned.

    Raises:
        ValueError: when all attempts failed.
    """
    if not trade_date:
        trade_date = QA_util_get_pre_trade_date(
            datetime.date.today(), 1).replace("-", "")
    else:
        trade_date = pd.Timestamp(trade_date).strftime("%Y%m%d")

    params = {"trade_date": trade_date}
    if fields:
        if isinstance(fields, str):
            fields = list(dict.fromkeys([fields, "ts_code", "trade_date"]))
        else:
            fields = list(dict.fromkeys(list(fields) + ["ts_code"]))
        params["fields"] = ",".join(fields)

    pro = get_pro()

    def _query(**kwargs):
        # A None response is treated as a failure so that it is retried.
        result = pro.daily_basic(**kwargs)
        if result is None:
            raise ValueError("[ERROR]")
        return result

    df = _call_with_retry(_query, params, wait_seconds, max_trial)
    if df.empty:
        return df
    df = df.rename(columns={"ts_code": "code"})
    df["code"] = QA_fmt_code_list(df["code"])
    df = df.set_index("code")
    if not code:
        return df
    if isinstance(code, str):
        code = (code,)
    # Exclude codes that are not present in the returned frame.
    filter_idx = df.index.intersection(list(code))
    return df.loc[filter_idx]


def QA_fetch_crosssection_financial(
        report_date: Union[str, datetime.datetime, pd.Timestamp],
        report_type: Union[int, str] = 1,
        sheet_type: str = "income",
        fields: Union[str, Tuple, List] = None) -> pd.DataFrame:
    """Query one report period's statements from the local database.

    Args:
        report_date: report period.
        report_type: report type (see the module docstring); matched as a
            string against the stored ``report_type``.
        sheet_type: name of the collection to query, default "income".
        fields: columns to return; None returns every column. A single
            string is extended with the key columns.

    Returns:
        The matching rows with ``report_date`` parsed to datetimes, or an
        empty DataFrame when nothing matches.
    """
    if isinstance(fields, str):
        fields = sorted({fields, "code", "report_date", "ann_date",
                         "f_ann_date", "report_type", "update_flag"})
    coll = getattr(DATABASE, sheet_type)
    report_date = pd.Timestamp(report_date).strftime("%Y%m%d")
    cursor = coll.find(
        {"report_date": report_date, "report_type": str(report_type)})
    res = pd.DataFrame(list(cursor))
    if res.empty:
        return pd.DataFrame()
    res["report_date"] = pd.to_datetime(res["report_date"], utc=False)
    res = res.drop(columns="_id")
    if not fields:
        return res
    # A tuple would be read by pandas as one (multi-level) key.
    return res[list(fields)]


def QA_fetch_financial_adv(
        code: Union[str, Tuple, List] = None,
        start: Union[str, datetime.datetime, pd.Timestamp] = None,
        end: Union[str, datetime.datetime, pd.Timestamp] = None,
        report_date: Union[str, datetime.datetime, pd.Timestamp] = None,
        report_type: Union[int, str] = None,
        sheet_type: str = "income",
        fields: Union[str, Tuple, List] = None) -> pd.DataFrame:
    """Query statements from the local database by stock, date and type.

    Args:
        code: stock code or codes; None queries the whole market.
        start: earliest ``f_ann_date`` to include; None leaves the range
            open at the lower end.
        end: latest ``f_ann_date`` to include; defaults to today.
        report_date: a single report period. When given, ``start`` and
            ``end`` are ignored.
        report_type: one type or several; None selects 1, 2, 4, 5 and 11
            (see the module docstring).
        sheet_type: name of the collection to query, default "income".
        fields: columns to return; None returns every column. The key
            columns (code and the three dates) are always kept.

    Returns:
        The matching rows indexed by ``code``, sorted by report period and
        announcement stamp, with the date columns parsed to datetimes. An
        empty DataFrame when nothing matches.

    Raises:
        ValueError: when ``start``, ``end`` and ``report_date`` are all
            missing.
    """
    if (not start) and (not end) and (not report_date):
        raise ValueError(
            "[DATE ERROR]\t 'start', 'end' 与 'report_date' 不能同时为 None")
    if isinstance(code, str):
        code = (code,)
    if not report_type:
        report_type = ("1", "2", "4", "5", "11")
    if isinstance(report_type, (int, str)):
        report_type = [str(report_type)]
    else:
        report_type = [str(item) for item in report_type]

    coll = getattr(DATABASE, sheet_type)
    qry = {"report_type": {"$in": report_type}}
    if code:
        qry["code"] = {"$in": list(code)}
    if report_date:
        qry["report_date_stamp"] = QA_util_date_stamp(report_date)
    else:
        if not end:
            end = datetime.date.today()
        announced = {"$lte": QA_util_date_stamp(pd.Timestamp(end))}
        if start:
            announced["$gte"] = QA_util_date_stamp(pd.Timestamp(start))
        qry["f_ann_date_stamp"] = announced

    fields = _merge_fields(fields, _LOCAL_KEY_FIELDS)
    cursor = coll.find(qry, batch_size=10000).sort([
        ("report_date_stamp", pymongo.ASCENDING),
        ("f_ann_date_stamp", pymongo.ASCENDING)])
    df = pd.DataFrame(list(cursor))
    if df.empty:
        return df
    df = df.drop(columns="_id")
    if fields:
        df = df[fields]
    df = df.set_index("code")
    for column in ("report_date", "ann_date", "f_ann_date"):
        df[column] = pd.to_datetime(df[column], utc=False)
    return df


def QA_fetch_last_financial(
        code: Union[str, List, Tuple] = None,
        cursor_date: Union[str, datetime.datetime, pd.Timestamp] = None,
        report_label: Union[int, str] = None,
        report_type: Union[int, str, List, Tuple] = None,
        sheet_type: str = "income",
        fields: Union[str, List, Tuple] = None) -> pd.DataFrame:
    """Return, per stock, the latest statement announced before a date.

    Only statements announced strictly before ``cursor_date`` (and at most
    400 days earlier) are considered, so nothing published on or after the
    rebalancing date leaks in.

    Example from the original notes: Liugong (000528) published its
    semi-annual report on 2018-08-30 and a revised one on 2018-09-29.
      - cursor_date 2018-08-31 yields the original semi-annual report
        (report_type 5);
      - cursor_date 2018-09-30 yields the latest consolidated statement
        (report_type 1);
      - cursor_date 2019-08-31, asking for the 2018 semi-annual figures,
        yields the prior-year comparative published on 2019-08-29
        (report_type 4).

    Args:
        code: stock code or codes; None queries every stock.
        cursor_date: cut-off date (typically the rebalancing date). Required.
        report_label: restrict to one stored ``report_label`` value; None
            takes whichever report is latest.
        report_type: a single value must be 1, 4 or 5 (to avoid mixing
            cumulative and single-quarter data); a list/tuple is restricted
            to 1, 2, 4 and 5; None selects all four.
        sheet_type: "income", "balancesheet" or "cashflow".
        fields: columns to return; None returns every column. The key
            columns (code, the three dates and report_type) are always kept.

    Returns:
        One entry per stock code.
        NOTE(review): for non-balance sheets the result goes through
        ``groupby(...).apply(...).unstack()``; its exact layout depends on
        what the per-stock reducer returns - confirm against callers.

    Raises:
        ValueError: on a missing ``cursor_date``, an invalid report or sheet
            type, or when the query returns nothing usable ("[QRY ERROR]").
    """
    def _trans_financial_type(x):
        """Reduce one stock's rows (newest first) to a single record."""
        if x.empty:
            return x
        if sheet_type == "balancesheet":
            # Balance sheets are point-in-time values: returned unchanged.
            return x
        latest = x.iloc[0]
        if latest.report_date[4:] in ['0331', '1231']:
            # Q1: single-quarter and cumulative figures coincide.
            # Annual report: there is no single-quarter notion.
            return latest
        if latest.report_type in ['1', '4', '5']:
            return latest
        if latest.report_type == '2':
            # Rebuild the cumulative figures by summing this year's rows up
            # to the latest report period.
            # NOTE(review): every report type of the year is summed, not
            # only single-quarter rows - confirm this is intended.
            same_year = x.loc[x.report_date.map(str).str.slice(0, 4)
                              == latest.report_date[:4]]
            same_year = same_year.drop_duplicates(
                subset=['report_date'], keep='first')
            same_year = same_year.loc[
                same_year.report_date <= latest.report_date]
            same_year = same_year.fillna(0)
            non_numeric_columns = sorted([
                "f_ann_date", "f_ann_date_stamp", "ann_date",
                "ann_date_stamp", "report_date", "report_date_stamp",
                "update_flag", "report_type", "code", "report_label"])
            # With an explicit `fields` selection some of them are absent.
            kept_columns = [name for name in non_numeric_columns
                            if name in same_year.columns]
            value_columns = sorted(
                set(same_year.columns) - set(non_numeric_columns))
            summed = same_year[value_columns].sum(axis=0)
            return pd.concat([summed, same_year[kept_columns].iloc[0]])
        # Other report types are filtered out before the query runs.
        return None

    if isinstance(code, str):
        code = (code,)
    if not report_type:
        report_type = ["1", "2", "4", "5"]
    elif isinstance(report_type, (int, str)):
        report_type = str(report_type)
        if report_type not in ["1", "4", "5"]:
            raise ValueError("[REPORT_TYPE ERROR]")
        report_type = [report_type]
    else:
        report_type = sorted(
            {str(item) for item in report_type} & {'1', '2', '4', '5'})
        if not report_type:
            raise ValueError("[REPORT_TYPE ERROR]")
    if sheet_type not in SHEET_TYPE:
        raise ValueError("[SHEET_TYPE ERROR]")
    if not cursor_date:
        raise ValueError("[DATE ERROR]\t'cursor_date' should not be None")
    if report_label:
        report_label = str(report_label)
    fields = _merge_fields(fields, _LOCAL_KEY_FIELDS + ("report_type",))

    coll = getattr(DATABASE, sheet_type)
    # Look back at most 400 days to keep the query small; statements older
    # than that are not considered.
    window_start = (pd.Timestamp(cursor_date)
                    - pd.Timedelta(days=400)).strftime("%Y-%m-%d")
    qry = {
        "f_ann_date_stamp": {
            "$gt": QA_util_date_stamp(window_start),
            "$lt": QA_util_date_stamp(cursor_date)
        },
        "report_type": {"$in": report_type}
    }
    if code:
        qry["code"] = {"$in": list(code)}
    if report_label:
        qry["report_label"] = report_label

    cursor = coll.find(qry, batch_size=10000).sort([
        ("report_date_stamp", pymongo.DESCENDING),
        ("f_ann_date_stamp", pymongo.DESCENDING)])
    try:
        df = pd.DataFrame(list(cursor)).drop(columns="_id")
        if fields:
            df = df[fields]
    except Exception as err:
        # Also reached when the query matched nothing (no "_id" column).
        raise ValueError("[QRY ERROR]") from err
    if sheet_type == "balancesheet":
        return df.groupby("code").apply(lambda x: x.iloc[0])
    return df.groupby("code").apply(_trans_financial_type).unstack()


def QA_fetch_stock_basic(
        code: Union[str, List, Tuple] = None,
        status: Union[str, List, Tuple] = 'L') -> pd.DataFrame:
    """Query basic stock information from the local database.

    Args:
        code: stock code or codes; None returns every stock.
        status: listing status or statuses, default 'L' (still listed);
            None returns stocks of every status.

    Returns:
        The matching rows indexed by ``code`` with ``list_date`` parsed to
        datetimes, or an empty DataFrame when nothing matches.
    """
    coll = DATABASE.stock_basic
    if isinstance(code, str):
        code = (code,)
    if isinstance(status, str):
        status = (status,)
    qry = {}
    if code:
        qry["code"] = {"$in": list(code)}
    if status:
        qry["status"] = {"$in": list(status)}
    res = pd.DataFrame(list(coll.find(qry)))
    if res.empty:
        return res
    res["list_date"] = pd.to_datetime(res["list_date"], utc=False)
    return res.drop(columns="_id").set_index("code")


def QA_fetch_stock_name(
        code: Union[str, List, Tuple] = None,
        cursor_date: Union[str, datetime.datetime, pd.Timestamp] = None
) -> pd.DataFrame:
    """Query historical stock names from the local database.

    Args:
        code: stock code or codes; None queries every stock.
        cursor_date: when given, only names whose validity interval
            (start/end stamps) contains this date are returned.

    Returns:
        The name records indexed by ``code`` (sorted by start stamp, exact
        duplicate rows removed, then sorted by code), or an empty DataFrame
        when nothing matches.
    """
    coll = DATABASE.namechange
    if isinstance(code, str):
        code = [code]
    qry = {}
    if code:
        qry["code"] = {"$in": list(code)}
    if cursor_date:
        stamp = QA_util_date_stamp(cursor_date)
        qry["start_date_stamp"] = {"$lte": stamp}
        qry["end_date_stamp"] = {"$gte": stamp}
    res = pd.DataFrame(list(coll.find(qry)))
    if res.empty:
        return res
    res["start_date"] = pd.to_datetime(res["start_date"], utc=False)
    res["end_date"] = pd.to_datetime(res["end_date"], utc=False)
    return (res.drop(columns="_id")
            .set_index("code")
            .sort_values(by="start_date_stamp")
            .drop_duplicates(keep="last")
            .sort_index())


def QA_fetch_industry_adv(
        code: Union[str, List, Tuple] = None,
        cursor_date: Union[str, datetime.datetime] = None,
        start: Union[str, datetime.datetime] = None,
        end: Union[str, datetime.datetime] = None,
        levels: Union[str, List, Tuple] = None,
        src: str = "sw"
) -> pd.DataFrame:
    """Query industry classifications from the local database.

    Args:
        code: stock code or codes; None uses ``QA_fetch_stock_list()``.
        cursor_date: a single date (typically the rebalancing date). When
            given, ``start`` and ``end`` are ignored.
        start: keep classifications entered on or before this date.
        end: keep classifications whose exit stamp is after this date.
        levels: classification level or levels; None selects
            "l1", "l2" and "l3".
        src: classification source, default "sw".

    Returns:
        The classification rows with ``in_date``/``out_date`` parsed to
        datetimes and the stamp columns removed; an empty DataFrame (after
        printing a notice) when nothing matches.
    """
    coll = DATABASE.industry
    if not code:
        code = QA_fetch_stock_list().index.tolist()
    if isinstance(code, str):
        code = [code]
    if isinstance(levels, str):
        levels = [levels]
    if not levels:
        levels = ["l1", "l2", "l3"]
    levels = [level.lower() for level in levels]

    qry = {
        "code": {"$in": list(code)},
        "level": {"$in": levels},
        "src": src.lower()
    }
    entered_by = cursor_date if cursor_date else start
    if entered_by:
        qry["in_date_stamp"] = {
            "$lte": QA_util_date_stamp(
                pd.Timestamp(entered_by).strftime("%Y-%m-%d"))
        }
    if coll.count_documents(filter=qry) < 1:
        print("找不到对应行业数据")
        return pd.DataFrame()
    df_tmp = pd.DataFrame(list(coll.find(qry))).drop(columns="_id")

    if cursor_date:
        cursor_stamp = QA_util_date_stamp(
            pd.Timestamp(cursor_date).strftime("%Y-%m-%d"))
        # Drop classifications already left by cursor_date. Rows without an
        # exit stamp are kept - assumes a missing stamp means "still a
        # member"; TODO confirm against the stored data.
        still_member = (df_tmp.out_date_stamp.isna()
                        | (df_tmp.out_date_stamp > cursor_stamp))
        df_tmp = df_tmp.loc[still_member].copy()
    elif end:
        end_stamp = QA_util_date_stamp(
            pd.Timestamp(end).strftime("%Y-%m-%d"))
        df_tmp = df_tmp.loc[df_tmp.out_date_stamp > end_stamp].copy()

    df_tmp["in_date"] = pd.to_datetime(df_tmp["in_date"], utc=False)
    df_tmp["out_date"] = pd.to_datetime(df_tmp["out_date"], utc=False)
    return df_tmp.drop(columns=["in_date_stamp", "out_date_stamp"])


def QA_fetch_daily_basic(
        code: Union[str, List, Tuple] = None,
        start: Union[str, pd.Timestamp, datetime.datetime] = None,
        end: Union[str, pd.Timestamp, datetime.datetime] = None,
        cursor_date: Union[str, pd.Timestamp, datetime.datetime] = None,
        fields: Union[str, Tuple, List] = None
) -> pd.DataFrame:
    """Query daily fundamental indicators from the local database.

    Args:
        code: stock code or codes; None queries the whole market.
        start: first trade date of the range.
        end: last trade date of the range; defaults to today.
        cursor_date: a single date, mapped through
            ``QA_util_get_real_date``. Takes precedence over ``start`` and
            ``end``.
        fields: columns to return; None returns every column. A single
            string selects that one column.

    Returns:
        The indicators indexed by (``date``, ``code``) and sorted by index,
        or an empty DataFrame when nothing matches.

    Raises:
        ValueError: when neither ``start`` nor ``cursor_date`` is given.
    """
    if isinstance(code, str):
        code = (code,)
    if (not start) and (not cursor_date):
        raise ValueError(
            "[ERROR]\tstart and end and cursor_date cannot all be none!")

    if cursor_date:
        real_trade_date = QA_util_get_real_date(cursor_date)
        qry = {"trade_date_stamp": QA_util_date_stamp(real_trade_date)}
    else:
        end_stamp = QA_util_date_stamp(end if end else datetime.date.today())
        qry = {
            "trade_date_stamp": {
                "$gte": QA_util_date_stamp(start),
                "$lte": end_stamp
            }
        }
    if code:
        qry["code"] = {"$in": list(code)}

    coll = DATABASE.daily_basic
    df = pd.DataFrame(list(coll.find(qry)))
    if df.empty:
        return df
    df = df.rename(columns={"trade_date": "date"}).drop(columns="_id")
    df["date"] = pd.to_datetime(df["date"], utc=False)
    df = df.set_index(["date", "code"]).sort_index()
    if not fields:
        return df
    if isinstance(fields, tuple):
        # A tuple would be read by pandas as one (multi-level) key.
        fields = list(fields)
    return df[fields]


if __name__ == "__main__":
    # Manual smoke test: latest balance sheets of every listed stock.
    code = QA_fetch_stock_list().index.tolist()
    cursor_date = '2020-10-08'
    df_origin = QA_fetch_last_financial(
        code=code, cursor_date=cursor_date, sheet_type="balancesheet")
upload.py
Source:upload.py
"""Luigi tasks that upload a GRB report, its data files and its plots.

``UploadReport`` writes the uploaded report as a YAML file.  Every other task
in this module records its completion with an empty ``*.done`` marker file
inside the ``upload`` sub-directory of the report's version directory; that
marker is the task's luigi output.
"""
import os
from pathlib import Path

import luigi
import yaml

from morgoth.balrog_handlers import ProcessFitResults
from morgoth.plots import (
    Create3DLocationPlot,
    CreateBalrogSwiftPlot,
    CreateCornerPlot,
    CreateLightcurve,
    CreateLocationPlot,
    CreateMollLocationPlot,
    CreateSatellitePlot,
    CreateSpectrumPlot
)
from morgoth.data_files import (
    CreateHealpixSysErr,
    CreateHealpix
)
from morgoth.configuration import morgoth_config
from morgoth.utils.file_utils import if_dir_containing_file_not_existing_then_make
from morgoth.utils.env import get_env_value
from morgoth.utils.upload_utils import upload_grb_report, upload_plot, upload_datafile

base_dir = get_env_value("GBM_TRIGGER_DATA_DIR")

# Detector names for which a lightcurve upload is scheduled, in the order the
# requirement dict has always listed them.
_DETECTORS = (
    "n0", "n1", "n2", "n3", "n4", "n5", "n6",
    "n7", "n8", "n9", "na", "nb", "b0", "b1",
)


def _upload_timing(section):
    """Return the ``wait_time``/``max_time`` keyword arguments for an upload.

    Args:
        section: Key below ``morgoth_config["upload"]`` to read from
            (``"report"`` or ``"plot"`` in this module).

    Returns:
        dict: ``{"wait_time": float, "max_time": float}`` built from the
        ``interval`` and ``max_time`` entries of that config section.
    """
    settings = morgoth_config["upload"][section]
    return {
        "wait_time": float(settings["interval"]),
        "max_time": float(settings["max_time"]),
    }


class UploadReport(luigi.Task):
    """Upload the fit result as a GRB report and store the returned report."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter()

    def requires(self):
        return ProcessFitResults(
            grb_name=self.grb_name, report_type=self.report_type, version=self.version
        )

    def output(self):
        filename = f"{self.report_type}_{self.version}_report.yml"
        return luigi.LocalTarget(
            os.path.join(
                base_dir, self.grb_name, self.report_type, self.version, filename
            )
        )

    def run(self):
        with self.input()["result_file"].open() as f:
            result = yaml.safe_load(f)

        report = upload_grb_report(
            grb_name=self.grb_name,
            result=result,
            **_upload_timing("report"),
        )

        # The report path is exactly the task's output path.
        with open(self.output().path, "w") as f:
            yaml.dump(report, f, default_flow_style=False)


class _UploadTask(luigi.Task):
    """Shared parameters and ``.done`` marker handling for the upload tasks.

    Concrete subclasses set ``marker_label``.  The default ``run`` only writes
    the marker, which is all the aggregating ``UploadAll*`` tasks do.
    """

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    # Middle part of the marker file name, e.g. "upload_plot_corner".
    marker_label = None

    def _task_kwargs(self):
        """Return the three parameters every required task is built from."""
        return {
            "grb_name": self.grb_name,
            "report_type": self.report_type,
            "version": self.version,
        }

    def _marker_name(self):
        """Return the file name of this task's ``.done`` marker."""
        return f"{self.report_type}_{self.version}_{self.marker_label}.done"

    def output(self):
        return luigi.LocalTarget(
            os.path.join(
                base_dir,
                self.grb_name,
                self.report_type,
                self.version,
                "upload",
                self._marker_name(),
            )
        )

    def _mark_done(self):
        """Create the marker file (and its directory) for this task."""
        marker = self.output().path
        if_dir_containing_file_not_existing_then_make(marker)
        # Path.touch() instead of os.system("touch ..."): no shell is involved,
        # so unusual characters in the path are harmless and a failure raises
        # instead of being silently ignored.
        Path(marker).touch()

    def run(self):
        self._mark_done()


class _UploadDataFileTask(_UploadTask):
    """Upload one data file once the report itself has been uploaded.

    Subclasses set ``file_type`` (passed to ``upload_datafile``) and
    ``data_task`` (the task class producing the file).
    """

    file_type = None
    data_task = None

    def requires(self):
        return {
            "create_report": UploadReport(**self._task_kwargs()),
            "data_file": self.data_task(**self._task_kwargs()),
        }

    def run(self):
        # Data files use the "plot" timing section of the configuration.
        upload_datafile(
            grb_name=self.grb_name,
            report_type=self.report_type,
            data_file=self.input()["data_file"].path,
            file_type=self.file_type,
            version=self.version,
            **_upload_timing("plot"),
        )
        self._mark_done()


class _UploadPlotTask(_UploadTask):
    """Upload one plot once the report itself has been uploaded.

    Subclasses set ``plot_type`` (passed to ``upload_plot``) and ``plot_task``
    (the task class producing the plot file).
    """

    plot_type = None
    plot_task = None

    def _plot_requirement(self):
        """Return the task instance that creates the plot file."""
        return self.plot_task(**self._task_kwargs())

    def _extra_upload_kwargs(self):
        """Return additional keyword arguments for ``upload_plot``."""
        return {}

    def requires(self):
        return {
            "create_report": UploadReport(**self._task_kwargs()),
            "plot_file": self._plot_requirement(),
        }

    def run(self):
        upload_plot(
            grb_name=self.grb_name,
            report_type=self.report_type,
            plot_file=self.input()["plot_file"].path,
            plot_type=self.plot_type,
            version=self.version,
            **_upload_timing("plot"),
            **self._extra_upload_kwargs(),
        )
        self._mark_done()


class UploadAllDataFiles(_UploadTask):
    """Aggregate task: done once both healpix data files are uploaded."""

    marker_label = "upload_datafiles"

    def requires(self):
        return {
            "healpix": UploadHealpix(**self._task_kwargs()),
            "healpixSysErr": UploadHealpixSysErr(**self._task_kwargs()),
        }


class UploadHealpix(_UploadDataFileTask):
    """Upload the file produced by ``CreateHealpix``."""

    marker_label = "upload_healpix"
    file_type = "healpix"
    data_task = CreateHealpix


class UploadHealpixSysErr(_UploadDataFileTask):
    """Upload the file produced by ``CreateHealpixSysErr``."""

    marker_label = "upload_healpixsyserr"
    file_type = "healpixSysErr"
    data_task = CreateHealpixSysErr


class UploadAllPlots(_UploadTask):
    """Aggregate task: done once every individual plot upload is done."""

    marker_label = "upload_plot_all"

    def requires(self):
        kwargs = self._task_kwargs()
        return {
            "lightcurves": UploadAllLightcurves(**kwargs),
            "location": UploadLocationPlot(**kwargs),
            "corner": UploadCornerPlot(**kwargs),
            "molllocation": UploadMollLocationPlot(**kwargs),
            "satellite": UploadSatellitePlot(**kwargs),
            "spectrum": UploadSpectrumPlot(**kwargs),
            "3d_location": Upload3DLocationPlot(**kwargs),
            "balrogswift": UploadBalrogSwiftPlot(**kwargs),
        }


class UploadAllLightcurves(_UploadTask):
    """Aggregate task: done once the lightcurve of every detector is uploaded."""

    marker_label = "upload_plot_all_lightcurves"

    def requires(self):
        return {
            detector: UploadLightcurve(
                grb_name=self.grb_name,
                report_type=self.report_type,
                detector=detector,
                version=self.version,
            )
            for detector in _DETECTORS
        }


class UploadLightcurve(_UploadPlotTask):
    """Upload the lightcurve plot of a single detector."""

    # Declared again (rather than inherited) so that ``detector`` keeps its
    # position between ``report_type`` and ``version`` in the parameter order.
    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    detector = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    plot_type = "lightcurve"
    plot_task = CreateLightcurve

    def _marker_name(self):
        # The detector name is part of the marker so that the 14 per-detector
        # tasks do not share one output.
        return (
            f"{self.report_type}_{self.version}_{self.detector}"
            "_upload_plot_lightcurve.done"
        )

    def _plot_requirement(self):
        return CreateLightcurve(
            grb_name=self.grb_name,
            report_type=self.report_type,
            detector=self.detector,
            version=self.version,
        )

    def _extra_upload_kwargs(self):
        return {"det_name": self.detector}


class UploadLocationPlot(_UploadPlotTask):
    """Upload the plot produced by ``CreateLocationPlot``."""

    marker_label = "upload_plot_location"
    plot_type = "location"
    plot_task = CreateLocationPlot


class UploadCornerPlot(_UploadPlotTask):
    """Upload the plot produced by ``CreateCornerPlot``."""

    marker_label = "upload_plot_corner"
    plot_type = "allcorner"
    plot_task = CreateCornerPlot


class UploadMollLocationPlot(_UploadPlotTask):
    """Upload the plot produced by ``CreateMollLocationPlot``."""

    marker_label = "upload_plot_molllocation"
    plot_type = "molllocation"
    plot_task = CreateMollLocationPlot


class UploadSatellitePlot(_UploadPlotTask):
    """Upload the plot produced by ``CreateSatellitePlot``."""

    marker_label = "upload_plot_satellite"
    plot_type = "satellite"
    plot_task = CreateSatellitePlot


class UploadSpectrumPlot(_UploadPlotTask):
    """Upload the plot produced by ``CreateSpectrumPlot``."""

    marker_label = "upload_plot_spectrum"
    plot_type = "spectrum"
    plot_task = CreateSpectrumPlot


class Upload3DLocationPlot(_UploadPlotTask):
    """Upload the plot produced by ``Create3DLocationPlot``."""

    marker_label = "upload_plot_3dlocation"
    plot_type = "3dlocation"
    plot_task = Create3DLocationPlot


class UploadBalrogSwiftPlot(_UploadPlotTask):
    """Upload the plot produced by ``CreateBalrogSwiftPlot``."""

    marker_label = "upload_plot_balrogswift"
    plot_type = "balrogswift"
    plot_task = CreateBalrogSwiftPlot
plots.py
Source:plots.py
"""Luigi tasks that create (or check for) the plots of a GRB report.

Each plot task's output is the plot file below the ``plots`` sub-directory of
the report's version directory.  The two aggregating tasks write an empty
``*.txt`` marker file into the version directory itself.
"""
import os
from pathlib import Path

import luigi
import yaml

from morgoth.balrog_handlers import ProcessFitResults
from morgoth.downloaders import DownloadTrigdat, GatherTrigdatDownload
from morgoth.exceptions.custom_exceptions import UnkownReportType
from morgoth.utils.env import get_env_value
from morgoth.utils.plot_utils import (
    azimuthal_plot_sat_frame,
    create_corner_all_plot,
    create_corner_loc_plot,
    interactive_3D_plot,
    mollweide_plot,
    swift_gbm_plot,
)

base_dir = get_env_value("GBM_TRIGGER_DATA_DIR")

# Detector names for which a lightcurve is expected, in the order the
# requirement dict has always listed them.
_DETECTORS = (
    "n0", "n1", "n2", "n3", "n4", "n5", "n6",
    "n7", "n8", "n9", "na", "nb", "b0", "b1",
)


def _load_yaml(target):
    """Return the parsed YAML content of a luigi target."""
    with target.open() as f:
        return yaml.safe_load(f)


class _PlotTask(luigi.Task):
    """Shared parameters and path helpers for the plot tasks.

    By default a plot task requires ``ProcessFitResults`` for the same GRB,
    report type and version.
    """

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def _task_kwargs(self):
        """Return the three parameters every required task is built from."""
        return {
            "grb_name": self.grb_name,
            "report_type": self.report_type,
            "version": self.version,
        }

    def _version_dir(self):
        """Return ``<base_dir>/<grb_name>/<report_type>/<version>``."""
        return os.path.join(
            base_dir, self.grb_name, self.report_type, self.version
        )

    def _plot_target(self, kind, extension="png"):
        """Return the target of the ``kind`` plot inside ``plots``."""
        filename = (
            f"{self.grb_name}_{kind}_plot_{self.report_type}_{self.version}"
            f".{extension}"
        )
        return luigi.LocalTarget(
            os.path.join(self._version_dir(), "plots", filename)
        )

    def _marker_target(self, label):
        """Return the target of an aggregate task's ``.txt`` marker."""
        filename = f"{self.report_type}_{self.version}_{label}.txt"
        return luigi.LocalTarget(os.path.join(self._version_dir(), filename))

    def _touch_output(self):
        """Create this task's (empty) output file."""
        marker = Path(self.output().path)
        # No shell involved (unlike os.system("touch ...")): odd characters in
        # the path are harmless and a failure raises instead of being ignored.
        marker.parent.mkdir(parents=True, exist_ok=True)
        marker.touch()

    def requires(self):
        return ProcessFitResults(**self._task_kwargs())


class _TrigdatPlotTask(_PlotTask):
    """Plot task that needs the trigdat file in addition to the fit result."""

    def requires(self):
        return {
            "fit_result": ProcessFitResults(**self._task_kwargs()),
            "trigdat_version": GatherTrigdatDownload(grb_name=self.grb_name),
        }

    def _trigdat_path(self):
        """Return the path of the trigdat file matching this report.

        For ``tte`` reports the trigdat version is read from the
        ``trigdat_version`` requirement; for ``trigdat`` reports it is the
        task's own version.

        Raises:
            UnkownReportType: If ``report_type`` is neither ``tte`` nor
                ``trigdat`` (compared case-insensitively).
        """
        kind = self.report_type.lower()
        if kind == "tte":
            trigdat_version = _load_yaml(self.input()["trigdat_version"])[
                "trigdat_version"
            ]
        elif kind == "trigdat":
            trigdat_version = self.version
        else:
            raise UnkownReportType(
                f"The report_type '{self.report_type}' is not valid!"
            )
        return DownloadTrigdat(
            grb_name=self.grb_name, version=trigdat_version
        ).output().path


class CreateAllPlots(_PlotTask):
    """Aggregate task: done once every individual plot exists."""

    def requires(self):
        kwargs = self._task_kwargs()
        return {
            "lightcurves": CreateAllLightcurves(**kwargs),
            "location": CreateLocationPlot(**kwargs),
            "corner": CreateCornerPlot(**kwargs),
            "molllocation": CreateMollLocationPlot(**kwargs),
            "satellite": CreateSatellitePlot(**kwargs),
            "spectrum": CreateSpectrumPlot(**kwargs),
            "3dlocation": Create3DLocationPlot(**kwargs),
            "balrogswift": CreateBalrogSwiftPlot(**kwargs),
        }

    def output(self):
        return self._marker_target("plot_all")

    def run(self):
        self._touch_output()


class CreateAllLightcurves(_PlotTask):
    """Aggregate task: done once the lightcurve of every detector exists."""

    def requires(self):
        return {
            detector: CreateLightcurve(
                grb_name=self.grb_name,
                report_type=self.report_type,
                detector=detector,
                version=self.version,
            )
            for detector in _DETECTORS
        }

    def output(self):
        return self._marker_target("plot_all_lightcurves")

    def run(self):
        self._touch_output()


class CreateLightcurve(_PlotTask):
    """Check that the lightcurve plot of one detector exists."""

    # Declared again (rather than inherited) so that ``detector`` keeps its
    # position between ``report_type`` and ``version`` in the parameter order.
    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    detector = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def output(self):
        filename = (
            f"{self.grb_name}_lightcurve_{self.report_type}"
            f"_detector_{self.detector}_plot_{self.version}.png"
        )
        return luigi.LocalTarget(
            os.path.join(self._version_dir(), "plots", "lightcurves", filename)
        )

    def run(self):
        # The lightcurve is created in the background fit Task, this task will
        # check if the creation was successful
        pass


class CreateLocationPlot(_PlotTask):
    """Create the location corner plot from the fit's posterior samples."""

    def output(self):
        return self._plot_target("location")

    def run(self):
        result = _load_yaml(self.input()["result_file"])
        create_corner_loc_plot(
            post_equal_weights_file=self.input()["post_equal_weights"].path,
            model=result["fit_result"]["model"],
            save_path=self.output().path,
        )


class CreateCornerPlot(_PlotTask):
    """Create the all-parameter corner plot from the fit's posterior samples."""

    def output(self):
        return self._plot_target("allcorner")

    def run(self):
        result = _load_yaml(self.input()["result_file"])
        create_corner_all_plot(
            post_equal_weights_file=self.input()["post_equal_weights"].path,
            model=result["fit_result"]["model"],
            save_path=self.output().path,
        )


class CreateMollLocationPlot(_TrigdatPlotTask):
    """Create the Mollweide location plot."""

    def output(self):
        return self._plot_target("molllocation")

    def run(self):
        fit_input = self.input()["fit_result"]
        result = _load_yaml(fit_input["result_file"])
        trigdat_path = self._trigdat_path()

        mollweide_plot(
            grb_name=self.grb_name,
            trigdat_file=trigdat_path,
            post_equal_weights_file=fit_input["post_equal_weights"].path,
            used_dets=result["time_selection"]["used_detectors"],
            model=result["fit_result"]["model"],
            ra=result["fit_result"]["ra"],
            dec=result["fit_result"]["dec"],
            swift=result["general"]["swift"],
            save_path=self.output().path,
        )


class CreateSatellitePlot(_TrigdatPlotTask):
    """Create the azimuthal plot in the satellite frame."""

    def output(self):
        return self._plot_target("satellite")

    def run(self):
        result = _load_yaml(self.input()["fit_result"]["result_file"])
        trigdat_path = self._trigdat_path()

        azimuthal_plot_sat_frame(
            grb_name=self.grb_name,
            trigdat_file=trigdat_path,
            ra=result["fit_result"]["ra"],
            dec=result["fit_result"]["dec"],
            save_path=self.output().path,
        )


class CreateSpectrumPlot(_PlotTask):
    """Check that the spectrum plot exists."""

    def output(self):
        return self._plot_target("spectrum")

    def run(self):
        # The spectrum plot is created in the balrog fit Task, this task will
        # check if the creation was successful
        pass


class Create3DLocationPlot(_TrigdatPlotTask):
    """Create the interactive 3D location plot (an HTML file)."""

    def output(self):
        return self._plot_target("3dlocation", extension="html")

    def run(self):
        fit_input = self.input()["fit_result"]
        result = _load_yaml(fit_input["result_file"])
        trigdat_path = self._trigdat_path()

        interactive_3D_plot(
            trigdat_file=trigdat_path,
            post_equal_weights_file=fit_input["post_equal_weights"].path,
            used_dets=result["time_selection"]["used_detectors"],
            model=result["fit_result"]["model"],
            save_path=self.output().path,
        )


class CreateBalrogSwiftPlot(_PlotTask):
    """Create the plot comparing the fit position with the Swift position."""

    def output(self):
        return self._plot_target("balrogswift")

    def run(self):
        result = _load_yaml(self.input()["result_file"])
        swift_gbm_plot(
            grb_name=self.grb_name,
            post_equal_weights_file=self.input()["post_equal_weights"].path,
            model=result["fit_result"]["model"],
            ra=result["fit_result"]["ra"],
            dec=result["fit_result"]["dec"],
            swift=result["general"]["swift"],
            save_path=self.output().path,
        )
copy_report_type.py
Source:copy_report_type.py
1# -*- encoding: utf-8 -*-2from optparse import make_option3import copy4from django.core.management.base import BaseCommand5from accounts.models import Authority6from common.models import Domain7from reports.models import ReportType, ReportState, ReportTypeCategory, CaseDefinition8class Command(BaseCommand):9 args = '<from_domain> <to_domain> <report_type_id report_type_id ...>'10 help = 'Copy report types'11 option_list = BaseCommand.option_list + (12 make_option(13 '--from-authority',14 action='store',15 dest='from_authority',16 default=None,17 help='Specify origin authority'18 ),19 make_option(20 '--to-authority',21 action='store',22 dest='to_authority',23 default=None,24 help='Specify target authority'25 ),26 make_option(27 '--force',28 action='store_true',29 dest='force',30 default=False,31 help='Whether to force mode, default is dry_run mode'32 )33 )34 def _copy_report_states(self, from_domain, to_domain, from_report_type, to_report_type, dry_run=True):35 original_report_states = ReportState.objects.filter(domain=from_domain, report_type=from_report_type)36 for original_report_state in original_report_states:37 should_save_report_state = False38 try:39 report_state = ReportState.objects.get(domain=to_domain, report_type=to_report_type, code=original_report_state.code)40 if report_state.name != original_report_state.name:41 report_state.name = original_report_state.name42 should_save_report_state = True43 except ReportState.DoesNotExist:44 should_save_report_state = True45 report_state = copy.deepcopy(original_report_state)46 report_state.pk = None47 report_state.domain = to_domain48 report_state.report_type = to_report_type49 if should_save_report_state:50 print "Will copy report state from %s to %s using this data:" % (from_domain.name, to_domain.name)51 print " [FROM] id:%s domain:%s authority:%s report_type:%s report_state_code:%s" % (52 original_report_state.pk, original_report_state.domain.name, original_report_state.report_type.authority, 
original_report_state.report_type, original_report_state.code)53 print " [TO] id:%s domain:%s authority:%s report_type:%s report_state_code:%s" % (54 report_state.pk, report_state.domain.name, report_state.report_type.authority, report_state.report_type, report_state.code)55 if not dry_run:56 report_state.save()57 print " - Saved id: %s" % report_state.pk58 print ""59 def _copy_case_definitions(self, from_domain, to_domain, from_report_type, to_report_type, dry_run=True):60 original_case_definitions = CaseDefinition.objects.filter(domain=from_domain, report_type=from_report_type)61 for original_case_definition in original_case_definitions:62 should_save_case_definition = False63 try:64 case_definition = CaseDefinition.objects.get(domain=to_domain, report_type=to_report_type, code=original_case_definition.code)65 if case_definition.epl != original_case_definition.epl:66 case_definition.epl = original_case_definition.epl67 should_save_case_definition = True68 if case_definition.description != original_case_definition.description:69 case_definition.description = original_case_definition.description70 should_save_case_definition = True71 if case_definition.accumulate != original_case_definition.accumulate:72 case_definition.accumulate = original_case_definition.accumulate73 should_save_case_definition = True74 if case_definition.window != original_case_definition.window:75 case_definition.window = original_case_definition.window76 should_save_case_definition = True77 except CaseDefinition.DoesNotExist:78 should_save_case_definition = True79 case_definition = copy.deepcopy(original_case_definition)80 case_definition.pk = None81 case_definition.domain = to_domain82 case_definition.report_type = to_report_type83 if should_save_case_definition:84 print "Will copy case definition from %s to %s using this data:" % (from_domain.name, to_domain.name)85 print " [FROM] id:%s domain:%s report_type:%s case_def_code:%s" % (86 original_case_definition.pk, 
original_case_definition.domain, original_case_definition.report_type.name, original_case_definition.code)87 print " [TO] id:%s domain:%s report_type:%s case_def_code:%s" % (88 case_definition.pk, case_definition.domain.name, case_definition.report_type.name, original_case_definition.code)89 if not dry_run:90 case_definition.from_state = ReportState.objects.get(domain=to_domain, report_type=to_report_type, code=original_case_definition.from_state.code)91 case_definition.to_state = ReportState.objects.get(domain=to_domain, report_type=to_report_type, code=original_case_definition.to_state.code)92 case_definition.save()93 print " - Saved id: %s" % case_definition.pk94 print "--"95 def _copy_report_type_categories(self, from_domain, to_domain, dry_run=True):96 original_report_type_categories = ReportTypeCategory.objects.filter(domain=from_domain)97 for original_report_type_category in original_report_type_categories:98 try:99 report_type_category = ReportTypeCategory.objects.get(domain=to_domain, code=original_report_type_category.code)100 if report_type_category.name != original_report_type_category.name:101 report_type_category.name = original_report_type_category.name102 print "Will update report type category from %s to %s using this data:" % (103 from_domain.name, to_domain.name)104 print " [FROM] id:%s domain:%s code:%s" % (105 original_report_type_category.pk, original_report_type_category.domain,106 original_report_type_category.code)107 print " [TO] id:%s domain:%s code:%s" % (108 report_type_category.pk, report_type_category.domain.name, report_type_category.code)109 if not dry_run:110 report_type_category.save()111 print " - Saved id: %s" % report_type_category.pk112 except ReportTypeCategory.DoesNotExist:113 report_type_category = copy.deepcopy(original_report_type_category)114 report_type_category.pk = None115 report_type_category.domain = to_domain116 print "Will copy report type category from %s to %s using this data:" % (from_domain.name, 
to_domain.name)117 print " [FROM] id:%s domain:%s code:%s" % (118 original_report_type_category.pk, original_report_type_category.domain, original_report_type_category.code)119 print " [TO] id:%s domain:%s code:%s" % (120 report_type_category.pk, report_type_category.domain.name, report_type_category.code)121 if not dry_run:122 report_type_category.save()123 print " - Saved id: %s" % report_type_category.pk124 print "--"125 def _copy_report_types(self, from_domain, to_domain, from_authority=None, to_authority=None, report_type_ids=[],126 dry_run=True):127 if dry_run:128 print ">> DRY RUN <<\n"129 self._copy_report_type_categories(from_domain, to_domain, dry_run)130 original_report_types = ReportType.objects.filter(domain=from_domain)131 if from_authority:132 original_report_types = original_report_types.filter(authority=from_authority)133 if report_type_ids:134 original_report_types = original_report_types.filter(id__in=report_type_ids)135 for original_report_type in original_report_types:136 should_save_report_type = False137 try:138 report_type = ReportType.objects.get(domain=to_domain, code=original_report_type.code)139 if report_type.form_definition != original_report_type.form_definition:140 report_type.form_definition = original_report_type.form_definition141 should_save_report_type = True142 if report_type.name != original_report_type.name:143 report_type.name = original_report_type.name144 should_save_report_type = True145 if report_type.template != original_report_type.template:146 report_type.template = original_report_type.template147 should_save_report_type = True148 if report_type.django_template != original_report_type.django_template:149 report_type.django_template = original_report_type.django_template150 should_save_report_type = True151 if report_type.summary_template != original_report_type.summary_template:152 report_type.summary_template = original_report_type.summary_template153 should_save_report_type = True154 except 
ReportType.DoesNotExist:155 should_save_report_type = True156 report_type = copy.deepcopy(original_report_type)157 report_type.pk = None158 report_type.domain = to_domain159 report_type.default_state = None160 if not dry_run and original_report_type.category:161 report_type.category = ReportTypeCategory.objects.get(domain=to_domain, code=original_report_type.category.code)162 if to_authority:163 report_type.authority = to_authority164 else:165 report_type.authority = None166 if should_save_report_type:167 print "Will copy report type from %s to %s using this data:" % (from_domain.name, to_domain.name)168 print " [FROM] id:%s domain:%s authority:%s report_type:%s" % (original_report_type.pk, original_report_type.domain.name, original_report_type.authority, original_report_type.name)169 print " [TO] id:%s domain:%s authority:%s report_type:%s" % (report_type.pk, report_type.domain.name, report_type.authority, report_type.name)170 if not dry_run:171 report_type.save(set_default_state=True)172 print " - Saved id: %s" % report_type.pk173 print ""174 # copy report states175 self._copy_report_states(from_domain, to_domain, from_report_type=original_report_type,176 to_report_type=report_type, dry_run=dry_run)177 # copy case definitions178 self._copy_case_definitions(from_domain, to_domain, from_report_type=original_report_type,179 to_report_type=report_type, dry_run=dry_run)180 if should_save_report_type:181 default_state = ReportState.objects.get(report_type=report_type, code='report')182 if not report_type.default_state or default_state.id != report_type.default_state.id:183 print ">> Then will set default state to %s" % default_state.name184 if not dry_run:185 report_type.default_state = default_state186 report_type.save()187 print "---------------------"188 def handle(self, *args, **options):189 from_domain = Domain.objects.get(id=args[0])190 to_domain = Domain.objects.get(id=args[1])191 report_type_ids = list(args[2:])192 from_authority = options['from_authority']193 
if from_authority:194 from_authority = Authority.objects.get(domain=from_domain, id=from_authority)195 to_authority = options['to_authority']196 if to_authority:197 to_authority = Authority.objects.get(domain=to_domain, id=to_authority)198 dry_run = not options['force']...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!