Best Python code snippet using autotest_python
equity_barrier_option.py
Source:equity_barrier_option.py
1###############################################################################2# Copyright (C) 2018, 2019, 2020 Dominic O'Kane3###############################################################################45import numpy as np6from enum import Enum78from ...utils.error import FinError9from ...utils.global_vars import gDaysInYear10from ...products.equity.equity_option import EquityOption11from ...models.process_simulator import FinProcessSimulator12from ...market.curves.discount_curve import DiscountCurve13from ...utils.helpers import label_to_string, check_argument_types14from ...utils.date import Date151617from ...utils.math import N1819# TODO: SOME REDESIGN ON THE MONTE CARLO PROCESS IS PROBABLY NEEDED2021###############################################################################222324class EquityBarrierTypes(Enum):25 DOWN_AND_OUT_CALL = 126 DOWN_AND_IN_CALL = 227 UP_AND_OUT_CALL = 328 UP_AND_IN_CALL = 429 UP_AND_OUT_PUT = 530 UP_AND_IN_PUT = 631 DOWN_AND_OUT_PUT = 732 DOWN_AND_IN_PUT = 83334###############################################################################353637class EquityBarrierOption(EquityOption):38 """ Class to hold details of an Equity Barrier Option. It also39 calculates the option price using Black Scholes for 8 different40 variants on the Barrier structure in enum EquityBarrierTypes. """4142 def __init__(self,43 expiry_date: Date,44 strike_price: float,45 option_type: EquityBarrierTypes,46 barrier_level: float,47 num_observations_per_year: (int, float) = 252,48 notional: float = 1.0):49 """ Create the EquityBarrierOption by specifying the expiry date,50 strike price, option type, barrier level, the number of observations51 per year and the notional. """5253 check_argument_types(self.__init__, locals())5455 self._expiry_date = expiry_date56 self._strike_price = float(strike_price)57 self._barrier_level = float(barrier_level)58 self._num_observations_per_year = int(num_observations_per_year)5960 if option_type not in EquityBarrierTypes:61 raise FinError("Option Type " + str(option_type) + " unknown.")6263 self._option_type = option_type64 self._notional = notional6566###############################################################################6768 def value(self,69 valuation_date: Date,70 stock_price: (float, np.ndarray),71 discount_curve: DiscountCurve,72 dividend_curve: DiscountCurve,73 model):74 """ This prices an Equity Barrier option using the formulae given in75 the paper by Clewlow, Llanos and Strickland December 1994 which can be76 found at7778 https://warwick.ac.uk/fac/soc/wbs/subjects/finance/research/wpaperseries/1994/94-54.pdf79 """8081 if isinstance(valuation_date, Date) == False:82 raise FinError("Valuation date is not a Date")8384 if valuation_date > self._expiry_date:85 raise FinError("Valuation date after expiry date.")8687 if discount_curve._valuation_date != valuation_date:88 raise FinError(89 "Discount Curve valuation date not same as option valuation date")9091 if dividend_curve._valuation_date != valuation_date:92 raise FinError(93 "Dividend Curve valuation date not same as option valuation date")9495 if isinstance(stock_price, int):96 stock_price = float(stock_price)9798 if isinstance(stock_price, float):99 stock_prices = [stock_price]100 else:101 stock_prices = stock_price102103 values = []104 for s in stock_prices:105 v = self._value_one(valuation_date, s, discount_curve,106 dividend_curve, model)107 values.append(v)108109 if isinstance(stock_price, float):110 return values[0]111 else:112 return np.array(values)113114###############################################################################115116 def _value_one(self,117 valuation_date: Date,118 stock_price: (float, np.ndarray),119 discount_curve: DiscountCurve,120 dividend_curve: DiscountCurve,121 model):122 """ This values a single option. Because of its structure it cannot123 easily be vectorised which is why it has been wrapped. """124125 texp = (self._expiry_date - valuation_date) / gDaysInYear126127 if texp < 0:128 raise FinError("Option expires before value date.")129130 texp = max(texp, 1e-6)131132 lnS0k = np.log(stock_price / self._strike_price)133 sqrtT = np.sqrt(texp)134135 r = discount_curve.cc_rate(self._expiry_date)136 q = dividend_curve.cc_rate(self._expiry_date)137138 k = self._strike_price139 s = stock_price140 h = self._barrier_level141142 volatility = model._volatility143 sigmaRootT = volatility * sqrtT144 v2 = volatility * volatility145 mu = r - q146 d1 = (lnS0k + (mu + v2 / 2.0) * texp) / sigmaRootT147 d2 = (lnS0k + (mu - v2 / 2.0) * texp) / sigmaRootT148 df = np.exp(-r * texp)149 dq = np.exp(-q * texp)150151 c = s * dq * N(d1) - k * df * N(d2)152 p = k * df * N(-d2) - s * dq * N(-d1)153# print("CALL:",c,"PUT:",p)154155 if self._option_type == EquityBarrierTypes.DOWN_AND_OUT_CALL and s <= h:156 return 0.0157 elif self._option_type == EquityBarrierTypes.UP_AND_OUT_CALL and s >= h:158 return 0.0159 elif self._option_type == EquityBarrierTypes.UP_AND_OUT_PUT and s >= h:160 return 0.0161 elif self._option_type == EquityBarrierTypes.DOWN_AND_OUT_PUT and s <= h:162 return 0.0163 elif self._option_type == EquityBarrierTypes.DOWN_AND_IN_CALL and s <= h:164 return c165 elif self._option_type == EquityBarrierTypes.UP_AND_IN_CALL and s >= h:166 return c167 elif self._option_type == EquityBarrierTypes.UP_AND_IN_PUT and s >= h:168 return p169 elif self._option_type == EquityBarrierTypes.DOWN_AND_IN_PUT and s <= h:170 return p171172 num_observations = 1 + texp * self._num_observations_per_year173174 # Correction by Broadie, Glasserman and Kou, Mathematical Finance, 1997175 # Adjusts the barrier for discrete and not continuous observations176 h_adj = h177 t = texp / num_observations178179 if self._option_type == EquityBarrierTypes.DOWN_AND_OUT_CALL:180 h_adj = h * np.exp(-0.5826 * volatility * np.sqrt(t))181 elif self._option_type == EquityBarrierTypes.DOWN_AND_IN_CALL:182 h_adj = h * np.exp(-0.5826 * volatility * np.sqrt(t))183 elif self._option_type == EquityBarrierTypes.UP_AND_IN_CALL:184 h_adj = h * np.exp(0.5826 * volatility * np.sqrt(t))185 elif self._option_type == EquityBarrierTypes.UP_AND_OUT_CALL:186 h_adj = h * np.exp(0.5826 * volatility * np.sqrt(t))187 elif self._option_type == EquityBarrierTypes.UP_AND_IN_PUT:188 h_adj = h * np.exp(0.5826 * volatility * np.sqrt(t))189 elif self._option_type == EquityBarrierTypes.UP_AND_OUT_PUT:190 h_adj = h * np.exp(0.5826 * volatility * np.sqrt(t))191 elif self._option_type == EquityBarrierTypes.DOWN_AND_OUT_PUT:192 h_adj = h * np.exp(-0.5826 * volatility * np.sqrt(t))193 elif self._option_type == EquityBarrierTypes.DOWN_AND_IN_PUT:194 h_adj = h * np.exp(-0.5826 * volatility * np.sqrt(t))195 else:196 raise FinError("Unknown barrier option type." +197 str(self._option_type))198199 h = h_adj200201 if abs(volatility) < 1e-5:202 volatility = 1e-5203204 l = (mu + v2 / 2.0) / v2205 y = np.log(h * h / (s * k)) / sigmaRootT + l * sigmaRootT206 x1 = np.log(s / h) / sigmaRootT + l * sigmaRootT207 y1 = np.log(h / s) / sigmaRootT + l * sigmaRootT208 hOverS = h / s209210 if self._option_type == EquityBarrierTypes.DOWN_AND_OUT_CALL:211 if h >= k:212 c_do = s * dq * N(x1) - k * df * N(x1 - sigmaRootT) \213 - s * dq * pow(hOverS, 2.0 * l) * N(y1) \214 + k * df * pow(hOverS, 2.0 * l - 2.0) * N(y1 - sigmaRootT)215 price = c_do216 else:217 c_di = s * dq * pow(hOverS, 2.0 * l) * N(y) \218 - k * df * pow(hOverS, 2.0 * l - 2.0) * N(y - sigmaRootT)219 price = c - c_di220 elif self._option_type == EquityBarrierTypes.DOWN_AND_IN_CALL:221 if h <= k:222 c_di = s * dq * pow(hOverS, 2.0 * l) * N(y) \223 - k * df * pow(hOverS, 2.0 * l - 2.0) * N(y - sigmaRootT)224 price = c_di225 else:226 c_do = s * dq * N(x1) \227 - k * df * N(x1 - sigmaRootT) \228 - s * dq * pow(hOverS, 2.0 * l) * N(y1) \229 + k * df * pow(hOverS, 2.0 * l - 2.0) * N(y1 - sigmaRootT)230 price = c - c_do231 elif self._option_type == EquityBarrierTypes.UP_AND_IN_CALL:232 if h >= k:233 c_ui = s * dq * N(x1) - k * df * N(x1 - sigmaRootT) \234 - s * dq * pow(hOverS, 2.0 * l) * (N(-y) - N(-y1)) \235 + k * df * pow(hOverS, 2.0 * l - 2.0) * \236 (N(-y + sigmaRootT) - N(-y1 + sigmaRootT))237 price = c_ui238 else:239 price = c240 elif self._option_type == EquityBarrierTypes.UP_AND_OUT_CALL:241 if h > k:242 c_ui = s * dq * N(x1) - k * df * N(x1 - sigmaRootT) \243 - s * dq * pow(hOverS, 2.0 * l) * (N(-y) - N(-y1)) \244 + k * df * pow(hOverS, 2.0 * l - 2.0) * \245 (N(-y + sigmaRootT) - N(-y1 + sigmaRootT))246 price = c - c_ui247 else:248 price = 0.0249 elif self._option_type == EquityBarrierTypes.UP_AND_IN_PUT:250 if h > k:251 p_ui = -s * dq * pow(hOverS, 2.0 * l) * N(-y) \252 + k * df * pow(hOverS, 2.0 * l - 2.0) * N(-y + sigmaRootT)253 price = p_ui254 else:255 p_uo = -s * dq * N(-x1) \256 + k * df * N(-x1 + sigmaRootT) \257 + s * dq * pow(hOverS, 2.0 * l) * N(-y1) \258 - k * df * pow(hOverS, 2.0 * l - 2.0) * \259 N(-y1 + sigmaRootT)260 price = p - p_uo261 elif self._option_type == EquityBarrierTypes.UP_AND_OUT_PUT:262 if h >= k:263 p_ui = -s * dq * pow(hOverS, 2.0 * l) * N(-y) \264 + k * df * pow(hOverS, 2.0 * l - 2.0) * N(-y + sigmaRootT)265 price = p - p_ui266 else:267 p_uo = -s * dq * N(-x1) \268 + k * df * N(-x1 + sigmaRootT) \269 + s * dq * pow(hOverS, 2.0 * l) * N(-y1) \270 - k * df * pow(hOverS, 2.0 * l - 2.0) * \271 N(-y1 + sigmaRootT)272 price = p_uo273 elif self._option_type == EquityBarrierTypes.DOWN_AND_OUT_PUT:274 if h >= k:275 price = 0.0276 else:277 p_di = -s * dq * N(-x1) \278 + k * df * N(-x1 + sigmaRootT) \279 + s * dq * pow(hOverS, 2.0 * l) * (N(y) - N(y1)) \280 - k * df * pow(hOverS, 2.0 * l - 2.0) * \281 (N(y - sigmaRootT) - N(y1 - sigmaRootT))282 price = p - p_di283 elif self._option_type == EquityBarrierTypes.DOWN_AND_IN_PUT:284 if h >= k:285 price = p286 else:287 p_di = -s * dq * N(-x1) \288 + k * df * N(-x1 + sigmaRootT) \289 + s * dq * pow(hOverS, 2.0 * l) * (N(y) - N(y1)) \290 - k * df * pow(hOverS, 2.0 * l - 2.0) * \291 (N(y - sigmaRootT) - N(y1 - sigmaRootT))292 price = p_di293 else:294 raise FinError("Unknown barrier option type." +295 str(self._option_type))296297 v = price * self._notional298 return v299300###############################################################################301302 def value_mc(self,303 valuation_date: Date,304 stock_price: float,305 discount_curve: DiscountCurve,306 dividend_curve: DiscountCurve,307 process_type,308 model_params,309 numAnnObs: int = 252,310 num_paths: int = 10000,311 seed: int = 4242):312 """ A Monte-Carlo based valuation of the barrier option which simulates313 the evolution of the stock price of at a specified number of annual314 observation times until expiry to examine if the barrier has been315 crossed and the corresponding value of the final payoff, if any. It316 assumes a GBM model for the stock price. """317318 texp = (self._expiry_date - valuation_date) / gDaysInYear319 num_time_steps = int(texp * numAnnObs)320 K = self._strike_price321 B = self._barrier_level322 option_type = self._option_type323324 process = FinProcessSimulator()325326 r = discount_curve.zero_rate(self._expiry_date)327328 # TODO - NEED TO DECIDE IF THIS IS PART OF MODEL PARAMS OR NOT ??????????????329330 r = discount_curve.cc_rate(self._expiry_date)331 q = dividend_curve.cc_rate(self._expiry_date)332333 #######################################################################334335 if option_type == EquityBarrierTypes.DOWN_AND_OUT_CALL and stock_price <= B:336 return 0.0337 elif option_type == EquityBarrierTypes.UP_AND_OUT_CALL and stock_price >= B:338 return 0.0339 elif option_type == EquityBarrierTypes.DOWN_AND_OUT_PUT and stock_price <= B:340 return 0.0341 elif option_type == EquityBarrierTypes.UP_AND_OUT_PUT and stock_price >= B:342 return 0.0343344 #######################################################################345346 simple_call = False347 simple_put = False348349 if option_type == EquityBarrierTypes.DOWN_AND_IN_CALL and stock_price <= B:350 simple_call = True351 elif option_type == EquityBarrierTypes.UP_AND_IN_CALL and stock_price >= B:352 simple_call = True353 elif option_type == EquityBarrierTypes.UP_AND_IN_PUT and stock_price >= B:354 simple_put = True355 elif option_type == EquityBarrierTypes.DOWN_AND_IN_PUT and stock_price <= B:356 simple_put = True357358 if simple_put or simple_call:359 Sall = process.get_process(360 process_type, texp, model_params, 1, num_paths, seed)361362 if simple_call:363 c = (np.maximum(Sall[:, -1] - K, 0.0)).mean()364 c = c * np.exp(-r * texp)365 return c366367 if simple_put:368 p = (np.maximum(K - Sall[:, -1], 0.0)).mean()369 p = p * np.exp(-r * texp)370 return p371372 # Get full set of paths373 Sall = process.get_process(process_type, texp, model_params, num_time_steps,374 num_paths, seed)375376 (num_paths, num_time_steps) = Sall.shape377378 if option_type == EquityBarrierTypes.DOWN_AND_IN_CALL or \379 option_type == EquityBarrierTypes.DOWN_AND_OUT_CALL or \380 option_type == EquityBarrierTypes.DOWN_AND_IN_PUT or \381 option_type == EquityBarrierTypes.DOWN_AND_OUT_PUT:382383 barrierCrossedFromAbove = [False] * num_paths384385 for p in range(0, num_paths):386 barrierCrossedFromAbove[p] = np.any(Sall[p] <= B)387388 if option_type == EquityBarrierTypes.UP_AND_IN_CALL or \389 option_type == EquityBarrierTypes.UP_AND_OUT_CALL or \390 option_type == EquityBarrierTypes.UP_AND_IN_PUT or \391 option_type == EquityBarrierTypes.UP_AND_OUT_PUT:392393 barrierCrossedFromBelow = [False] * num_paths394 for p in range(0, num_paths):395 barrierCrossedFromBelow[p] = np.any(Sall[p] >= B)396397 payoff = np.zeros(num_paths)398 ones = np.ones(num_paths)399400 if option_type == EquityBarrierTypes.DOWN_AND_OUT_CALL:401 payoff = np.maximum(Sall[:, -1] - K, 0.0) * \402 (ones - barrierCrossedFromAbove)403 elif option_type == EquityBarrierTypes.DOWN_AND_IN_CALL:404 payoff = np.maximum(Sall[:, -1] - K, 0.0) * barrierCrossedFromAbove405 elif option_type == EquityBarrierTypes.UP_AND_IN_CALL:406 payoff = np.maximum(Sall[:, -1] - K, 0.0) * barrierCrossedFromBelow407 elif option_type == EquityBarrierTypes.UP_AND_OUT_CALL:408 payoff = np.maximum(Sall[:, -1] - K, 0.0) * \409 (ones - barrierCrossedFromBelow)410 elif option_type == EquityBarrierTypes.UP_AND_IN_PUT:411 payoff = np.maximum(K - Sall[:, -1], 0.0) * barrierCrossedFromBelow412 elif option_type == EquityBarrierTypes.UP_AND_OUT_PUT:413 payoff = np.maximum(K - Sall[:, -1], 0.0) * \414 (ones - barrierCrossedFromBelow)415 elif option_type == EquityBarrierTypes.DOWN_AND_OUT_PUT:416 payoff = np.maximum(K - Sall[:, -1], 0.0) * \417 (ones - barrierCrossedFromAbove)418 elif option_type == EquityBarrierTypes.DOWN_AND_IN_PUT:419 payoff = np.maximum(K - Sall[:, -1], 0.0) * barrierCrossedFromAbove420 else:421 raise FinError("Unknown barrier option type." +422 str(self._option_type))423424 v = payoff.mean() * np.exp(- r * texp)425426 return v * self._notional427428###############################################################################429430 def __repr__(self):431 s = label_to_string("OBJECT TYPE", type(self).__name__)432 s += label_to_string("EXPIRY DATE", self._expiry_date)433 s += label_to_string("STRIKE PRICE", self._strike_price)434 s += label_to_string("OPTION TYPE", self._option_type)435 s += label_to_string("BARRIER LEVEL", self._barrier_level)436 s += label_to_string("NUM OBSERVATIONS",437 self._num_observations_per_year)438 s += label_to_string("NOTIONAL", self._notional, "")439 return s440441###############################################################################442443 def _print(self):444 """ Simple print function for backward compatibility. """445 print(self)446
...
barrier.py
Source:barrier.py
1import re2from grblogtools.parsers.util import typeconvert_groupdict3class BarrierParser:4 # The pattern indicating the initialization of the parser5 barrier_start_pattern = re.compile(6 r"Iter(\s+)Primal(\s+)Dual(\s+)Primal(\s+)Dual(\s+)Compl(\s+)Time"7 )8 barrier_ordering_pattern = re.compile(r"Ordering time: (?P<OrderingTime>[\d\.]+)s")9 # The pattern indicating the barrier progress10 barrier_progress_pattern = re.compile(11 r"\s*(?P<Iteration>\d+)(?P<Indicator>\s|\*)\s+(?P<PObj>[^\s]+)\s+(?P<DObj>[^\s]+)\s+(?P<PRes>[^\s]+)\s+(?P<DRes>[^\s]+)\s+(?P<Compl>[^\s]+)\s+(?P<Time>\d+)s"12 )13 # The pattern indicating the crossover14 barrier_crossover_pattern = re.compile(15 r"\s*Push phase complete: Pinf (?P<PushPhasePInf>[^\s]+), Dinf (?P<PushPhaseDInf>[^\s]+)\s+(?P<PushPhaseEndTime>\d+)s"16 )17 # The pattern indicating the termination of the barrier algorithm18 barrier_termination_patterns = [19 re.compile(20 r"Barrier solved model in (?P<BarIterCount>[^\s]+) iterations and (?P<Runtime>[^\s]+) seconds"21 ),22 re.compile(23 r"Barrier performed (?P<BarIterCount>\d+) iterations in (?P<Runtime>[^\s]+) seconds"24 ),25 ]26 def __init__(self):27 """Initialize the Barrier parser."""28 self._summary = {}29 self._progress = []30 self._started = False31 def parse(self, line: str) -> bool:32 """Parse the given log line to populate summary and progress data.33 Args:34 line (str): A line in the log file.35 Returns:36 bool: Return True if the given line is matched by some pattern.37 """38 barrier_ordering_match = BarrierParser.barrier_ordering_pattern.match(line)39 if barrier_ordering_match:40 self._summary.update(typeconvert_groupdict(barrier_ordering_match))41 return True42 if not self._started:43 match = BarrierParser.barrier_start_pattern.match(line)44 if match:45 self._started = True46 return True47 return False48 progress_match = BarrierParser.barrier_progress_pattern.match(line)49 if progress_match:50 entry = {"Type": "barrier"}51 entry.update(typeconvert_groupdict(progress_match))52 self._progress.append(entry)53 return True54 for barrier_termination_pattern in BarrierParser.barrier_termination_patterns:55 barrier_termination_match = barrier_termination_pattern.match(line)56 if barrier_termination_match:57 self._summary.update(typeconvert_groupdict(barrier_termination_match))58 return True59 crossover_match = BarrierParser.barrier_crossover_pattern.match(line)60 if crossover_match:61 self._summary.update(typeconvert_groupdict(crossover_match))62 return True63 return False64 def get_summary(self) -> dict:65 """Return the current parsed summary."""66 return self._summary67 def get_progress(self) -> list:68 """Return the detailed progress in the barrier method."""...
test_barrier.py
Source:test_barrier.py
1from unittest import TestCase, main2from grblogtools.parsers.barrier import BarrierParser3from grblogtools.parsers.util import parse_block4example_log_barrier = """5Iter Primal Dual Primal Dual Compl Time6 0 4.56435085e+07 1.53061018e+04 1.69e+05 8.58e+00 1.59e+03 2s7 1 3.76722276e+07 -5.41297282e+05 8.07e+04 9.12e+00 8.17e+02 2s8 17 2.17403572e+02 2.17403571e+02 3.93e-14 7.11e-15 7.71e-13 5s9Barrier solved model in 17 iterations and 4.83 seconds (6.45 work units)10Optimal objective 2.17403572e+0211Crossover log...12 57249 DPushes remaining with DInf 0.0000000e+00 8s13 0 DPushes remaining with DInf 0.0000000e+00 9s14 9342 PPushes remaining with PInf 1.2118572e-05 9s15 0 PPushes remaining with PInf 0.0000000e+00 9s16 Push phase complete: Pinf 0.0000000e+00, Dinf 1.8540725e-14 9s17"""18expected_summary_barrier = {19 "BarIterCount": 17,20 "Runtime": 4.83,21 "PushPhasePInf": 0.0000000e00,22 "PushPhaseDInf": 1.8540725e-14,23 "PushPhaseEndTime": 9,24}25expected_progress_barrier = [26 {27 "Type": "barrier",28 "Iteration": 0,29 "Indicator": " ",30 "PObj": 45643508.5,31 "DObj": 15306.1018,32 "PRes": 169000.0,33 "DRes": 8.58,34 "Compl": 1590.0,35 "Time": 2,36 },37 {38 "Type": "barrier",39 "Iteration": 1,40 "Indicator": " ",41 "PObj": 37672227.6,42 "DObj": -541297.282,43 "PRes": 80700.0,44 "DRes": 9.12,45 "Compl": 817.0,46 "Time": 2,47 },48 {49 "Type": "barrier",50 "Iteration": 17,51 "Indicator": " ",52 "PObj": 217.403572,53 "DObj": 217.403571,54 "PRes": 3.93e-14,55 "DRes": 7.11e-15,56 "Compl": 7.71e-13,57 "Time": 5,58 },59]60class TestBarrier(TestCase):61 def setUp(self):62 self.barrier_parser = BarrierParser()63 def test_get_summary_progress(self):64 parse_block(self.barrier_parser, example_log_barrier)65 self.assertEqual(self.barrier_parser.get_summary(), expected_summary_barrier)66 self.assertEqual(self.barrier_parser.get_progress(), expected_progress_barrier)67if __name__ == "__main__":...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!