Best Python code snippet using SeleniumLibrary
crawler_template.py
Source:crawler_template.py
...47 self.log_clean()48 self.__SETTINGS__()49 self.main()50 def __SETTINGS__(self):51 self.log_start(extra=1)52 # BROWSER CONFIG53 ################54 # Configured for MacOS55 self.CHROME_PATH = "/Users/home/Library/Application Support/Google/Chrome/"56 self.CHROME_PROFILE = "Profile 5"57 # Don't change this. It's the only option for slenium_stealth58 self.USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.53 Safari/537.36"59 # PROJECT SPECIFIC60 def main(self):61 self.log_start(extra=1)62 self.web()63 ###################################64 # GENERAL HELPERS AND UTILS BEGIN #65 ###################################66 # SELENIUM CONFIG67 #################68 def web(self):69 self.log_start()70 self.web_options()71 self.web_driver()72 self.web_action()73 self.web_shh()74 self.web_clear()75 def web_options(self):76 self.log_start()77 self.options = wd.ChromeOptions()78 self.options.add_argument(f"--user-data-dir={self.CHROME_PATH}")79 self.options.add_argument(f"--profile-directory={self.CHROME_PROFILE}")80 self.options.add_argument("--user-agent=%s" % self.USER_AGENT)81 self.options.add_argument("start-maximized")82 self.options.add_experimental_option("excludeSwitches", ["enable-automation"])83 self.options.add_experimental_option("useAutomationExtension", 0)84 def web_driver(self):85 self.log_start()86 self.driver = sv(CHROME_DRIVER)87 self.driver = wd.Chrome(service=self.driver, options=self.options)88 self.driver.set_script_timeout(1000)89 self.driver.implicitly_wait(30)90 def web_action(self):91 self.log_start()92 self.action = ac(self.driver)93 def web_shh(self):94 self.log_start()95 shh(96 driver=self.driver,97 user_agent=self.USER_AGENT,98 languages=["en-US", "en"],99 vendor="Google Inc.",100 platform="Win32",101 webgl_vendor="Intel Inc.",102 renderer="Intel Iris OpenGL Engine",103 fix_hairline=1,104 run_on_insecure_origins=0,105 )106 def web_clear(self):107 self.log_start()108 # I think they place a blocked cookie in browser ...109 # sneaky little fuckers. Ruins the proxy rotation110 self.driver.delete_all_cookies()111 # SELENIUM HELPERS112 ##################113 def web_go(self, url, get=0, https=1):114 self.log_start()115 if https:116 self.driver.get(f"https://{url}")117 else:118 self.driver.get(f"{url}")119 if get:120 return self.bs_html()121 def web_scroll(self, count):122 self.log_start()123 for i in range(count):124 l0(f"{stack()[1][3]} status ({i}/{count})")125 html = self.bs_html()126 body = self.bs_element(html, key="body")127 l0("Down keys to be sent.")128 body.send_keys(Keys.PAGE_DOWN)129 self.zzz(mn=0.25 , mx=1)130 def web_type(self, element, txt):131 self.log_start()132 element.click()133 for char in txt:134 self.zzz(mn=0.1, mx=0.2)135 pf = [1] * 59136 pf.append(0)137 pf = choice(pf)138 if pf == 1:139 element.send_keys(char)140 else:141 element.send_keys(choice(abc))142 element.send_keys(Keys.BACK_SPACE)143 element.send_keys(char)144 # BS4 HELPERS145 #############146 def bs_children(self, element):147 self.log_start()148 elements = element.children149 for e in elements:150 if len(str(e)) > 1:151 yield bs(str(e), "html.parser")152 def bs_click(self, html, key, val=0, n=0):153 self.log_start()154 if val:155 element = html.find_all(attrs={key: val})[n]156 else:157 element = html.find_all(key)[n]158 xpath = self.bs_xpath(element)159 wdw(self.driver, 30).until(160 ec.element_to_be_clickable((By.XPATH, xpath))161 ).click()162 self.zzz()163 def bs_element(self, html, key, val=0, n=0):164 self.log_start()165 if val:166 element = html.find_all(attrs={key: val})[n]167 else:168 element = html.find_all(key)[n]169 xpath = self.bs_xpath(element)170 l0(f"grabbed xpath: {xpath}")171 element = self.driver.find_element(By.XPATH, xpath)172 l0(f"found element: {element}")173 return element174 def bs_html(self, pretty=0):175 self.log_start()176 src = self.driver.page_source177 l0("grabbed page src")178 html = bs(src, "html.parser")179 l0("parsed page src")180 if pretty:181 l0("making html pretty")182 return html.prettify()183 l0("html complete")184 return html185 def bs_read(self, page):186 self.log_start()187 f = open(f"{DIR_HTML}/{page}.html", "r")188 html = f.read()189 soup = bs(html, "html.parser")190 self.bs_save(page=page, html=soup)191 return soup192 def bs_save(self, page, html=0):193 self.log_start()194 if html:195 html = html.prettify()196 else:197 html = self.bs_html(pretty=1)198 with open(f"{DIR_HTML}{page}.html", "w") as fout:199 fout.write(html)200 def bs_title(self):201 html = self.bs_html(pretty=1)202 soup = bs(html, "html.parser")203 title = soup.title.string204 title = self.str_clean(str(title))205 return title206 def bs_xpath(self, element):207 self.log_start()208 """209 Generate xpath of soup element210 :param element: bs4 text or node211 :return: xpath as string212 """213 components = []214 child = element if element.name else element.parent215 for parent in child.parents:216 """217 @type parent: bs4.element.Tag218 """219 previous = islice(parent.children, 0, parent.contents.index(child))220 xpath_tag = child.name221 xpath_index = sum(1 for i in previous if i.name == xpath_tag) + 1222 components.append(223 xpath_tag if xpath_index == 1 else "%s[%d]" % (xpath_tag, xpath_index)224 )225 child = parent226 components.reverse()227 return "/%s" % "/".join(components)228 # LOGGING HELPERS229 #################230 def log_config(self):231 t = str(dt.now()).split(".")[0].replace(" ", "@")232 lbc(233 filename=f"{DIR_LOGS}{t}.log",234 encoding="utf-8",235 format="%(asctime)s - %(levelname)s - %(message)s",236 datefmt="%Y-%m-%d@%H:%M:%S",237 level=0,238 )239 lsh().setLevel(0)240 def log_clean(self, store=3):241 self.log_start()242 logs = listdir(DIR_LOGS)243 names = []244 for i in range(len(logs)):245 names.append(str(logs[i]).split(".")[0])246 names.sort(key=lambda date: dt.strptime(date, "%Y-%m-%d@%H:%M:%S"))247 keepers = names[-store:]248 for f in logs:249 name = str(f).split(".")[0]250 if name not in keepers:251 remove(f"{DIR_LOGS}{f}")252 def log_start(self, nap=0, extra=0, n=3):253 if extra:254 look = ">" * 45255 else:256 look = ">" * 15257 name = str(stack()[1][n])258 l0(f"{look} starting {name} ")259 if nap:260 self.zzz()261 return name262 def log_except(self, e):263 look = "!" * 15264 return l3(f"{look} error at {stack()[1][3]}: {e}")265 # JSON HELPERS266 ##############267 def json_dump(self, content, dest):268 self.log_start()269 pth = f"{DIR_ROOT}/{uu()}{uu()}.json"270 with open(pth, "w") as f:271 dump(content, f, indent=4)272 try:273 with open(pth) as f:274 q = load(f)275 replace(pth, dest)276 except:277 remove(pth)278 def json_load(self, pth):279 self.log_start()280 try:281 with open(pth, "r") as json_file:282 data = load(json_file)283 except FileNotFoundError as e:284 self.log_except(e)285 data = {}286 self.json_dump(data, pth)287 return data288 # OTHER289 #######290 # string cleaning hahaha .. get it? like sPring cleaning hahaha!291 def str_clean(self, s, json=0):292 self.log_start()293 s = sub(r"â¦", "...", s)294 s = sub(r"[`ââââ¸â¸â¸â¸â¸â¸]", "'", s)295 s = sub(r"[ââ]|(\'\')|(,,)", '"', s)296 s = sub(r"\s+", " ", s).strip()297 if json:298 l0("cleaning json")299 s = s.replace("'", '"')300 return s301 def zzz(self, mn=1, mx=5):302 t = round(uniform(mn, mx), 7)303 if t > 5:304 l0(f"sleeping for {t}")305 sleep(t)306if __name__ == "__main__":...
test_double_license.py
Source:test_double_license.py
...52 return dobData53 u'''æ·»å å人ææ'''54 def add_double_license_001(self):55 #æ¥å¿å¼å§è®°å½56 self.log.log_start("add_double_license")57 #è·åå人ææçæ°æ®58 dobData = self.get_double_data("add_double_license")59 for dataRow in range(len(dobData)):60 data = dobData[dataRow]61 try:62 #å¦æä¸æ¯ç¬¬1è¡,读åæ°æ®63 if dataRow != 0:64 self.double.click_double_license_button(data[1])65 self.authElem.click_all_approver()66 self.authElem.click_all_candidate()67 self.authElem.click_start_association()68 self.authElem.click_create_relate()69 self.cmf.click_login_msg_button()70 #ç¹å»è¿å71 self.authElem.click_child_page_back_button()72 self.log.log_detail(u"æ·»å å人æææå", True)73 except Exception as e:74 print ("add_double_license fail: ") + str(e)75 self.log.log_end("add_double_license")76 u'''å人审æ¹åç»ç«¯å®¡æ¹'''77 def same_termina_approvel_002(self):78 self.cmf.select_role_by_text(u"è¿ç»´æä½å")79 #æ¥å¿å¼å§è®°å½80 self.log.log_start("same_termina_approvel")81 #è·åå人审æ¹ç³è¯·çæ°æ®82 dobData = self.get_double_data("double_license_sso")83 for dataRow in range(len(dobData)):84 data = dobData[dataRow]85 try:86 #å¦ææ¯ç¬¬1è¡,读åæ°æ®87 if dataRow == 1:88 self.double.send_double_license_applicant(data)89 self.double.check_ico_len(data[1])90 self.loginElem.quit()91 except Exception as e:92 print ("same_termina_approvel fail: ") + str(e)93 self.log.log_end("same_termina_approvel")94 u'''å人审æ¹ç³è¯·äººå·²ä¸çº¿å®¡æ¹è¿æ'''95 def termina_expired_approvel_003(self):96 self.comsuit.use_new_user_login()97 #æ¥å¿å¼å§è®°å½98 self.log.log_start("Expired_approvel")99 #è·åå人审æ¹ç³è¯·çæ°æ®100 dobData = self.get_double_data("double_license_sso")101 expData = self.get_double_data("termina_approvel")102 for dataRow in range(len(dobData)):103 data = dobData[dataRow]104 try:105 #å¦æä¸æ¯ç¬¬1è¡,读åæ°æ®106 if dataRow == 2:107 self.double.send_double_license_applicant(data)108 number = self.acproval.get_new_process_number()109 self.loginElem.quit()110 self.double.approver_by_process_approval(expData, number)111 except Exception as e:112 print ("Expired_approvel fail: ") + str(e)113 self.log.log_end("Expired_approvel")114 u'''å人审æ¹å®¡æ¹äººæç»ç³è¯·'''115 def termina_deny_approvel_004(self):116 self.comsuit.use_new_user_login()117 #æ¥å¿å¼å§è®°å½118 self.log.log_start("deny_double_approvel")119 #è·åå人审æ¹ç³è¯·çæ°æ®120 dobData = self.get_double_data("double_license_sso")121 expData = self.get_double_data("deny_approvel")122 for dataRow in range(len(dobData)):123 data = dobData[dataRow]124 try:125 #å¦æä¸æ¯ç¬¬1è¡,读åæ°æ®126 if dataRow == 2:127 self.double.send_double_license_applicant(data)128 number = self.acproval.get_new_process_number()129 self.double.approver_remote_approval(expData, number)130 self.cmf.select_menu(u"è¿ç»´æä½", u"SSO")131 self.double.check_ico_len(data[1])132 self.loginElem.quit()133 except Exception as e:134 print ("deny_double_approvel fail: ") + str(e)135 self.log.log_end("deny_double_approvel")136 u'''å人审æ¹å®¡æ¹äººåæç³è¯·'''137 def termina_agree_approvel_005(self):138 self.comsuit.use_new_user_login()139 #æ¥å¿å¼å§è®°å½140 self.log.log_start("agree_double_approvel")141 #è·åå人审æ¹ç³è¯·çæ°æ®142 dobData = self.get_double_data("double_license_sso")143 expData = self.get_double_data("agree_approvel")144 for dataRow in range(len(dobData)):145 data = dobData[dataRow]146 try:147 #å¦æä¸æ¯ç¬¬1è¡,读åæ°æ®148 if dataRow == 2:149 self.double.send_double_license_applicant(data)150 number = self.acproval.get_new_process_number()151 self.double.approver_remote_approval(expData, number)152 self.cmf.select_menu(u"è¿ç»´æä½", u"SSO")153 self.double.check_ico_len(data[1])154 self.loginElem.quit()155 except Exception as e:156 print ("agree_double_approvel fail: ") + str(e)157 self.log.log_end("agree_double_approvel")158 u'''访é®å®¡æ¹æµç¨ä»»å¡æ¥è¯¢'''159 def double_query_process_task_006(self):160 #æ¥å¿å¼å§è®°å½161 self.log.log_start("double_query_process_task")162 #è·åæµç¨ä»»å¡æ¥è¯¢çæ°æ®163 taskData = self.get_double_data("process_task")164 for dataRow in range(len(taskData)):165 data = taskData[dataRow]166 try:167 #å¦æä¸æ¯ç¬¬1è¡,读åæ°æ®168 if dataRow != 0:169 if dataRow == 1:170 self.acproval.user_login(data[1])171 self.flow.query_process_task(data)172 self.log.log_detail(data[0], True)173 except Exception as e:174 print ("double_query_process_task fail: ") + str(e)175 self.log.log_end("double_query_process_task")176 u'''访é®å®¡æ¹ä¸ªäººåå²æ¥è¯¢'''177 def double_query_personal_history_007(self):178 #æ¥å¿å¼å§è®°å½179 self.log.log_start("double_query_personal_history")180 #è·å个人åå²æ¥è¯¢çæ°æ®181 perData = self.get_double_data("personal_history")182 for dataRow in range(len(perData)):183 data = perData[dataRow]184 try:185 #å¦æä¸æ¯ç¬¬1è¡,读åæ°æ®186 if dataRow != 0:187 self.flow.query_personal_history(data)188 self.log.log_detail(data[0], True)189 except Exception as e:190 print ("double_query_personal_history fail: ") + str(e)191 self.loginElem.quit()192 self.log.log_end("double_query_personal_history")193 u'''访é®å®¡æ¹ç³è¯·åå²æ¥è¯¢'''194 def double_query_apply_history_008(self):195 self.comsuit.use_new_user_login()196 #æ¥å¿å¼å§è®°å½197 self.log.log_start("double_query_apply_history")198 #è·åç³è¯·åå²æ¥è¯¢çæ°æ®199 applyData = self.get_double_data("apply_history")200 for dataRow in range(len(applyData)):201 data = applyData[dataRow]202 try:203 #å¦æä¸æ¯ç¬¬1è¡,读åæ°æ®204 if dataRow != 0:205 self.flow.query_apply_history(data)206 self.log.log_detail(data[0], True)207 except Exception as e:208 print ("double_query_apply_history fail: ") + str(e)209 self.log.log_end("double_query_apply_history")210 u'''访é®å®¡æ¹å
¨é¨åå²æ¥è¯¢'''211 def double_query_all_history_009(self):212 self.comsuit.dep_switch_to_sys()213 #æ¥å¿å¼å§è®°å½214 self.log.log_start("double_query_all_history")215 #è·åå
¨é¨åå²æ¥è¯¢çæ°æ®216 allData = self.get_double_data("all_history")217 for dataRow in range(len(allData)):218 data = allData[dataRow]219 try:220 #å¦æä¸æ¯ç¬¬1è¡,读åæ°æ®221 if dataRow != 0:222 self.flow.query_all_history(data)223 self.log.log_detail(data[0], True)224 except Exception as e:225 print ("double_query_all_history fail: ") + str(e)226 self.log.log_end("double_query_all_history")227 u'''访é®å®¡æ¹é¨é¨åå²æ¥è¯¢'''228 def double_query_department_history_010(self):229 self.comsuit.sys_switch_to_dep()230 #æ¥å¿å¼å§è®°å½231 self.log.log_start("double_query_department_history")232 #è·åæµç¨ä»»å¡æ¥è¯¢çæ°æ®233 deprtData = self.get_double_data("department_history")234 for dataRow in range(len(deprtData)):235 data = deprtData[dataRow]236 try:237 #å¦æä¸æ¯ç¬¬1è¡,读åæ°æ®238 if dataRow != 0:239 self.flow.query_department_history(data)240 self.log.log_detail(data[0], True)241 except Exception as e:242 print ("double_query_department_history fail: ") + str(e)...
simplemovingaverage.py
Source:simplemovingaverage.py
1import datetime2import sys3import pandas as pd4from live_trading.logging_ import logging_5from .signals import Signals6STOP_LOSS = .27MINS_SELL_BEFORE_EOB = 158class SimpleMovingAverage(Signals):9 def __init__(self, strategy, long_periods, short_periods):10 super(SimpleMovingAverage, self).__init__()11 self.strategy = strategy12 self.long_periods = long_periods13 self.short_periods = short_periods14 # @classmethod15 def __buy_conditions(self, ticker):16 log_start = datetime.datetime.now()17 log = logging_.Logging(self.strategy.runtime_tm, log_start, str(self.__class__.__name__ + ',' + sys._getframe().f_code.co_name))18 self.strategy.add_to_manager(self.strategy.strat_name, *log.monitor())19 # try:20 # short_ma_greater_than_long = self.__moving_averages(ticker)21 # except:22 # short_ma_greater_than_long = True23 return_minimal = True24 status = True25 while status:26 # if not short_ma_greater_than_long:27 # return False28 if not return_minimal:29 status = False30 break31 if not status:32 self.strategy.add_to_manager(self.strategy.strat_name, 'n False buy_conditions', 'increment_1')33 log.log(return_minimal)34 return status35 def __sell_conditions(self, ticker):36 log_start = datetime.datetime.now()37 log = logging_.Logging(self.strategy.runtime_tm, log_start, str(self.__class__.__name__ + ',' + sys._getframe().f_code.co_name))38 self.strategy.add_to_manager(self.strategy.strat_name, *log.monitor())39 price_higher_than_bought = self.__price_higher_than_bought(ticker)40 sell_at_eob = self.__sell_at_eob_f()41 try:42 short_ma_greater_than_long = self.__moving_averages(ticker)43 except Exception as e:44 short_ma_greater_than_long = True45 print('this may be problematic')46 status = True47 while status:48 if not price_higher_than_bought:49 status = False50 if not sell_at_eob:51 status = False52 if short_ma_greater_than_long:53 status = False54 break55 if not status:56 self.strategy.add_to_manager(self.strategy.strat_name, 'n False sell_conditions', 'increment_1')57 log.log(price_higher_than_bought58 ,short_ma_greater_than_long59 ,sell_at_eob)60 return status61 # @print_meth_name62 def price_size_select(self, ticker, ix, side):63 log_start = datetime.datetime.now()64 log = logging_.Logging(self.strategy.runtime_tm, log_start, str(self.__class__.__name__ + ',' + sys._getframe().f_code.co_name))65 self.strategy.add_to_manager(self.strategy.strat_name, *log.monitor())66 if side == 1:67 items = ticker.domBids68 else:69 items = ticker.domAsks70 _len = self.strategy.lt._ticker_len_n_type_check(items)71 if _len > 1:72 # if self.market_open.debugging:73 if self.strategy.debugging.dummy_data:74 # price, size = float(random.randint(10, 100)), random.randint(100, 10000)75 price, size = items[ix].price, items[ix].size76 else:77 price, size = items[ix].price, items[ix].size78 else:79 # if self.market_open.debugging:80 if self.strategy.debugging.dummy_data:81 # price, size = float(random.randint(10, 100)), random.randint(100, 10000)82 price, size = items.price, items.size83 else:84 price, size = items.price, items.size85 log.log(price86 ,size)87 return price, size88 def sell_at_eob_f(self):89 log_start = datetime.datetime.now()90 log = logging_.Logging(self.strategy.runtime_tm, log_start, str(self.__class__.__name__ + ',' + sys._getframe().f_code.co_name))91 self.strategy.add_to_manager(self.strategy.strat_name, *log.monitor())92 if self.strategy.debugging.dummy_time:93 _now = self.strategy.lt._manipulated_time(datetime.datetime.now())94 else:95 _now = datetime.datetime.now()96 if self.strategy.eob_min >= MINS_SELL_BEFORE_EOB:97 eob_dt = datetime.datetime.strptime("{}:{}:00".format(self.strategy.eob_hour,98 str(30 - MINS_SELL_BEFORE_EOB)), '%H:%M:%S')99 else:100 hour = self.strategy.eob_hour - 1101 minute = 60 + self.strategy.eob_min - MINS_SELL_BEFORE_EOB102 eob_dt = datetime.datetime.strptime("{}:{}:00".format(str(hour), str(minute)),'%H:%M:%S')103 sell_at_eob = self.strategy.lt.us_tz_op(_now).time() > eob_dt.time()104 log.log(sell_at_eob)105 return sell_at_eob106 def __price_higher_than_bought(self, ticker):107 log_start = datetime.datetime.now()108 log = logging_.Logging(self.strategy.runtime_tm, log_start, str(self.__class__.__name__ + ',' + sys._getframe().f_code.co_name))109 self.strategy.add_to_manager(self.strategy.strat_name, *log.monitor())110 symbol = ticker.contract.symbol111 held_ix = self.strategy.lt.portfolio_instr.index(symbol)112 held_price_ix = 2113 held_price = self.strategy.lt.portfolio[held_ix][held_price_ix]114 rank = 0115 if self.strategy.lt._ticker_len_n_type_check(ticker.domBids) > 1:116 curr_price = ticker.domAsks[rank].price117 else:118 curr_price = ticker.domAsks.price119 log.log(self.strategy.lt.portfolio120 ,curr_price)121 if held_price < curr_price:122 return True123 else:124 return False125 def stop_loss_f(self, ticker):126 log_start = datetime.datetime.now()127 log = logging_.Logging(self.strategy.runtime_tm, log_start, str(self.__class__.__name__ + ',' + sys._getframe().f_code.co_name))128 self.strategy.add_to_manager(self.strategy.strat_name, *log.monitor())129 if self.strategy.lt._ticker_len_n_type_check(ticker.domAsks) > 1:130 current_ask_price = ticker.domAsks[0].price131 else:132 current_ask_price = ticker.domAsks.price133 change = (current_ask_price - self.strategy.lt.portfolio[ticker.contract.symbol]) / current_ask_price134 log.log(current_ask_price135 ,change)136 if change < -STOP_LOSS:137 return True138 else:139 return False140 # @classmethod141 def __moving_averages(self, ticker):142 log_start = datetime.datetime.now()143 log = logging_.Logging(self.strategy.runtime_tm, log_start, str(self.__class__.__name__ + ',' + sys._getframe().f_code.co_name))144 self.strategy.add_to_manager(self.strategy.strat_name, *log.monitor())145 # ma procedure146 long_ma, short_ma = self.__ma_calc('collected', ticker)147 if long_ma == np.nan and short_ma == np.nan:148 raise Exception('not defined')149 elif long_ma > short_ma:150 short_ma_greater_than_long = False151 else:152 short_ma_greater_than_long = True153 log.log(long_ma154 ,short_ma)155 return short_ma_greater_than_long156 def __ma_calc(self, by, ticker):157 log_start = datetime.datetime.now()158 log = logging_.Logging(self.strategy.runtime_tm, log_start, str(self.__class__.__name__ + ',' + sys._getframe().f_code.co_name))159 self.strategy.add_to_manager(self.strategy.strat_name, *log.monitor())160 long_ma, short_ma = None, None161 if by == 'collected':162 rank = 0163 # side164 # bid = 1165 # ask = 0166 side = 1167 self.filtered_df = self.strategy.__hist_data_winners_df[(self.strategy.__hist_data_winners_df['symbol'] == ticker.contract.symbol) &168 (self.strategy.__hist_data_winners_df['side'] == side) &169 (self.strategy.__hist_data_winners_df['rank'] == rank)]170 _length = self.filtered_df.shape[0]171 if _length < self.long_periods:172 long_ma, short_ma = np.nan, np.nan173 else:174 long_ma = self.filtered_df.sort_values('unix_ts', ascending=False)[0:self.long_periods]['price'].mean()175 short_ma = self.filtered_df.sort_values('unix_ts', ascending=False)[0:self.short_periods]['price'].mean()176 # print(long_ma, short_ma)177 _unix_ts = self.filtered_df['unix_ts'].sort_values(ascending=False).iloc[0]178 if self.strategy.ma_counter == 0:179 self._ma_collection_df = pd.DataFrame([[ticker.contract.symbol, _unix_ts, short_ma, long_ma]])180 else:181 add_df = pd.DataFrame([[ticker.contract.symbol,182 _unix_ts,183 short_ma,184 long_ma]])185 self._ma_collection_df = pd.concat([self._ma_collection_df, add_df])186 log.log(long_ma187 ,short_ma)...
rootfinder.py
Source:rootfinder.py
1import jax.numpy as jnp2from jax import jit, grad, vmap3from jax import random4from jax import lax5from jax.ops import index, index_update6from jax.flatten_util import ravel_pytree7from functools import partial8@partial(jit, static_argnums=(0,))9def dual_bisect_method(10 func,11 aL=-1e5, bL=-1e-5, aR=1e-5, bR=1e5,12 tol=1e-6, maxiter=100):13 batch_func = vmap(func, (0))14 i = maxiter-1.015 saL, sbL, saR, sbR = jnp.sign(batch_func(jnp.array([aL, bL, aR, bR])))16 init_val = [aL, bL, aR, bR, saL, sbL, saR, sbR, i]17 def cond_fun(val):18 aL, bL, aR, bR, saL, sbL, saR, sbR, i = val19 return jnp.sum(bL-aL) + jnp.sum(bR-aR) + 100 * jnp.minimum(i, 0.0) > tol20 def body_fun(val):21 # unpack val22 aL, bL, aR, bR, saL, sbL, saR, sbR, i = val23 # new center points24 cL = (aL+bL)/2.025 cR = (aR+bR)/2.026 scL, scR = jnp.sign(batch_func(jnp.array([cL, cR])))27 # L28 aL = jnp.sum(cL * jnp.maximum( scL * saL, 0.0) + \29 aL * jnp.maximum( -1.0 * scL * saL, 0.0))30 bL = jnp.sum(cL * jnp.maximum( scL * sbL, 0.0) + \31 bL * jnp.maximum( -1.0 * scL * sbL, 0.0))32 saL = jnp.sum(scL * jnp.maximum( scL * saL, 0.0) + \33 saL * jnp.maximum( -1.0 * scL * saL, 0.0))34 sbL = jnp.sum(scL * jnp.maximum( scL * sbL, 0.0) + \35 sbL * jnp.maximum( -1.0 * scL * sbL, 0.0))36 # R37 aR = jnp.sum(cR * jnp.maximum( scR * saR, 0.0) + \38 aR * jnp.maximum( -1.0 * scR * saR, 0.0))39 bR = jnp.sum(cR * jnp.maximum( scR * sbR, 0.0) + \40 bR * jnp.maximum( -1.0 * scR * sbR, 0.0))41 saR = jnp.sum(scR * jnp.maximum( scR * saR, 0.0) + \42 saR * jnp.maximum( -1.0 * scR * saR, 0.0))43 sbR = jnp.sum(scR * jnp.maximum( scR * sbR, 0.0) + \44 sbR * jnp.maximum( -1.0 * scR * sbR, 0.0))45 i = i - 146 val = [aL, bL, aR, bR, saL, sbL, saR, sbR, i]47 return val48 val = lax.while_loop(cond_fun, body_fun, init_val)49 # unpack val50 aL, bL, aR, bR, saL, sbL, saR, sbR, i = val51 # new center points52 cL = (aL+bL)/2.053 cR = (aR+bR)/2.054 return [cL, cR]55@partial(jit, static_argnums=(0,))56def choose_start(57 func,58 log_start = -3.0, log_space = 1.0 / 5.0):59 batch_func = vmap(func, (0))60 i = 061 aL = -1.0 * jnp.power(10.0, log_start + i * log_space)62 bR = jnp.power(10.0, log_start + i * log_space)63 aL_val, bR_val = batch_func(jnp.array([aL, bR]))64 init_val = [aL, bR, aL_val, bR_val, i]65 def cond_fun(val):66 aL, bR, aL_val, bR_val, i = val67 return jnp.maximum(aL_val, 0.0) + jnp.maximum(bR_val, 0.0) > 0.068 def body_fun(val):69 aL, bR, aL_val, bR_val, i = val70 i = i+171 sign_aL = jnp.sign(aL_val)72 sign_bR = jnp.sign(bR_val)73 aL = jnp.sum(-1.0 * jnp.power(10.0, log_start + i * log_space) * jnp.maximum(sign_aL, 0.0) \74 + aL * jnp.maximum(-1.0 * sign_aL, 0.0))75 bR = jnp.sum(jnp.power(10.0, log_start + i * log_space) * jnp.maximum(sign_bR, 0.0) \76 + bR * jnp.maximum(-1.0 * sign_bR, 0.0))77 aL_val, bR_val = batch_func(jnp.array([aL, bR]))78 val = [aL, bR, aL_val, bR_val, i]79 return val80 val = lax.while_loop(cond_fun, body_fun, init_val)81 aL, bR, aL_val, bR_val, i = val82 return [aL, bR]83@partial(jit, static_argnums=(0,))84def bisect_method(85 func, a=-1e5, b=-1e-5,86 tol=1e-6, maxiter=100):87 batch_func = vmap(func, (0))88 i = maxiter-1.089 sa, sb = jnp.sign(batch_func(jnp.array([a, b])))90 init_val = [a, b, sa, sb, i]91 def cond_fun(val):92 a, b, sa, sb, i = val93 return jnp.sum(b-a) + 100 * jnp.minimum(i, 0.0) > tol94 def body_fun(val):95 # unpack val96 a, b, sa, sb, i = val97 # new center points98 c = (a+b)/2.099 sc = jnp.sign(func(c))100 # L101 a = jnp.sum(c * jnp.maximum( sc * sa, 0.0) + \102 a * jnp.maximum( -1.0 * sc * sa, 0.0))103 b = jnp.sum(c * jnp.maximum( sc * sb, 0.0) + \104 b * jnp.maximum( -1.0 * sc * sb, 0.0))105 sa = jnp.sum(sc * jnp.maximum( sc * sa, 0.0) + \106 sa * jnp.maximum( -1.0 * sc * sa, 0.0))107 sb = jnp.sum(sc * jnp.maximum( sc * sb, 0.0) + \108 sb * jnp.maximum( -1.0 * sc * sb, 0.0))109 i = i - 1110 val = [a, b, sa, sb, i]111 return val112 val = lax.while_loop(cond_fun, body_fun, init_val)113 # unpack val114 a, b, sa, sb, i = val115 # new center points116 c = (a+b)/2.0117 return c118@partial(jit, static_argnums=(0,))119def single_choose_start(120 func,121 log_start = -3.0, log_space = 1.0 / 5.0):122 i = 0123 a = 1.0 * jnp.power(10.0, log_start + i * log_space)124 a_val = func(a)125 init_val = [a, a_val, i]126 def cond_fun(val):127 a, a_val, i = val128 return jnp.maximum(a_val, 0.0) + 100 * jnp.minimum(100. - i, 0.0) > 0.0129 def body_fun(val):130 a, a_val, i = val131 i = i+1132 sign_a = jnp.sign(a_val)133 a = jnp.sum(jnp.power(10.0, log_start + i * log_space) * jnp.maximum(sign_a, 0.0) \134 + a * jnp.maximum(-1.0 * sign_a, 0.0))135 a_val = func(a)136 val = [a, a_val, i]137 return val138 val = lax.while_loop(cond_fun, body_fun, init_val)139 a, a_val, i = val...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!