How to use report_type method in Slash

Best Python code snippet using slash

fetcher.py

Source:fetcher.py Github

copy

Full Screen

#!/usr/bin/env python3
"""Financial-statement and basic-information query helpers.

Network query interfaces (Tushare Pro):
    1. Single stock
        - QA_fetch_get_individual_financial: one stock, a range of report
          periods, one statement, one report type.
    2. Cross section
        - QA_fetch_get_crosssection_financial: all stocks for one report
          period, one statement, one report type.
        - QA_fetch_get_daily_basic: daily fundamental indicators.

Local query interfaces (MongoDB via ``DATABASE``):
    - QA_fetch_crosssection_financial
    - QA_fetch_financial_adv
    - QA_fetch_last_financial
    - QA_fetch_stock_basic / QA_fetch_stock_name
    - QA_fetch_industry_adv
    - QA_fetch_daily_basic

Report type codes used throughout (``report_type``):
    1  consolidated statement, latest published (default)
    2  single-quarter consolidated statement
    3  adjusted single-quarter consolidated statement (if any)
    4  adjusted consolidated statement: prior-year comparatives published
       this year; the report period is the prior year
    5  consolidated statement before adjustment (original data kept after
       a change)
    6  parent-company statement
    7  parent-company single-quarter statement
    8  parent-company adjusted single-quarter statement
    9  parent-company adjusted statement
    10 parent-company statement before adjustment
    11 consolidated statement before adjustment (original data)
    12 parent-company statement before change (original data)
"""
import datetime
import time
from typing import List, Tuple, Union

import pandas as pd
import pymongo
import tushare as ts

from QUANTAXIS.QAFactor.utils import QA_fmt_code, QA_fmt_code_list
from QUANTAXIS.QAFetch.QAQuery_Advance import QA_fetch_stock_list
from QUANTAXIS.QAFetch.QATushare import get_pro
from QUANTAXIS.QAUtil import (DATABASE, QASETTING, QA_util_date_int2str,
                              QA_util_date_stamp, QA_util_get_pre_trade_date,
                              QA_util_get_real_date, QA_util_log_info,
                              QA_util_to_json_from_pandas)

REPORT_DATE_TAILS = ["0331", "0630", "0930", "1231"]
SHEET_TYPE = ["income", "balancesheet", "cashflow"]
REPORT_TYPE = ['1', '2', '3', '4', '5', '11']

# Statements accepted by the single-stock network interface.
_NETWORK_SHEET_TYPES = ["income", "balancesheet",
                        "cashflow", "forecast", "express"]
# Columns added when the caller asks for a single field by name (network API).
_NETWORK_KEY_FIELDS = ["ts_code", "end_date", "ann_date",
                       "f_ann_date", "report_type", "update_flag"]


def _call_with_retry(request, wait_seconds, max_trial):
    """Call ``request()`` until it succeeds or ``max_trial`` attempts failed.

    Args:
        request: zero-argument callable performing the remote query.
        wait_seconds: pause between two attempts, in seconds.
        max_trial: maximum number of attempts.

    Returns:
        Whatever ``request()`` returns on its first successful attempt.

    Raises:
        ValueError: every attempt failed (the last error is chained).
    """
    last_error = None
    for attempt in range(max_trial):
        try:
            return request()
        except Exception as error:  # remote API raises assorted error types
            last_error = error
            print(error)
            # No point in sleeping after the final failed attempt.
            if attempt + 1 < max_trial:
                time.sleep(wait_seconds)
    raise ValueError("[ERROR]\tEXCEED MAX TRIAL!") from last_error


def _quarter_ends(year):
    """Return the four standard report-period timestamps of ``year``."""
    return [pd.Timestamp(f"{year}{tail}") for tail in REPORT_DATE_TAILS]


def QA_fetch_get_individual_financial(
        code: str,
        start: Union[str, datetime.datetime, pd.Timestamp] = None,
        end: Union[str, datetime.datetime, pd.Timestamp] = None,
        report_date: Union[str, datetime.datetime] = None,
        sheet_type: str = "income",
        report_type: Union[int, str] = 1,
        fields: Union[str, Tuple, List] = None,
        wait_seconds: int = 61,
        max_trial: int = 3) -> pd.DataFrame:
    """Fetch one stock's financial statements from the network.

    ``start`` and ``end`` select a range of *report periods* (quarter ends),
    not announcement dates.

    Args:
        code: stock code.
        start: first date of the report-period range.
        end: last date of the report-period range; defaults to today when
            only ``start`` is given.
        report_date: a single report period (must be a quarter end). When
            given, ``start`` and ``end`` are ignored.
        sheet_type: one of "income", "balancesheet", "cashflow", "forecast",
            "express".
        report_type: report type code 1..12 (see module docstring).
        fields: columns to request; ``None`` requests everything. A single
            string is extended with the key columns (code, dates, flags).
        wait_seconds: pause between retries.
        max_trial: maximum number of attempts per report period.

    Returns:
        The statements for every requested period, with ``ts_code`` renamed
        to ``code`` and ``end_date`` renamed to ``report_date``.

    Raises:
        ValueError: invalid dates, sheet type or report type, or the remote
            query kept failing.
    """
    if (not start) and (not end) and (not report_date):
        raise ValueError(
            "[QRY_DATES ERROR]\tparam 'start', 'end' and 'report_date' should not be none at the same time!")
    report_type = int(report_type)
    # Validate up front for both query modes, before any network traffic.
    if sheet_type not in _NETWORK_SHEET_TYPES:
        raise ValueError("[SHEET_TYPE ERROR]")
    if report_type not in range(1, 13):
        raise ValueError("[REPORT_TYPE ERROR]")
    if isinstance(fields, str):
        fields = sorted(set([fields] + _NETWORK_KEY_FIELDS))

    if report_date:
        report_date = pd.Timestamp(report_date)
        if report_date not in _quarter_ends(report_date.year):
            raise ValueError("[REPORT_DATE ERROR]")
        report_dates = [report_date]
    else:
        if not start:
            raise ValueError(
                "[QRY_DATES ERROR]\tparam 'start' is required when 'report_date' is not given!")
        start = pd.Timestamp(start)
        end = pd.Timestamp(end) if end else pd.Timestamp(
            datetime.date.today())
        report_dates = [
            quarter_end
            for year in range(start.year, end.year + 1)
            for quarter_end in _quarter_ends(year)
            if start <= quarter_end <= end]

    pro = get_pro()
    ts_code = QA_fmt_code(code, "ts")
    optional = {"fields": fields} if fields else {}

    def _request(period):
        return getattr(pro, sheet_type)(
            ts_code=ts_code, period=period, report_type=report_type, **optional)

    frames = [
        _call_with_retry(
            lambda period=period.strftime("%Y%m%d"): _request(period),
            wait_seconds, max_trial)
        for period in report_dates]
    if not frames:
        return pd.DataFrame()
    df = pd.concat(frames).rename(
        columns={"ts_code": "code", "end_date": "report_date"})
    if "code" in df.columns:
        df["code"] = QA_fmt_code_list(df["code"])
    return df.reset_index(drop=True)


def QA_fetch_get_crosssection_financial(
        report_date: Union[str, datetime.datetime, pd.Timestamp],
        report_type: Union[int, str] = 1,
        sheet_type: str = "income",
        fields: Union[str, Tuple, List] = None,
        wait_seconds: int = 61,
        max_trial: int = 3) -> pd.DataFrame:
    """Fetch one report period's statements for all stocks from the network.

    Uses the ``<sheet_type>_vip`` Tushare endpoints.

    Args:
        report_date: report period; must be a quarter end.
        report_type: report type code 1..12 (see module docstring).
        sheet_type: one of "income", "balancesheet", "cashflow".
        fields: columns to request; ``None`` requests everything. A single
            string is extended with the key columns.
        wait_seconds: pause between retries.
        max_trial: maximum number of attempts.

    Returns:
        The statements of the report period, sorted by announcement dates,
        with ``ts_code`` renamed to ``code`` and ``end_date`` renamed to
        ``report_date``.

    Raises:
        ValueError: invalid report date, sheet type or report type, or the
            remote query kept failing.
    """
    report_date = pd.Timestamp(report_date)
    report_type = int(report_type)
    period = report_date.strftime("%Y%m%d")
    # Only quarter ends are valid report periods.
    if period not in [str(report_date.year) + tail for tail in REPORT_DATE_TAILS]:
        raise ValueError("[REPORT_DATE ERROR]")
    if isinstance(fields, str):
        fields = sorted(set([fields] + _NETWORK_KEY_FIELDS))
    # Only income, balance sheet and cash flow are supported here.
    if sheet_type not in SHEET_TYPE:
        raise ValueError("[SHEET_TYPE ERROR]")
    if report_type not in range(1, 13):
        raise ValueError("[REPORT_TYPE ERROR]")

    pro = get_pro()
    optional = {"fields": fields} if fields else {}

    def _request():
        return getattr(pro, f"{sheet_type}_vip")(
            period=period, report_type=report_type, **optional)

    df = _call_with_retry(_request, wait_seconds, max_trial)
    if df.empty:
        return df
    df["ts_code"] = QA_fmt_code_list(df["ts_code"])
    df = df.rename(columns={"ts_code": "code", "end_date": "report_date"})
    # A caller-supplied field list may omit the announcement-date columns.
    sort_keys = [key for key in ("ann_date", "f_ann_date")
                 if key in df.columns]
    return df.sort_values(by=sort_keys) if sort_keys else df


def QA_fetch_get_daily_basic(
        code: Union[str, List, Tuple] = None,
        trade_date: Union[str, pd.Timestamp, datetime.datetime] = None,
        fields: Union[str, List, Tuple] = None,
        wait_seconds: int = 61,
        max_trial: int = 3
) -> pd.DataFrame:
    """Fetch daily fundamental indicators for one trade date from the network.

    Args:
        code: stock code or codes; ``None`` keeps the whole market.
        trade_date: trade date; ``None`` uses the value returned by
            ``QA_util_get_pre_trade_date(today, 1)``.
        fields: columns to request; ``None`` requests everything. A single
            string is extended with ``ts_code`` and ``trade_date``; a list
            is extended with ``ts_code`` because the result is indexed by it.
        wait_seconds: pause between retries.
        max_trial: maximum number of attempts.

    Returns:
        Indicators indexed by ``code``; codes absent from the response are
        silently dropped.

    Raises:
        ValueError: the remote query kept failing.
    """
    if not trade_date:
        trade_date = QA_util_get_pre_trade_date(
            datetime.date.today(), 1).replace("-", "")
    else:
        trade_date = pd.Timestamp(trade_date).strftime("%Y%m%d")

    params = {"trade_date": trade_date}
    if fields:
        if isinstance(fields, str):
            wanted = list(set([fields] + ["ts_code", "trade_date"]))
        else:
            wanted = list(fields)
            if "ts_code" not in wanted:
                wanted.append("ts_code")
        params["fields"] = ",".join(wanted)

    pro = get_pro()

    def _request():
        result = pro.daily_basic(**params)
        if result is None:
            raise ValueError("[ERROR]")
        return result

    df = _call_with_retry(_request, wait_seconds, max_trial)
    if df.empty:
        return df
    df = df.rename(columns={"ts_code": "code"})
    df["code"] = QA_fmt_code_list(df["code"])
    df = df.set_index("code")
    if not code:
        return df
    if isinstance(code, str):
        code = (code,)
    # Exclude codes which are not in the returned dataframe.
    return df.loc[df.index.intersection(code)]


def QA_fetch_crosssection_financial(
        report_date: Union[str, datetime.datetime, pd.Timestamp],
        report_type: Union[int, str] = 1,
        sheet_type: str = "income",
        fields: Union[str, Tuple, List] = None) -> pd.DataFrame:
    """Query one report period's statements for all stocks from the local DB.

    Args:
        report_date: report period.
        report_type: report type code (see module docstring).
        sheet_type: name of the statement collection, default "income".
        fields: columns to return; ``None`` returns everything. A single
            string is extended with the key columns.

    Returns:
        The matching rows, or an empty DataFrame when nothing matches.
    """
    if isinstance(fields, str):
        fields = sorted({fields, "code", "report_date", "ann_date",
                         "f_ann_date", "report_type", "update_flag"})
    coll = getattr(DATABASE, sheet_type)
    cursor = coll.find({
        "report_date": pd.Timestamp(report_date).strftime("%Y%m%d"),
        "report_type": str(report_type),
    })
    res = pd.DataFrame(list(cursor))
    if res.empty:
        return pd.DataFrame()
    res["report_date"] = pd.to_datetime(res["report_date"], utc=False)
    res = res.drop(columns="_id")
    if not fields:
        return res
    # A tuple would be read as a single (MultiIndex) key; force a list.
    return res[list(fields)]


def QA_fetch_financial_adv(
        code: Union[str, Tuple, List] = None,
        start: Union[str, datetime.datetime, pd.Timestamp] = None,
        end: Union[str, datetime.datetime, pd.Timestamp] = None,
        report_date: Union[str, datetime.datetime, pd.Timestamp] = None,
        report_type: Union[int, str] = None,
        sheet_type: str = "income",
        fields: Union[str, Tuple, List] = None) -> pd.DataFrame:
    """Query statements from the local DB by stock, date range and type.

    Args:
        code: stock code or codes; ``None`` queries the whole market.
        start: lower bound on the final announcement date (optional).
        end: upper bound on the final announcement date; defaults to today.
        report_date: a single report period. When given, ``start`` and
            ``end`` are ignored.
        report_type: one code or several; ``None`` selects
            ("1", "2", "4", "5", "11").
        sheet_type: name of the statement collection, default "income".
        fields: columns to return; ``None`` returns everything. The key
            columns (code and the three dates) are always included.

    Returns:
        Matching rows indexed by ``code``, ordered by report period and
        announcement date; an empty DataFrame when nothing matches.

    Raises:
        ValueError: ``start``, ``end`` and ``report_date`` are all missing.
    """
    if (not start) and (not end) and (not report_date):
        raise ValueError(
            "[DATE ERROR]\t 'start', 'end' 与 'report_date' 不能同时为 None")
    if isinstance(code, str):
        code = (code,)
    if not report_type:
        report_type = ("1", "2", "4", "5", "11")
    if isinstance(report_type, (int, str)):
        report_type = [str(report_type)]
    else:
        report_type = [str(item) for item in report_type]

    qry = {"report_type": {"$in": report_type}}
    if code:
        qry["code"] = {"$in": list(code)}
    if report_date:
        qry["report_date_stamp"] = QA_util_date_stamp(report_date)
    else:
        upper = pd.Timestamp(end if end else datetime.date.today())
        bounds = {"$lte": QA_util_date_stamp(upper)}
        # A missing start simply leaves the range open at the bottom.
        if start:
            bounds["$gte"] = QA_util_date_stamp(pd.Timestamp(start))
        qry["f_ann_date_stamp"] = bounds

    key_fields = ["code", "ann_date", "report_date", "f_ann_date"]
    if isinstance(fields, str):
        fields = list(set([fields] + key_fields))
    elif fields:
        fields = list(set(list(fields) + key_fields))

    coll = getattr(DATABASE, sheet_type)
    cursor = coll.find(qry, batch_size=10000).sort([
        ("report_date_stamp", pymongo.ASCENDING),
        ("f_ann_date_stamp", pymongo.ASCENDING)])
    df = pd.DataFrame(list(cursor))
    if df.empty:
        return pd.DataFrame()
    df = df.drop(columns="_id")
    if fields:
        df = df[fields]
    df = df.set_index("code")
    for column in ("report_date", "ann_date", "f_ann_date"):
        df[column] = pd.to_datetime(df[column], utc=False)
    return df


def QA_fetch_last_financial(
        code: Union[str, List, Tuple] = None,
        cursor_date: Union[str, datetime.datetime, pd.Timestamp] = None,
        report_label: Union[int, str] = None,
        report_type: Union[int, str, List, Tuple] = None,
        sheet_type: str = "income",
        fields: Union[str, List, Tuple] = None) -> pd.DataFrame:
    """Return, per stock, the latest statement published before a date.

    Statements announced on ``cursor_date`` itself are excluded, and only
    statements announced within the 400 days before it are considered.

    Example from the original author: Liugong (000528) published its
    half-year report on 2018-08-30 and a corrected one on 2018-09-29.
        - cursor_date 2018-08-31 yields the original report
          (report_type == 5);
        - cursor_date 2018-09-30 yields the latest consolidated report
          (report_type == 1);
        - cursor_date 2019-08-31, asking for the 2018 half-year report,
          yields the prior-year comparatives published on 2019-08-29
          (report_type == 4).

    Args:
        code: stock code or codes; ``None`` queries every stock.
        cursor_date: cut-off date (typically a rebalancing date). Required.
        report_label: restricts the query to documents whose
            ``report_label`` field equals this value (as a string);
            ``None`` applies no such restriction.
        report_type: a single code must be one of 1, 4, 5; a list is
            intersected with {1, 2, 4, 5}; ``None`` selects all four.
        sheet_type: one of "income", "balancesheet", "cashflow".
        fields: columns to return; ``None`` returns everything. The key
            columns (code, dates, report_type) are always included.

    Returns:
        One record per stock (see the grouping at the end of the function).

    Raises:
        ValueError: invalid arguments, or the query returned nothing usable
            ("[QRY ERROR]").
    """
    def _trans_financial_type(x):
        """Pick, or rebuild from single-quarter rows, one record per stock."""
        if x.empty:
            return x
        if sheet_type == "balancesheet":
            # Balance sheets are point-in-time data; return them untouched.
            return x
        latest = x.iloc[0]
        if latest.report_date[4:] in ['0331', '1231']:
            # Q1: single-quarter and cumulative statements are identical.
            # Annual report: there is no single-quarter notion.
            return latest
        if latest.report_type in ['1', '4', '5']:
            return latest
        if latest.report_type == '2':
            # Rebuild the cumulative figures by summing the single-quarter
            # rows of the same year up to the latest report period.
            same_year = x.loc[x.report_date.map(str).str.slice(
                0, 4) == latest.report_date[:4]]
            same_year = same_year.drop_duplicates(
                subset=['report_date'], keep='first')
            same_year = same_year.loc[
                same_year.report_date <= latest.report_date]
            same_year = same_year.fillna(0)
            non_numeric_columns = sorted(
                ["f_ann_date", "f_ann_date_stamp", "ann_date",
                 "ann_date_stamp", "report_date", "report_date_stamp",
                 "update_flag", "report_type", "code", "report_label"])
            # A ``fields`` selection may have dropped some of these columns.
            non_numeric_columns = [
                name for name in non_numeric_columns
                if name in same_year.columns]
            columns = sorted(
                set(same_year.columns) - set(non_numeric_columns))
            return pd.concat([
                same_year[columns].sum(axis=0),
                same_year[non_numeric_columns].iloc[0]])

    if not cursor_date:
        raise ValueError("[DATE ERROR]\t'cursor_date' is required")
    if isinstance(code, str):
        code = (code,)
    if not report_type:
        report_type = ["1", "2", "4", "5"]
    else:
        if isinstance(report_type, int):
            report_type = str(report_type)
        if isinstance(report_type, str):
            if report_type not in ["1", "4", "5"]:
                raise ValueError("[REPORT_TYPE ERROR]")
            report_type = [report_type]
        else:
            report_type = sorted(
                {str(item) for item in report_type} & {'1', '2', '4', '5'})
    if sheet_type not in SHEET_TYPE:
        raise ValueError(f"[SHEET_TYPE ERROR]")
    if report_label:
        report_label = str(report_label)

    key_fields = ["code", "ann_date", "report_date",
                  "f_ann_date", "report_type"]
    if isinstance(fields, str):
        fields = list(set([fields] + key_fields))
    elif fields:
        fields = list(set(list(fields) + key_fields))

    # Look back at most 400 days to keep the query fast: rebalancing only
    # needs the most recent data, and corrections older than that have
    # little effect on it.
    window_start = (pd.Timestamp(cursor_date)
                    - pd.Timedelta(days=400)).strftime("%Y-%m-%d")
    qry = {
        "f_ann_date_stamp": {
            "$gt": QA_util_date_stamp(window_start),
            "$lt": QA_util_date_stamp(cursor_date)
        },
        "report_type": {"$in": list(report_type)}
    }
    if code:
        qry["code"] = {"$in": list(code)}
    if report_label:
        qry["report_label"] = report_label

    coll = getattr(DATABASE, sheet_type)
    cursor = coll.find(qry, batch_size=10000).sort([
        ("report_date_stamp", pymongo.DESCENDING),
        ("f_ann_date_stamp", pymongo.DESCENDING)])
    try:
        df = pd.DataFrame(list(cursor)).drop(columns="_id")
        if fields:
            df = df[fields]
    except Exception as error:
        # An empty result has no "_id" column and ends up here as well.
        raise ValueError("[QRY ERROR]") from error

    # Rows are sorted newest first, so the first row per stock is the latest.
    if sheet_type == "balancesheet":
        return df.groupby("code").apply(lambda x: x.iloc[0])
    # NOTE(review): the shape produced by apply(...).unstack() depends on
    # whether every group yields the same index - confirm against callers.
    return df.groupby("code").apply(_trans_financial_type).unstack()


def QA_fetch_stock_basic(
        code: Union[str, List, Tuple] = None,
        status: Union[str, List, Tuple] = 'L') -> pd.DataFrame:
    """Query basic stock information from the local DB.

    Args:
        code: stock code or codes; ``None`` returns every stock.
        status: status value or values to keep, default 'L'; ``None``
            applies no status filter.

    Returns:
        Basic information indexed by ``code`` (empty when nothing matches).
    """
    if isinstance(code, str):
        code = (code,)
    if isinstance(status, str):
        status = (status,)
    qry = {}
    if code:
        qry["code"] = {"$in": list(code)}
    if status:
        qry["status"] = {"$in": list(status)}
    res = pd.DataFrame(list(DATABASE.stock_basic.find(qry)))
    if res.empty:
        return res
    res["list_date"] = pd.to_datetime(res["list_date"], utc=False)
    return res.drop(columns="_id").set_index("code")


def QA_fetch_stock_name(
        code: Union[str, List, Tuple] = None,
        cursor_date: Union[str, datetime.datetime, pd.Timestamp] = None
) -> pd.DataFrame:
    """Query historical stock names from the local DB.

    Args:
        code: stock code or codes; ``None`` queries every stock.
        cursor_date: when given, only names whose validity interval
            (start_date_stamp..end_date_stamp) contains this date are kept.

    Returns:
        Name records indexed by ``code`` (empty when nothing matches).
    """
    if isinstance(code, str):
        code = [code]
    qry = {}
    if code:
        qry["code"] = {"$in": list(code)}
    if cursor_date:
        stamp = QA_util_date_stamp(cursor_date)
        qry["start_date_stamp"] = {"$lte": stamp}
        qry["end_date_stamp"] = {"$gte": stamp}
    res = pd.DataFrame(list(DATABASE.namechange.find(qry)))
    if res.empty:
        return res
    res["start_date"] = pd.to_datetime(res["start_date"], utc=False)
    res["end_date"] = pd.to_datetime(res["end_date"], utc=False)
    res = res.drop(columns="_id").set_index("code")
    res = res.sort_values(by="start_date_stamp")
    return res.drop_duplicates(keep="last").sort_index()


def QA_fetch_industry_adv(
        code: Union[str, List, Tuple] = None,
        cursor_date: Union[str, datetime.datetime] = None,
        start: Union[str, datetime.datetime] = None,
        end: Union[str, datetime.datetime] = None,
        levels: Union[str, List, Tuple] = None,
        src: str = "sw"
) -> pd.DataFrame:
    """Query industry classification of stocks from the local DB.

    Args:
        code: stock code or codes; ``None`` uses ``QA_fetch_stock_list()``.
        cursor_date: a single date (typically a rebalancing date); when
            given, ``start`` and ``end`` are ignored.
        start: keep classifications entered on or before this date.
        end: keep classifications left after this date.
        levels: classification level or levels; ``None`` means
            ["l1", "l2", "l3"].
        src: classification source, default "sw".

    Returns:
        Industry records without the stamp columns; an empty DataFrame when
        the query matches no document.
    """
    coll = DATABASE.industry
    if not code:
        code = QA_fetch_stock_list().index.tolist()
    if isinstance(code, str):
        code = [code]
    if isinstance(levels, str):
        levels = [levels]
    if not levels:
        levels = ["l1", "l2", "l3"]
    levels = [level.lower() for level in levels]

    # With a cursor date the record must cover that single day; otherwise
    # start bounds the entry date and end bounds the exit date.
    entered_by = cursor_date if cursor_date else start
    left_after = cursor_date if cursor_date else end

    qry = {
        "code": {"$in": list(code)},
        "level": {"$in": levels},
        "src": src.lower()
    }
    if entered_by:
        qry["in_date_stamp"] = {
            "$lte": QA_util_date_stamp(
                pd.Timestamp(entered_by).strftime("%Y-%m-%d"))
        }
    if coll.count_documents(filter=qry) < 1:
        print("找不到对应行业数据")
        return pd.DataFrame()
    df_tmp = pd.DataFrame(list(coll.find(qry))).drop(columns="_id")
    if left_after:
        # The cursor_date branch previously computed this filter and threw
        # the result away; it is now applied like the ``end`` filter.
        df_tmp = df_tmp.loc[df_tmp.out_date_stamp > QA_util_date_stamp(
            pd.Timestamp(left_after).strftime("%Y-%m-%d"))]
    df_tmp["in_date"] = pd.to_datetime(df_tmp["in_date"], utc=False)
    df_tmp["out_date"] = pd.to_datetime(df_tmp["out_date"], utc=False)
    return df_tmp.drop(columns=["in_date_stamp", "out_date_stamp"])


def QA_fetch_daily_basic(
        code: Union[str, List, Tuple] = None,
        start: Union[str, pd.Timestamp, datetime.datetime] = None,
        end: Union[str, pd.Timestamp, datetime.datetime] = None,
        cursor_date: Union[str, pd.Timestamp, datetime.datetime] = None,
        fields: Union[str, Tuple, List] = None
) -> pd.DataFrame:
    """Query daily fundamental indicators from the local DB.

    Args:
        code: stock code or codes; ``None`` queries the whole market.
        start: first trade date of the range.
        end: last trade date of the range; defaults to today.
        cursor_date: a single date; takes precedence over ``start``/``end``.
        fields: column or columns to return; ``None`` returns everything.

    Returns:
        Indicators indexed by (date, code); an empty DataFrame when nothing
        matches.

    Raises:
        ValueError: neither ``start`` nor ``cursor_date`` is given.
    """
    if isinstance(code, str):
        code = (code,)
    if (not start) and (not cursor_date):
        raise ValueError(
            "[ERROR]\tstart and end and cursor_date cannot all be none!")

    if cursor_date:
        real_trade_date = QA_util_get_real_date(cursor_date)
        qry = {"trade_date_stamp": QA_util_date_stamp(real_trade_date)}
    else:
        qry = {
            "trade_date_stamp": {
                "$gte": QA_util_date_stamp(start),
                "$lte": QA_util_date_stamp(
                    end if end else datetime.date.today())
            }
        }
    if code:
        qry["code"] = {"$in": list(code)}

    df = pd.DataFrame(list(DATABASE.daily_basic.find(qry)))
    if df.empty:
        return df
    df = df.rename(columns={"trade_date": "date"}).drop(columns="_id")
    df["date"] = pd.to_datetime(df["date"], utc=False)
    df = df.set_index(["date", "code"]).sort_index()
    if not fields:
        return df
    # A tuple would be read as a single (MultiIndex) key; force a list.
    return df[list(fields) if isinstance(fields, tuple) else fields]


if __name__ == "__main__":
    # Smoke test: latest balance sheet of every listed stock.
    code = QA_fetch_stock_list().index.tolist()
    cursor_date = '2020-10-08'
    df_origin = QA_fetch_last_financial(
        code=code, cursor_date=cursor_date, sheet_type="balancesheet")

Full Screen

Full Screen

upload.py

Source:upload.py Github

copy

Full Screen

"""Luigi tasks that upload the GRB report, its plots and its data files.

Apart from ``UploadReport`` (which writes the returned report as YAML), every
task marks its completion with an empty ``*.done`` file below
``<base_dir>/<grb_name>/<report_type>/<version>/upload/``.
"""
import os

import luigi
import yaml

from morgoth.balrog_handlers import ProcessFitResults
from morgoth.plots import (
    Create3DLocationPlot,
    CreateBalrogSwiftPlot,
    CreateCornerPlot,
    CreateLightcurve,
    CreateLocationPlot,
    CreateMollLocationPlot,
    CreateSatellitePlot,
    CreateSpectrumPlot,
)
from morgoth.data_files import CreateHealpixSysErr, CreateHealpix
from morgoth.configuration import morgoth_config
from morgoth.utils.file_utils import if_dir_containing_file_not_existing_then_make
from morgoth.utils.env import get_env_value
from morgoth.utils.upload_utils import upload_grb_report, upload_plot, upload_datafile

base_dir = get_env_value("GBM_TRIGGER_DATA_DIR")

# Detectors for which a lightcurve plot is uploaded, in the original order.
_DETECTORS = (
    "n0", "n1", "n2", "n3", "n4", "n5", "n6",
    "n7", "n8", "n9", "na", "nb", "b0", "b1",
)


def _job_kwargs(task):
    """Return the grb_name/report_type/version parameters shared by all tasks."""
    return {
        "grb_name": task.grb_name,
        "report_type": task.report_type,
        "version": task.version,
    }


def _job_dir(task):
    """Return ``<base_dir>/<grb_name>/<report_type>/<version>`` for *task*."""
    return os.path.join(base_dir, task.grb_name, task.report_type, task.version)


def _done_target(task, suffix):
    """Return the ``upload/<report_type>_<version>_<suffix>.done`` target."""
    filename = f"{task.report_type}_{task.version}_{suffix}.done"
    return luigi.LocalTarget(os.path.join(_job_dir(task), "upload", filename))


def _upload_timing(kind):
    """Return wait_time/max_time keyword arguments from ``morgoth_config["upload"][kind]``."""
    section = morgoth_config["upload"][kind]
    return {
        "wait_time": float(section["interval"]),
        "max_time": float(section["max_time"]),
    }


def _touch(path):
    """Create *path* if needed and refresh its mtime, like ``touch`` but without a shell.

    The path is built from task parameters, so it must not be interpolated
    into a shell command line.
    """
    # presumably creates the parent directory when it is missing -- confirm in file_utils
    if_dir_containing_file_not_existing_then_make(path)
    with open(path, "a"):
        os.utime(path, None)


def _requires_report_and(task, key, creator, **extra):
    """Return the requirement dict ``{"create_report": ..., key: creator(...)}``."""
    kwargs = _job_kwargs(task)
    return {
        "create_report": UploadReport(**kwargs),
        key: creator(**kwargs, **extra),
    }


def _upload_plot_and_mark(task, plot_type, **extra):
    """Upload the file of the ``plot_file`` requirement, then write the marker."""
    upload_plot(
        grb_name=task.grb_name,
        report_type=task.report_type,
        plot_file=task.input()["plot_file"].path,
        plot_type=plot_type,
        version=task.version,
        **_upload_timing("plot"),
        **extra,
    )
    _touch(task.output().path)


def _upload_datafile_and_mark(task, file_type):
    """Upload the file of the ``data_file`` requirement, then write the marker."""
    upload_datafile(
        grb_name=task.grb_name,
        report_type=task.report_type,
        data_file=task.input()["data_file"].path,
        file_type=file_type,
        version=task.version,
        # The original code reads the "plot" settings for data files as well.
        **_upload_timing("plot"),
    )
    _touch(task.output().path)


class UploadReport(luigi.Task):
    """Upload the fit result and store the returned report as a YAML file."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter()

    def requires(self):
        return ProcessFitResults(**_job_kwargs(self))

    def output(self):
        filename = f"{self.report_type}_{self.version}_report.yml"
        return luigi.LocalTarget(os.path.join(_job_dir(self), filename))

    def run(self):
        with self.input()["result_file"].open() as f:
            result = yaml.safe_load(f)

        report = upload_grb_report(
            grb_name=self.grb_name,
            result=result,
            **_upload_timing("report"),
        )

        with open(self.output().path, "w") as f:
            yaml.dump(report, f, default_flow_style=False)


class UploadAllDataFiles(luigi.Task):
    """Aggregate task: done once both healpix data files are uploaded."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        kwargs = _job_kwargs(self)
        return {
            "healpix": UploadHealpix(**kwargs),
            "healpixSysErr": UploadHealpixSysErr(**kwargs),
        }

    def output(self):
        return _done_target(self, "upload_datafiles")

    def run(self):
        _touch(self.output().path)


class UploadHealpix(luigi.Task):
    """Upload the file produced by ``CreateHealpix`` as file type ``healpix``."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return _requires_report_and(self, "data_file", CreateHealpix)

    def output(self):
        return _done_target(self, "upload_healpix")

    def run(self):
        _upload_datafile_and_mark(self, "healpix")


class UploadHealpixSysErr(luigi.Task):
    """Upload the file produced by ``CreateHealpixSysErr`` as ``healpixSysErr``."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return _requires_report_and(self, "data_file", CreateHealpixSysErr)

    def output(self):
        return _done_target(self, "upload_healpixsyserr")

    def run(self):
        _upload_datafile_and_mark(self, "healpixSysErr")


class UploadAllPlots(luigi.Task):
    """Aggregate task: done once every plot upload task has finished."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        kwargs = _job_kwargs(self)
        return {
            "lightcurves": UploadAllLightcurves(**kwargs),
            "location": UploadLocationPlot(**kwargs),
            "corner": UploadCornerPlot(**kwargs),
            "molllocation": UploadMollLocationPlot(**kwargs),
            "satellite": UploadSatellitePlot(**kwargs),
            "spectrum": UploadSpectrumPlot(**kwargs),
            "3d_location": Upload3DLocationPlot(**kwargs),
            "balrogswift": UploadBalrogSwiftPlot(**kwargs),
        }

    def output(self):
        return _done_target(self, "upload_plot_all")

    def run(self):
        _touch(self.output().path)


class UploadAllLightcurves(luigi.Task):
    """Aggregate task: done once the lightcurve of every detector is uploaded."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return {
            det: UploadLightcurve(
                grb_name=self.grb_name,
                report_type=self.report_type,
                detector=det,
                version=self.version,
            )
            for det in _DETECTORS
        }

    def output(self):
        return _done_target(self, "upload_plot_all_lightcurves")

    def run(self):
        _touch(self.output().path)


class UploadLightcurve(luigi.Task):
    """Upload the lightcurve plot of a single detector."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    detector = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return _requires_report_and(
            self, "plot_file", CreateLightcurve, detector=self.detector
        )

    def output(self):
        return _done_target(self, f"{self.detector}_upload_plot_lightcurve")

    def run(self):
        _upload_plot_and_mark(self, "lightcurve", det_name=self.detector)


class UploadLocationPlot(luigi.Task):
    """Upload the plot produced by ``CreateLocationPlot``."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return _requires_report_and(self, "plot_file", CreateLocationPlot)

    def output(self):
        return _done_target(self, "upload_plot_location")

    def run(self):
        _upload_plot_and_mark(self, "location")


class UploadCornerPlot(luigi.Task):
    """Upload the plot produced by ``CreateCornerPlot`` (plot type ``allcorner``)."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return _requires_report_and(self, "plot_file", CreateCornerPlot)

    def output(self):
        return _done_target(self, "upload_plot_corner")

    def run(self):
        _upload_plot_and_mark(self, "allcorner")


class UploadMollLocationPlot(luigi.Task):
    """Upload the plot produced by ``CreateMollLocationPlot``."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return _requires_report_and(self, "plot_file", CreateMollLocationPlot)

    def output(self):
        return _done_target(self, "upload_plot_molllocation")

    def run(self):
        _upload_plot_and_mark(self, "molllocation")


class UploadSatellitePlot(luigi.Task):
    """Upload the plot produced by ``CreateSatellitePlot``."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return _requires_report_and(self, "plot_file", CreateSatellitePlot)

    def output(self):
        return _done_target(self, "upload_plot_satellite")

    def run(self):
        _upload_plot_and_mark(self, "satellite")


class UploadSpectrumPlot(luigi.Task):
    """Upload the plot produced by ``CreateSpectrumPlot``."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return _requires_report_and(self, "plot_file", CreateSpectrumPlot)

    def output(self):
        return _done_target(self, "upload_plot_spectrum")

    def run(self):
        _upload_plot_and_mark(self, "spectrum")


class Upload3DLocationPlot(luigi.Task):
    """Upload the plot produced by ``Create3DLocationPlot``."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return _requires_report_and(self, "plot_file", Create3DLocationPlot)

    def output(self):
        return _done_target(self, "upload_plot_3dlocation")

    def run(self):
        _upload_plot_and_mark(self, "3dlocation")


class UploadBalrogSwiftPlot(luigi.Task):
    """Upload the plot produced by ``CreateBalrogSwiftPlot``."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return _requires_report_and(self, "plot_file", CreateBalrogSwiftPlot)

    def output(self):
        return _done_target(self, "upload_plot_balrogswift")

    def run(self):
        # NOTE(review): the listing is cut off at the end of this method; the
        # marker step is restored from the identical sibling tasks -- confirm.
        _upload_plot_and_mark(self, "balrogswift")

Full Screen

Full Screen

plots.py

Source:plots.py Github

copy

Full Screen

"""Luigi tasks that create (or check for) the plots of one GRB fit.

All plot files live below
``<base_dir>/<grb_name>/<report_type>/<version>/plots/``.
"""
import os

import luigi
import yaml

from morgoth.balrog_handlers import ProcessFitResults
from morgoth.downloaders import DownloadTrigdat, GatherTrigdatDownload
from morgoth.exceptions.custom_exceptions import UnkownReportType
from morgoth.utils.env import get_env_value
from morgoth.utils.plot_utils import (
    azimuthal_plot_sat_frame,
    create_corner_all_plot,
    create_corner_loc_plot,
    interactive_3D_plot,
    mollweide_plot,
    swift_gbm_plot,
)

base_dir = get_env_value("GBM_TRIGGER_DATA_DIR")

# Detectors for which a lightcurve plot is expected, in the original order.
_DETECTORS = (
    "n0", "n1", "n2", "n3", "n4", "n5", "n6",
    "n7", "n8", "n9", "na", "nb", "b0", "b1",
)


def _job_kwargs(task):
    """Return the grb_name/report_type/version parameters shared by all tasks."""
    return {
        "grb_name": task.grb_name,
        "report_type": task.report_type,
        "version": task.version,
    }


def _job_dir(task):
    """Return ``<base_dir>/<grb_name>/<report_type>/<version>`` for *task*."""
    return os.path.join(base_dir, task.grb_name, task.report_type, task.version)


def _plot_target(task, plot_name, ext="png"):
    """Return the target ``plots/<grb>_<plot_name>_plot_<report_type>_<version>.<ext>``."""
    filename = (
        f"{task.grb_name}_{plot_name}_plot_{task.report_type}_{task.version}.{ext}"
    )
    return luigi.LocalTarget(os.path.join(_job_dir(task), "plots", filename))


def _fit_and_trigdat(task):
    """Return the requirements of tasks that need the fit result and a trigdat file."""
    return {
        "fit_result": ProcessFitResults(**_job_kwargs(task)),
        "trigdat_version": GatherTrigdatDownload(grb_name=task.grb_name),
    }


def _load_yaml(target):
    """Parse the YAML content of a luigi target."""
    with target.open() as f:
        return yaml.safe_load(f)


def _trigdat_target(task):
    """Return the output target of the ``DownloadTrigdat`` task matching *task*.

    For report type ``tte`` the trigdat version is read from the
    ``trigdat_version`` requirement; for ``trigdat`` the task's own version
    is used.

    Raises:
        UnkownReportType: for any other report type.
    """
    report_type = task.report_type.lower()
    if report_type == "tte":
        trigdat_version = _load_yaml(task.input()["trigdat_version"])[
            "trigdat_version"
        ]
    elif report_type == "trigdat":
        trigdat_version = task.version
    else:
        raise UnkownReportType(
            f"The report_type '{task.report_type}' is not valid!"
        )
    return DownloadTrigdat(grb_name=task.grb_name, version=trigdat_version).output()


def _touch(path):
    """Create *path* if needed and refresh its mtime, like ``touch`` but without a shell.

    The path is built from task parameters, so it must not be interpolated
    into a shell command line.
    """
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "a"):
        os.utime(path, None)


class CreateAllPlots(luigi.Task):
    """Aggregate task: done once every plot task has finished."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        kwargs = _job_kwargs(self)
        return {
            "lightcurves": CreateAllLightcurves(**kwargs),
            "location": CreateLocationPlot(**kwargs),
            "corner": CreateCornerPlot(**kwargs),
            "molllocation": CreateMollLocationPlot(**kwargs),
            "satellite": CreateSatellitePlot(**kwargs),
            "spectrum": CreateSpectrumPlot(**kwargs),
            "3dlocation": Create3DLocationPlot(**kwargs),
            "balrogswift": CreateBalrogSwiftPlot(**kwargs),
        }

    def output(self):
        filename = f"{self.report_type}_{self.version}_plot_all.txt"
        return luigi.LocalTarget(os.path.join(_job_dir(self), filename))

    def run(self):
        _touch(self.output().path)


class CreateAllLightcurves(luigi.Task):
    """Aggregate task: done once the lightcurve of every detector exists."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return {
            det: CreateLightcurve(
                grb_name=self.grb_name,
                report_type=self.report_type,
                detector=det,
                version=self.version,
            )
            for det in _DETECTORS
        }

    def output(self):
        filename = f"{self.report_type}_{self.version}_plot_all_lightcurves.txt"
        return luigi.LocalTarget(os.path.join(_job_dir(self), filename))

    def run(self):
        _touch(self.output().path)


class CreateLightcurve(luigi.Task):
    """Check that the lightcurve plot of one detector exists."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    detector = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return ProcessFitResults(**_job_kwargs(self))

    def output(self):
        filename = (
            f"{self.grb_name}_lightcurve_{self.report_type}"
            f"_detector_{self.detector}_plot_{self.version}.png"
        )
        return luigi.LocalTarget(
            os.path.join(_job_dir(self), "plots", "lightcurves", filename)
        )

    def run(self):
        # The lightcurve is created in the background fit Task, this task will
        # check if the creation was successful
        pass


class CreateLocationPlot(luigi.Task):
    """Create the location corner plot from the fit's posterior samples."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return ProcessFitResults(**_job_kwargs(self))

    def output(self):
        return _plot_target(self, "location")

    def run(self):
        result = _load_yaml(self.input()["result_file"])

        create_corner_loc_plot(
            post_equal_weights_file=self.input()["post_equal_weights"].path,
            model=result["fit_result"]["model"],
            save_path=self.output().path,
        )


class CreateCornerPlot(luigi.Task):
    """Create the corner plot of all parameters from the fit's posterior samples."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return ProcessFitResults(**_job_kwargs(self))

    def output(self):
        return _plot_target(self, "allcorner")

    def run(self):
        result = _load_yaml(self.input()["result_file"])

        create_corner_all_plot(
            post_equal_weights_file=self.input()["post_equal_weights"].path,
            model=result["fit_result"]["model"],
            save_path=self.output().path,
        )


class CreateMollLocationPlot(luigi.Task):
    """Create the mollweide location plot."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return _fit_and_trigdat(self)

    def output(self):
        # Kept as a plain "/"-joined string so the target path stays exactly
        # what the original code produced.
        filename = (
            f"{base_dir}/{self.grb_name}/{self.report_type}/{self.version}/plots/"
            f"{self.grb_name}_molllocation_plot_{self.report_type}_{self.version}.png"
        )
        return luigi.LocalTarget(filename)

    def run(self):
        fit = self.input()["fit_result"]
        result = _load_yaml(fit["result_file"])
        trigdat_file = _trigdat_target(self)

        mollweide_plot(
            grb_name=self.grb_name,
            trigdat_file=trigdat_file.path,
            post_equal_weights_file=fit["post_equal_weights"].path,
            used_dets=result["time_selection"]["used_detectors"],
            model=result["fit_result"]["model"],
            ra=result["fit_result"]["ra"],
            dec=result["fit_result"]["dec"],
            swift=result["general"]["swift"],
            save_path=self.output().path,
        )


class CreateSatellitePlot(luigi.Task):
    """Create the azimuthal plot in the satellite frame."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return _fit_and_trigdat(self)

    def output(self):
        return _plot_target(self, "satellite")

    def run(self):
        result = _load_yaml(self.input()["fit_result"]["result_file"])
        trigdat_file = _trigdat_target(self)

        azimuthal_plot_sat_frame(
            grb_name=self.grb_name,
            trigdat_file=trigdat_file.path,
            ra=result["fit_result"]["ra"],
            dec=result["fit_result"]["dec"],
            save_path=self.output().path,
        )


class CreateSpectrumPlot(luigi.Task):
    """Check that the spectrum plot exists."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return ProcessFitResults(**_job_kwargs(self))

    def output(self):
        return _plot_target(self, "spectrum")

    def run(self):
        # The spectrum plot is created in the balrog fit Task, this task will
        # check if the creation was successful
        pass


class Create3DLocationPlot(luigi.Task):
    """Create the interactive 3D location plot (HTML file)."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return _fit_and_trigdat(self)

    def output(self):
        return _plot_target(self, "3dlocation", ext="html")

    def run(self):
        fit = self.input()["fit_result"]
        result = _load_yaml(fit["result_file"])
        trigdat_file = _trigdat_target(self)

        interactive_3D_plot(
            trigdat_file=trigdat_file.path,
            post_equal_weights_file=fit["post_equal_weights"].path,
            used_dets=result["time_selection"]["used_detectors"],
            model=result["fit_result"]["model"],
            save_path=self.output().path,
        )


class CreateBalrogSwiftPlot(luigi.Task):
    """Create the plot produced by ``swift_gbm_plot`` from the fit result."""

    grb_name = luigi.Parameter()
    report_type = luigi.Parameter()
    version = luigi.Parameter(default="v00")

    def requires(self):
        return ProcessFitResults(**_job_kwargs(self))

    def output(self):
        return _plot_target(self, "balrogswift")

    def run(self):
        result = _load_yaml(self.input()["result_file"])

        # NOTE(review): the listing is cut off after ``save_path``; the call is
        # assumed to end there -- confirm against the full file.
        swift_gbm_plot(
            grb_name=self.grb_name,
            post_equal_weights_file=self.input()["post_equal_weights"].path,
            model=result["fit_result"]["model"],
            ra=result["fit_result"]["ra"],
            dec=result["fit_result"]["dec"],
            swift=result["general"]["swift"],
            save_path=self.output().path,
        )

Full Screen

Full Screen

copy_report_type.py

Source:copy_report_type.py Github

copy

Full Screen

# -*- encoding: utf-8 -*-
# NOTE(review): this file was reconstructed from a listing that had lost its
# line breaks and indentation. Statement order and all runtime strings are as
# shown there; the nesting of a few statements (marked below) was inferred and
# should be confirmed against the upstream file.
from optparse import make_option
import copy

from django.core.management.base import BaseCommand

from accounts.models import Authority
from common.models import Domain
from reports.models import ReportType, ReportState, ReportTypeCategory, CaseDefinition


class Command(BaseCommand):
    """Copy report types, with their states and case definitions, between domains.

    Positional arguments are the source domain id, the target domain id and an
    optional list of report type ids. Unless ``--force`` is given the command
    runs in dry-run mode: it prints what it would do and saves nothing.
    """

    args = '<from_domain> <to_domain> <report_type_id report_type_id ...>'
    help = 'Copy report types'
    option_list = BaseCommand.option_list + (
        make_option(
            '--from-authority',
            action='store',
            dest='from_authority',
            default=None,
            help='Specify origin authority'
        ),
        make_option(
            '--to-authority',
            action='store',
            dest='to_authority',
            default=None,
            help='Specify target authority'
        ),
        make_option(
            '--force',
            action='store_true',
            dest='force',
            default=False,
            help='Whether to force mode, default is dry_run mode'
        )
    )

    def _copy_report_states(self, from_domain, to_domain, from_report_type, to_report_type, dry_run=True):
        """Copy or update the report states of one report type.

        For every ``ReportState`` of ``from_report_type`` in ``from_domain``,
        look up the state with the same ``code`` under ``to_report_type`` in
        ``to_domain``. An existing state only has its ``name`` synchronised; a
        missing one is created as a deep copy re-pointed at the target domain
        and report type. Nothing is saved when ``dry_run`` is true.
        """
        original_report_states = ReportState.objects.filter(domain=from_domain, report_type=from_report_type)
        for original_report_state in original_report_states:
            should_save_report_state = False
            try:
                report_state = ReportState.objects.get(
                    domain=to_domain, report_type=to_report_type, code=original_report_state.code)
                if report_state.name != original_report_state.name:
                    report_state.name = original_report_state.name
                    should_save_report_state = True
            except ReportState.DoesNotExist:
                should_save_report_state = True
                report_state = copy.deepcopy(original_report_state)
                # Clearing the primary key makes save() insert a new row.
                report_state.pk = None
                report_state.domain = to_domain
                report_state.report_type = to_report_type

            if should_save_report_state:
                print("Will copy report state from %s to %s using this data:" % (from_domain.name, to_domain.name))
                print(" [FROM] id:%s domain:%s authority:%s report_type:%s report_state_code:%s" % (
                    original_report_state.pk, original_report_state.domain.name,
                    original_report_state.report_type.authority,
                    original_report_state.report_type, original_report_state.code))
                print(" [TO] id:%s domain:%s authority:%s report_type:%s report_state_code:%s" % (
                    report_state.pk, report_state.domain.name, report_state.report_type.authority,
                    report_state.report_type, report_state.code))
                if not dry_run:
                    report_state.save()
                    print(" - Saved id: %s" % report_state.pk)
                # NOTE(review): nesting of this blank-line print was inferred.
                print("")

    def _copy_case_definitions(self, from_domain, to_domain, from_report_type, to_report_type, dry_run=True):
        """Copy or update the case definitions of one report type.

        An existing target ``CaseDefinition`` (matched by ``code``) has its
        ``epl``, ``description``, ``accumulate`` and ``window`` synchronised;
        a missing one is created as a deep copy. When actually saving,
        ``from_state`` and ``to_state`` are re-resolved to the target report
        type's states with the same codes, so those states must already exist.
        """
        original_case_definitions = CaseDefinition.objects.filter(domain=from_domain, report_type=from_report_type)
        for original_case_definition in original_case_definitions:
            should_save_case_definition = False
            try:
                case_definition = CaseDefinition.objects.get(
                    domain=to_domain, report_type=to_report_type, code=original_case_definition.code)
                if case_definition.epl != original_case_definition.epl:
                    case_definition.epl = original_case_definition.epl
                    should_save_case_definition = True
                if case_definition.description != original_case_definition.description:
                    case_definition.description = original_case_definition.description
                    should_save_case_definition = True
                if case_definition.accumulate != original_case_definition.accumulate:
                    case_definition.accumulate = original_case_definition.accumulate
                    should_save_case_definition = True
                if case_definition.window != original_case_definition.window:
                    case_definition.window = original_case_definition.window
                    should_save_case_definition = True
            except CaseDefinition.DoesNotExist:
                should_save_case_definition = True
                case_definition = copy.deepcopy(original_case_definition)
                # Clearing the primary key makes save() insert a new row.
                case_definition.pk = None
                case_definition.domain = to_domain
                case_definition.report_type = to_report_type

            if should_save_case_definition:
                print("Will copy case definition from %s to %s using this data:" % (from_domain.name, to_domain.name))
                print(" [FROM] id:%s domain:%s report_type:%s case_def_code:%s" % (
                    original_case_definition.pk, original_case_definition.domain,
                    original_case_definition.report_type.name, original_case_definition.code))
                print(" [TO] id:%s domain:%s report_type:%s case_def_code:%s" % (
                    case_definition.pk, case_definition.domain.name,
                    case_definition.report_type.name, original_case_definition.code))
                if not dry_run:
                    # Point the transition at the target report type's own states.
                    case_definition.from_state = ReportState.objects.get(
                        domain=to_domain, report_type=to_report_type, code=original_case_definition.from_state.code)
                    case_definition.to_state = ReportState.objects.get(
                        domain=to_domain, report_type=to_report_type, code=original_case_definition.to_state.code)
                    case_definition.save()
                    print(" - Saved id: %s" % case_definition.pk)
        # NOTE(review): placement of this separator (after the loop) was inferred.
        print("--")

    def _copy_report_type_categories(self, from_domain, to_domain, dry_run=True):
        """Copy or update every report type category of ``from_domain``.

        Categories are matched by ``code``. An existing target category only
        has its ``name`` synchronised; a missing one is created as a deep copy
        attached to ``to_domain``. Nothing is saved when ``dry_run`` is true.
        """
        original_report_type_categories = ReportTypeCategory.objects.filter(domain=from_domain)
        for original_report_type_category in original_report_type_categories:
            try:
                report_type_category = ReportTypeCategory.objects.get(
                    domain=to_domain, code=original_report_type_category.code)
                if report_type_category.name != original_report_type_category.name:
                    report_type_category.name = original_report_type_category.name
                    print("Will update report type category from %s to %s using this data:" % (
                        from_domain.name, to_domain.name))
                    print(" [FROM] id:%s domain:%s code:%s" % (
                        original_report_type_category.pk, original_report_type_category.domain,
                        original_report_type_category.code))
                    print(" [TO] id:%s domain:%s code:%s" % (
                        report_type_category.pk, report_type_category.domain.name, report_type_category.code))
                    if not dry_run:
                        report_type_category.save()
                        print(" - Saved id: %s" % report_type_category.pk)
            except ReportTypeCategory.DoesNotExist:
                report_type_category = copy.deepcopy(original_report_type_category)
                # Clearing the primary key makes save() insert a new row.
                report_type_category.pk = None
                report_type_category.domain = to_domain
                print("Will copy report type category from %s to %s using this data:" % (
                    from_domain.name, to_domain.name))
                print(" [FROM] id:%s domain:%s code:%s" % (
                    original_report_type_category.pk, original_report_type_category.domain,
                    original_report_type_category.code))
                print(" [TO] id:%s domain:%s code:%s" % (
                    report_type_category.pk, report_type_category.domain.name, report_type_category.code))
                if not dry_run:
                    report_type_category.save()
                    print(" - Saved id: %s" % report_type_category.pk)
        # NOTE(review): placement of this separator (after the loop) was inferred.
        print("--")

    def _copy_report_types(self, from_domain, to_domain, from_authority=None, to_authority=None,
                           report_type_ids=None, dry_run=True):
        """Copy report types, then their states and case definitions.

        Args:
            from_domain: domain whose report types are read.
            to_domain: domain that receives the copies.
            from_authority: if given, only report types of this authority are copied.
            to_authority: authority assigned to newly created report types
                (``None`` leaves them without an authority).
            report_type_ids: optional iterable of report type ids restricting
                the copy; ``None`` or empty means no restriction. (The default
                was a shared mutable ``[]``; ``None`` behaves the same here.)
            dry_run: when true, only print the planned changes.
        """
        if dry_run:
            print(">> DRY RUN <<\n")

        self._copy_report_type_categories(from_domain, to_domain, dry_run)

        original_report_types = ReportType.objects.filter(domain=from_domain)
        if from_authority:
            original_report_types = original_report_types.filter(authority=from_authority)
        if report_type_ids:
            original_report_types = original_report_types.filter(id__in=report_type_ids)

        for original_report_type in original_report_types:
            should_save_report_type = False
            try:
                report_type = ReportType.objects.get(domain=to_domain, code=original_report_type.code)
                if report_type.form_definition != original_report_type.form_definition:
                    report_type.form_definition = original_report_type.form_definition
                    should_save_report_type = True
                if report_type.name != original_report_type.name:
                    report_type.name = original_report_type.name
                    should_save_report_type = True
                if report_type.template != original_report_type.template:
                    report_type.template = original_report_type.template
                    should_save_report_type = True
                if report_type.django_template != original_report_type.django_template:
                    report_type.django_template = original_report_type.django_template
                    should_save_report_type = True
                if report_type.summary_template != original_report_type.summary_template:
                    report_type.summary_template = original_report_type.summary_template
                    should_save_report_type = True
            except ReportType.DoesNotExist:
                should_save_report_type = True
                report_type = copy.deepcopy(original_report_type)
                # Clearing the primary key makes save() insert a new row.
                report_type.pk = None
                report_type.domain = to_domain
                # The default state is re-assigned below once states are copied.
                report_type.default_state = None
                # NOTE(review): the category and authority assignments are
                # assumed to belong to this "new copy" branch; confirm.
                if not dry_run and original_report_type.category:
                    report_type.category = ReportTypeCategory.objects.get(
                        domain=to_domain, code=original_report_type.category.code)
                if to_authority:
                    report_type.authority = to_authority
                else:
                    report_type.authority = None

            if should_save_report_type:
                print("Will copy report type from %s to %s using this data:" % (from_domain.name, to_domain.name))
                print(" [FROM] id:%s domain:%s authority:%s report_type:%s" % (
                    original_report_type.pk, original_report_type.domain.name,
                    original_report_type.authority, original_report_type.name))
                print(" [TO] id:%s domain:%s authority:%s report_type:%s" % (
                    report_type.pk, report_type.domain.name, report_type.authority, report_type.name))
                if not dry_run:
                    report_type.save(set_default_state=True)
                    print(" - Saved id: %s" % report_type.pk)
                print("")

            # copy report states
            self._copy_report_states(from_domain, to_domain, from_report_type=original_report_type,
                                     to_report_type=report_type, dry_run=dry_run)
            # copy case definitions
            self._copy_case_definitions(from_domain, to_domain, from_report_type=original_report_type,
                                        to_report_type=report_type, dry_run=dry_run)

            if should_save_report_type:
                # NOTE(review): in dry-run mode a brand-new report type is
                # unsaved and has no copied states, so this lookup presumably
                # raises ReportState.DoesNotExist -- confirm intended behaviour.
                default_state = ReportState.objects.get(report_type=report_type, code='report')
                if not report_type.default_state or default_state.id != report_type.default_state.id:
                    print(">> Then will set default state to %s" % default_state.name)
                    if not dry_run:
                        report_type.default_state = default_state
                        report_type.save()
            # NOTE(review): per-report-type placement of this separator was inferred.
            print("---------------------")

    def handle(self, *args, **options):
        """Resolve the command-line arguments into model instances.

        ``args[0]`` and ``args[1]`` are the source and target domain ids; any
        further positional arguments are report type ids. The authority
        options, when supplied, are looked up within their respective domains.
        """
        from_domain = Domain.objects.get(id=args[0])
        to_domain = Domain.objects.get(id=args[1])
        report_type_ids = list(args[2:])

        from_authority = options['from_authority']
        if from_authority:
            from_authority = Authority.objects.get(domain=from_domain, id=from_authority)

        to_authority = options['to_authority']
        if to_authority:
            to_authority = Authority.objects.get(domain=to_domain, id=to_authority)

        # --force switches dry-run off.
        dry_run = not options['force']
        # NOTE(review): the listing is truncated here ("..."); the remainder of
        # handle() -- presumably the call to self._copy_report_types(...) -- is
        # not visible and has deliberately not been reconstructed.

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Slash automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful