Best Python code snippet using SeleniumBase
kite.py
Source:kite.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import base64
import datetime
import json
import re
import struct
import urllib

import requests
from selenium.webdriver.remote.command import Command

from kitefront.browser import Browser
from kitefront.data.historical_data import HistoricalData
from kitefront.orders.orders import Orders
from kitefront.portfolio.portfolio import Portfolio


class Kite(Orders, Portfolio, HistoricalData):
    """Client for the Kite web front end, driven through a ``Browser`` session.

    The constructor logs in through the login form. The remaining methods
    reuse that session: they read values from the browser's local storage
    and cookies, decode binary WebSocket tick frames found in the browser's
    performance log, and send HTTP requests carrying the session's tokens.
    """

    _b = None

    # Numeric segment ids; the low byte of an instrument token is compared
    # against these in ``_parse_binary``.
    EXCHANGE_MAP = {
        "nse": 1,
        "nfo": 2,
        "cds": 3,
        "bse": 4,
        "bfo": 5,
        "bsecds": 6,
        "mcx": 7,
        "mcxsx": 8,
        "indices": 9
    }

    # Default connection timeout
    CONNECT_TIMEOUT = 30
    # Default Reconnect max delay.
    RECONNECT_MAX_DELAY = 60
    # Default reconnect attempts
    RECONNECT_MAX_TRIES = 50
    # Default root API endpoint. It's possible to
    # override this by passing the `root` parameter during initialisation.
    ROOT_URI = "wss://ws.kite.trade"

    # Available streaming modes.
    MODE_FULL = "full"
    MODE_QUOTE = "quote"
    MODE_LTP = "ltp"

    def __init__(self, username, password, pin, headless=False):
        """Open a browser, log in to Kite and cache the session data.

        :param username: value typed into the first login form field.
        :param password: value typed into the second login form field.
        :param pin: value typed into the PIN field of the second step.
        :param headless: forwarded to ``Browser(headless=...)``.
        :raises Exception: if the browser is not on the dashboard URL after
            the login steps.
        """
        self._username = username
        self._password = password
        self._pin = pin
        self._b = Browser(headless=headless)
        self._b.new("kite")
        self._b.on("kite").open("https://kite.zerodha.com")

        # Step 1: user id and password.
        self._b.ensure('//*[@id="container"]/div/div/div/form/div[1]/h2', 'Login to Kite')
        self._b.type_in('//*[@id="container"]/div/div/div/form/div[2]/input', self._username)
        self._b.type_in('//*[@id="container"]/div/div/div/form/div[3]/input', self._password)
        self._b.click_on('//*[@id="container"]/div/div/div/form/div[4]/button').wait()

        # Step 2: PIN.
        self._b.ensure('//*[@id="container"]/div/div/div/form/div[2]/div/label', 'PIN')
        self._b.type_in('//*[@id="container"]/div/div/div/form/div[2]/div/input', self._pin)
        self._b.click_on('//*[@id="container"]/div/div/div/form/div[3]/button').wait()

        # Landing anywhere other than the dashboard is treated as a failed login.
        self._b.open('https://kite.zerodha.com/dashboard')
        if self._b.current_url != 'https://kite.zerodha.com/dashboard':
            raise Exception("Could not login to Kite")

        # Cache local storage and cookies; context() and request() read them.
        self._b.read_local_storage()
        self._b.read_cookies()

    def watch(self, exchange, instrument, instrument_token):
        """Ask the browser to watch the chart page of one instrument.

        Calls ``Browser.watch`` with a name made by concatenating the three
        arguments and a chart URL made by joining them with ``/``.
        """
        parts = [str(i) for i in [exchange, instrument, instrument_token]]
        self._b.watch(
            ''.join(parts),
            'https://kite.zerodha.com/chart/web/ciq/' + '/'.join(parts)
        )

    def context(self):
        """Return the session context as a dict.

        ``user_id`` and ``public_token`` come from the cached local storage;
        ``api_key`` is the constant ``'kitefront'`` and ``access_token`` is
        always the empty string.
        """
        return {
            'user_id': self._b.ls['__storejs_kite_user_id'],
            'public_token': self._b.ls['__storejs_kite_public_token'],
            'api_key': 'kitefront',
            'access_token': ''
        }

    def context_str(self):
        """Return :meth:`context` encoded as a URL query string."""
        # ``urllib.urlencode`` exists only on Python 2; on Python 3 the same
        # function lives in ``urllib.parse``.
        try:
            from urllib import urlencode
        except ImportError:
            from urllib.parse import urlencode
        return urlencode(self.context())

    def ciqrandom(self):
        """Return, as ``str``, a router value read from the live local storage.

        Reads ``tabex_default_master`` first, then the item named
        ``tabex_default_router_<master>``.
        """
        tabex_master = self._b.execute(
            Command.GET_LOCAL_STORAGE_ITEM, {'key': 'tabex_default_master'})['value']
        ciqrandom = self._b.execute(
            Command.GET_LOCAL_STORAGE_ITEM,
            {'key': 'tabex_default_router_' + str(tabex_master)})['value']
        return str(ciqrandom)

    def stream(self):
        """Return the ticks decoded from received WebSocket frames.

        Scans the browser's ``performance`` log, base64-decodes the payload of
        each ``Network.webSocketFrameReceived`` entry and parses it with
        :meth:`_parse_binary`.

        :returns: a flat list of tick dicts, in log order.
        """
        packets = []
        logs = self._b.get_log('performance')
        for entry in logs:
            log = json.loads(entry['message'])
            method = log['message']['method']
            if re.match('Network.webSocketFrameReceived', method):
                # ``base64.decodestring`` was removed in Python 3.9;
                # ``b64decode`` decodes the same way and exists on 2 and 3.
                data = base64.b64decode(log['message']['params']['response']['payloadData'])
                packets.extend(self._parse_binary(data))
        return packets

    def _split_packets(self, bin):
        """Split the data to individual packets of ticks.

        The frame starts with a big-endian unsigned 16-bit packet count. Each
        packet follows as a 16-bit length and that many payload bytes.
        """
        # Ignore heartbeat data.
        if len(bin) < 2:
            return []

        number_of_packets = self._unpack_int(bin, 0, 2, byte_format="H")
        packets = []
        j = 2
        for _ in range(number_of_packets):
            packet_length = self._unpack_int(bin, j, j + 2, byte_format="H")
            packets.append(bin[j + 2: j + 2 + packet_length])
            j = j + 2 + packet_length
        return packets

    def _unpack_int(self, bin, start, end, byte_format="I"):
        """Unpack ``bin[start:end]`` as a big-endian unsigned integer.

        :param byte_format: ``struct`` format character; the default ``"I"``
            reads 4 bytes and ``"H"`` reads 2.
        """
        return struct.unpack(">" + byte_format, bin[start:end])[0]

    def _unpack_timestamp(self, bin, start, end):
        """Read ``bin[start:end]`` as epoch seconds; return a datetime or None.

        Best effort: a value that cannot be converted yields ``None`` instead
        of aborting the parse of the whole frame.
        """
        try:
            # The module is imported as ``datetime``, so the class must be
            # named in full. The bare ``datetime.fromtimestamp`` raised
            # AttributeError on every call, which the broad ``except``
            # turned into a permanent ``None``.
            return datetime.datetime.fromtimestamp(self._unpack_int(bin, start, end))
        except (ValueError, OverflowError, OSError, struct.error):
            return None

    def _parse_binary(self, bin):
        """Parse binary data to a (list of) ticks structure.

        The packet length selects the layout:

        * 8 bytes: LTP packet.
        * 28 / 32 bytes: index quote / index full (full adds a timestamp).
        * 44 / 184 bytes: quote / full (full adds trade time, open interest,
          timestamp and market depth).

        Packets of any other length are skipped.

        :returns: list of tick dicts.
        """
        packets = self._split_packets(bin)  # split data to individual ticks packet
        data = []

        for packet in packets:
            instrument_token = self._unpack_int(packet, 0, 4)
            segment = instrument_token & 0xff  # Retrieve segment constant from instrument_token

            # Prices are transmitted as integers and scaled down here.
            divisor = 10000000.0 if segment == self.EXCHANGE_MAP["cds"] else 100.0

            # All indices are not tradable
            tradable = segment != self.EXCHANGE_MAP["indices"]

            # LTP packets
            if len(packet) == 8:
                data.append({
                    "tradable": tradable,
                    "mode": self.MODE_LTP,
                    "instrument_token": instrument_token,
                    "last_price": self._unpack_int(packet, 4, 8) / divisor
                })
            # Indices quote and full mode
            elif len(packet) == 28 or len(packet) == 32:
                mode = self.MODE_QUOTE if len(packet) == 28 else self.MODE_FULL

                d = {
                    "tradable": tradable,
                    "mode": mode,
                    "instrument_token": instrument_token,
                    "last_price": self._unpack_int(packet, 4, 8) / divisor,
                    "ohlc": {
                        "high": self._unpack_int(packet, 8, 12) / divisor,
                        "low": self._unpack_int(packet, 12, 16) / divisor,
                        "open": self._unpack_int(packet, 16, 20) / divisor,
                        "close": self._unpack_int(packet, 20, 24) / divisor
                    }
                }

                # Compute the change price using close price and last price
                d["change"] = 0
                if d["ohlc"]["close"] != 0:
                    d["change"] = (d["last_price"] - d["ohlc"]["close"]) * 100 / d["ohlc"]["close"]

                # Full mode with timestamp
                if len(packet) == 32:
                    d["timestamp"] = self._unpack_timestamp(packet, 28, 32)

                data.append(d)
            # Quote and full mode
            elif len(packet) == 44 or len(packet) == 184:
                mode = self.MODE_QUOTE if len(packet) == 44 else self.MODE_FULL

                d = {
                    "tradable": tradable,
                    "mode": mode,
                    "instrument_token": instrument_token,
                    "last_price": self._unpack_int(packet, 4, 8) / divisor,
                    "last_quantity": self._unpack_int(packet, 8, 12),
                    "average_price": self._unpack_int(packet, 12, 16) / divisor,
                    "volume": self._unpack_int(packet, 16, 20),
                    "buy_quantity": self._unpack_int(packet, 20, 24),
                    "sell_quantity": self._unpack_int(packet, 24, 28),
                    "ohlc": {
                        "open": self._unpack_int(packet, 28, 32) / divisor,
                        "high": self._unpack_int(packet, 32, 36) / divisor,
                        "low": self._unpack_int(packet, 36, 40) / divisor,
                        "close": self._unpack_int(packet, 40, 44) / divisor
                    }
                }

                # Compute the change price using close price and last price
                d["change"] = 0
                if d["ohlc"]["close"] != 0:
                    d["change"] = (d["last_price"] - d["ohlc"]["close"]) * 100 / d["ohlc"]["close"]

                # Parse full mode
                if len(packet) == 184:
                    d["last_trade_time"] = self._unpack_timestamp(packet, 44, 48)
                    d["oi"] = self._unpack_int(packet, 48, 52)
                    d["oi_day_high"] = self._unpack_int(packet, 52, 56)
                    d["oi_day_low"] = self._unpack_int(packet, 56, 60)
                    d["timestamp"] = self._unpack_timestamp(packet, 60, 64)

                    # Market depth entries.
                    depth = {
                        "buy": [],
                        "sell": []
                    }

                    # Compile the market depth lists: 12-byte entries from
                    # offset 64; the first five go to "buy", the rest to "sell".
                    for i, p in enumerate(range(64, len(packet), 12)):
                        depth["sell" if i >= 5 else "buy"].append({
                            "quantity": self._unpack_int(packet, p, p + 4),
                            "price": self._unpack_int(packet, p + 4, p + 8) / divisor,
                            "orders": self._unpack_int(packet, p + 8, p + 10, byte_format="H")
                        })

                    d["depth"] = depth

                data.append(d)

        return data

    def request(self, method, endpoint, **kwargs):
        """Send an HTTP request to ``https://kite.zerodha.com/oms`` + endpoint.

        The headers carry the ``enctoken`` and user id from the cached local
        storage plus the cached browser cookies.

        :param method: HTTP verb; ``'POST'`` also sets a form-encoded
            content type.
        :param endpoint: path appended to the ``/oms`` base URL.
        :param kwargs: forwarded unchanged to ``requests.request``.
        """
        cookies = '; '.join([cookie['name'] + '=' + cookie['value'] for cookie in self._b.cookies])
        headers = {
            # Stored values are wrapped in double quotes; strip them.
            'authorization': 'enctoken ' + str(self._b.ls['__storejs_kite_enctoken']).strip('"'),
            'accept-encoding': 'gzip, deflate',
            'accept-language': 'en-US,en;q=0.9,th;q=0.8',
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.157 Safari/537.36',
            'x-kite-version': '2.1.0',
            'x-kite-userid': str(self._b.ls['__storejs_kite_user_id']).strip('"'),
            'accept': 'application/json, text/plain, */*',
            'referer': 'https://kite.zerodha.com/positions',
            'authority': 'kite.zerodha.com',
            'cookie': cookies
        }
        if method == 'POST':
            headers['content-type'] = 'application/x-www-form-urlencoded'
        response = requests.request(
            method,
            'https://kite.zerodha.com/oms' + endpoint,
            headers=headers,
            **kwargs
        )
        # ... (the published listing is truncated here; the remainder of this
        # method is not shown in the source)
class_selenium.py
Source:class_selenium.py
# -*- coding: utf-8 -*-
from selenium.webdriver.remote.command import Command

__author__ = 'senchuk'


class SeleniumMethods():
    """Static helpers that send raw WebDriver commands through ``driver.execute``."""

    @staticmethod
    def get_local_storage_keys(driver):
        """
        Gets the local storage keys list.

        :Usage:
            SeleniumMethods.get_local_storage_keys(driver)
        """
        return driver.execute(Command.GET_LOCAL_STORAGE_KEYS)['value']

    @staticmethod
    def get_local_storage_items(driver, key):
        """
        Gets the local storage item by key.

        :Usage:
            SeleniumMethods.get_local_storage_items(driver, 'local_key')
        """
        return driver.execute(Command.GET_LOCAL_STORAGE_ITEM, {'key': key})['value']

    @staticmethod
    def get_all_cookies_items(driver):
        """ Gets all cookies.

        Returns the whole response of the command, not only its ``'value'``
        entry.
        """
        return driver.execute(Command.GET_ALL_COOKIES)

    @staticmethod
    def get_cookies_item(driver, key):
        """ Gets the cookie whose name is ``key``.
        """
        # The GET_COOKIE endpoint is templated on ``$name``, so the cookie
        # name has to be passed under 'name'. The previous {'key': key}
        # could not fill the URL template.
        return driver.execute(Command.GET_COOKIE, {'name': key})['value']
...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!