How to use strategy method in pandera

Best Python code snippet using pandera_python

ctaEngine.py

Source:ctaEngine.py Github

copy

Full Screen

1# encoding: UTF-82'''3本文件中实现了CTA策略引擎,针对CTA类型的策略,抽象简化了部分底层接口的功能。4'''5from __future__ import division6import json7import os8import traceback9from collections import OrderedDict10from datetime import datetime, timedelta11from copy import copy12from vnpy.event import Event13from vnpy.trader.vtEvent import *14from vnpy.trader.vtConstant import *15from vnpy.trader.vtObject import VtTickData, VtBarData16from vnpy.trader.vtGateway import VtSubscribeReq, VtOrderReq, VtCancelOrderReq, VtLogData17from vnpy.trader.vtFunction import todayDate, getJsonPath18from .ctaBase import *19from .strategy import STRATEGY_CLASS20########################################################################21class CtaEngine(object):22 """CTA策略引擎"""23 settingFileName = 'CTA_setting.json'24 settingfilePath = getJsonPath(settingFileName, __file__)25 26 STATUS_FINISHED = set([STATUS_REJECTED, STATUS_CANCELLED, STATUS_ALLTRADED])27 #----------------------------------------------------------------------28 def __init__(self, mainEngine, eventEngine):29 """Constructor"""30 self.mainEngine = mainEngine31 self.eventEngine = eventEngine32 33 # 当前日期34 self.today = todayDate()35 36 # 保存策略实例的字典37 # key为策略名称,value为策略实例,注意策略名称不允许重复38 self.strategyDict = {}39 40 # 保存vtSymbol和策略实例映射的字典(用于推送tick数据)41 # 由于可能多个strategy交易同一个vtSymbol,因此key为vtSymbol42 # value为包含所有相关strategy对象的list43 self.tickStrategyDict = {}44 45 # 保存vtOrderID和strategy对象映射的字典(用于推送order和trade数据)46 # key为vtOrderID,value为strategy对象47 self.orderStrategyDict = {} 48 49 # 本地停止单编号计数50 self.stopOrderCount = 051 # stopOrderID = STOPORDERPREFIX + str(stopOrderCount)52 53 # 本地停止单字典54 # key为stopOrderID,value为stopOrder对象55 self.stopOrderDict = {} # 停止单撤销后不会从本字典中删除56 self.workingStopOrderDict = {} # 停止单撤销后会从本字典中删除57 58 # 保存策略名称和委托号列表的字典59 # key为name,value为保存orderID(限价+本地停止)的集合60 self.strategyOrderDict = {}61 62 # 成交号集合,用来过滤已经收到过的成交推送63 self.tradeSet = set()64 65 # 引擎类型为实盘66 self.engineType = ENGINETYPE_TRADING67 68 # 注册日式事件类型69 self.mainEngine.registerLogEvent(EVENT_CTA_LOG)70 71 # 注册事件监听72 self.registerEvent()73 74 #----------------------------------------------------------------------75 def sendOrder(self, vtSymbol, orderType, price, volume, strategy):76 """发单"""77 contract = self.mainEngine.getContract(vtSymbol)78 79 req = VtOrderReq()80 req.symbol = contract.symbol81 req.exchange = contract.exchange82 req.vtSymbol = contract.vtSymbol83 req.price = self.roundToPriceTick(contract.priceTick, price)84 req.volume = volume85 86 req.productClass = strategy.productClass87 req.currency = strategy.currency 88 89 # 设计为CTA引擎发出的委托只允许使用限价单90 req.priceType = PRICETYPE_LIMITPRICE 91 92 # CTA委托类型映射93 if orderType == CTAORDER_BUY:94 req.direction = DIRECTION_LONG95 req.offset = OFFSET_OPEN96 97 elif orderType == CTAORDER_SELL:98 req.direction = DIRECTION_SHORT99 req.offset = OFFSET_CLOSE100 101 elif orderType == CTAORDER_SHORT:102 req.direction = DIRECTION_SHORT103 req.offset = OFFSET_OPEN104 105 elif orderType == CTAORDER_COVER:106 req.direction = DIRECTION_LONG107 req.offset = OFFSET_CLOSE108 109 # 委托转换110 reqList = self.mainEngine.convertOrderReq(req)111 vtOrderIDList = []112 113 if not reqList:114 return vtOrderIDList115 116 for convertedReq in reqList:117 vtOrderID = self.mainEngine.sendOrder(convertedReq, contract.gatewayName) # 发单118 self.orderStrategyDict[vtOrderID] = strategy # 保存vtOrderID和策略的映射关系119 self.strategyOrderDict[strategy.name].add(vtOrderID) # 添加到策略委托号集合中120 vtOrderIDList.append(vtOrderID)121 122 self.writeCtaLog(u'策略%s发送委托,%s,%s,%s@%s' 123 %(strategy.name, vtSymbol, req.direction, volume, price))124 125 return vtOrderIDList126 127 #----------------------------------------------------------------------128 def cancelOrder(self, vtOrderID):129 """撤单"""130 # 查询报单对象131 order = self.mainEngine.getOrder(vtOrderID)132 133 # 如果查询成功134 if order:135 # 检查是否报单还有效,只有有效时才发出撤单指令136 orderFinished = (order.status==STATUS_ALLTRADED or order.status==STATUS_CANCELLED)137 if not orderFinished:138 req = VtCancelOrderReq()139 req.symbol = order.symbol140 req.exchange = order.exchange141 req.frontID = order.frontID142 req.sessionID = order.sessionID143 req.orderID = order.orderID144 self.mainEngine.cancelOrder(req, order.gatewayName) 145 #----------------------------------------------------------------------146 def sendStopOrder(self, vtSymbol, orderType, price, volume, strategy):147 """发停止单(本地实现)"""148 self.stopOrderCount += 1149 stopOrderID = STOPORDERPREFIX + str(self.stopOrderCount)150 151 so = StopOrder()152 so.vtSymbol = vtSymbol153 so.orderType = orderType154 so.price = price155 so.volume = volume156 so.strategy = strategy157 so.stopOrderID = stopOrderID158 so.status = STOPORDER_WAITING159 160 if orderType == CTAORDER_BUY:161 so.direction = DIRECTION_LONG162 so.offset = OFFSET_OPEN163 elif orderType == CTAORDER_SELL:164 so.direction = DIRECTION_SHORT165 so.offset = OFFSET_CLOSE166 elif orderType == CTAORDER_SHORT:167 so.direction = DIRECTION_SHORT168 so.offset = OFFSET_OPEN169 elif orderType == CTAORDER_COVER:170 so.direction = DIRECTION_LONG171 so.offset = OFFSET_CLOSE 172 173 # 保存stopOrder对象到字典中174 self.stopOrderDict[stopOrderID] = so175 self.workingStopOrderDict[stopOrderID] = so176 177 # 保存stopOrderID到策略委托号集合中178 self.strategyOrderDict[strategy.name].add(stopOrderID)179 180 # 推送停止单状态181 strategy.onStopOrder(so)182 183 return [stopOrderID]184 185 #----------------------------------------------------------------------186 def cancelStopOrder(self, stopOrderID):187 """撤销停止单"""188 # 检查停止单是否存在189 if stopOrderID in self.workingStopOrderDict:190 so = self.workingStopOrderDict[stopOrderID]191 strategy = so.strategy192 193 # 更改停止单状态为已撤销194 so.status = STOPORDER_CANCELLED195 196 # 从活动停止单字典中移除197 del self.workingStopOrderDict[stopOrderID]198 199 # 从策略委托号集合中移除200 s = self.strategyOrderDict[strategy.name]201 if stopOrderID in s:202 s.remove(stopOrderID)203 204 # 通知策略205 strategy.onStopOrder(so)206 #----------------------------------------------------------------------207 def processStopOrder(self, tick):208 """收到行情后处理本地停止单(检查是否要立即发出)"""209 vtSymbol = tick.vtSymbol210 211 # 首先检查是否有策略交易该合约212 if vtSymbol in self.tickStrategyDict:213 # 遍历等待中的停止单,检查是否会被触发214 for so in self.workingStopOrderDict.values():215 if so.vtSymbol == vtSymbol:216 longTriggered = so.direction==DIRECTION_LONG and tick.lastPrice>=so.price # 多头停止单被触发217 shortTriggered = so.direction==DIRECTION_SHORT and tick.lastPrice<=so.price # 空头停止单被触发218 219 if longTriggered or shortTriggered:220 # 买入和卖出分别以涨停跌停价发单(模拟市价单)221 if so.direction==DIRECTION_LONG:222 price = tick.upperLimit223 else:224 price = tick.lowerLimit225 226 # 发出市价委托227 self.sendOrder(so.vtSymbol, so.orderType, price, so.volume, so.strategy)228 229 # 从活动停止单字典中移除该停止单230 del self.workingStopOrderDict[so.stopOrderID]231 232 # 从策略委托号集合中移除233 s = self.strategyOrderDict[so.strategy.name]234 if so.stopOrderID in s:235 s.remove(so.stopOrderID)236 237 # 更新停止单状态,并通知策略238 so.status = STOPORDER_TRIGGERED239 so.strategy.onStopOrder(so)240 #----------------------------------------------------------------------241 def processTickEvent(self, event):242 """处理行情推送"""243 tick = event.dict_['data']244 245 tick = copy(tick)246 247 # 收到tick行情后,先处理本地停止单(检查是否要立即发出)248 self.processStopOrder(tick)249 250 # 推送tick到对应的策略实例进行处理251 if tick.vtSymbol in self.tickStrategyDict:252 # tick时间可能出现异常数据,使用try...except实现捕捉和过滤253 try:254 # 添加datetime字段255 if not tick.datetime:256 tick.datetime = datetime.strptime(' '.join([tick.date, tick.time]), '%Y%m%d %H:%M:%S.%f')257 except ValueError:258 self.writeCtaLog(traceback.format_exc())259 return260 261 # 逐个推送到策略实例中262 l = self.tickStrategyDict[tick.vtSymbol]263 for strategy in l:264 self.callStrategyFunc(strategy, strategy.onTick, tick)265 266 #----------------------------------------------------------------------267 def processOrderEvent(self, event):268 """处理委托推送"""269 order = event.dict_['data']270 271 vtOrderID = order.vtOrderID272 273 if vtOrderID in self.orderStrategyDict:274 strategy = self.orderStrategyDict[vtOrderID] 275 276 # 如果委托已经完成(拒单、撤销、全成),则从活动委托集合中移除277 if order.status in self.STATUS_FINISHED:278 s = self.strategyOrderDict[strategy.name]279 if vtOrderID in s:280 s.remove(vtOrderID)281 282 self.callStrategyFunc(strategy, strategy.onOrder, order)283 284 #----------------------------------------------------------------------285 def processTradeEvent(self, event):286 """处理成交推送"""287 trade = event.dict_['data']288 289 # 过滤已经收到过的成交回报290 if trade.vtTradeID in self.tradeSet:291 return292 self.tradeSet.add(trade.vtTradeID)293 294 # 将成交推送到策略对象中295 if trade.vtOrderID in self.orderStrategyDict:296 strategy = self.orderStrategyDict[trade.vtOrderID]297 298 # 计算策略持仓299 if trade.direction == DIRECTION_LONG:300 strategy.pos += trade.volume301 else:302 strategy.pos -= trade.volume303 304 self.callStrategyFunc(strategy, strategy.onTrade, trade)305 306 # 保存策略持仓到数据库307 self.saveSyncData(strategy) 308 309 #----------------------------------------------------------------------310 def registerEvent(self):311 """注册事件监听"""312 self.eventEngine.register(EVENT_TICK, self.processTickEvent)313 self.eventEngine.register(EVENT_ORDER, self.processOrderEvent)314 self.eventEngine.register(EVENT_TRADE, self.processTradeEvent)315 # ROBINLIN316 # self.eventEngine.register(EVENT_ACCOUNT, self.processAccountEvent)317 # self.eventEngine.register(EVENT_POSITION, self.processPositionEvent)318 #ROBINLIN----------------------------------------------------------------------319 def processAccountEvent(self,event):320 """账户资金更新"""321 # self.writeCtaLog("processAccountEvent::账户资金更新" )322 account = event.dict_['data']323 for name in self.strategyDict.keys():324 strategy = self.strategyDict[name]325 if strategy.inited:326 self.callStrategyFunc(strategy, strategy.onAccountEvent, account)327 #ROBINLIN----------------------------------------------------------------------328 def processPositionEvent(self, event):329 """持仓更新"""330 # self.writeCtaLog("processPositionEvent::持仓更新")331 position = event.dict_['data']332 for name in self.strategyDict.keys():333 strategy = self.strategyDict[name]334 if strategy.inited:335 self.callStrategyFunc(strategy, strategy.onPositionEvent, position)336 337 #----------------------------------------------------------------------338 def insertData(self, dbName, collectionName, data):339 """插入数据到数据库(这里的data可以是VtTickData或者VtBarData)"""340 self.mainEngine.dbInsert(dbName, collectionName, data.__dict__)341 342 #----------------------------------------------------------------------343 def loadBar(self, dbName, collectionName, days):344 """从数据库中读取Bar数据,startDate是datetime对象"""345 startDate = self.today - timedelta(days)346 347 d = {'datetime':{'$gte':startDate}}348 barData = self.mainEngine.dbQuery(dbName, collectionName, d, 'datetime')349 350 l = []351 for d in barData:352 bar = VtBarData()353 bar.__dict__ = d354 l.append(bar)355 return l356 357 #----------------------------------------------------------------------358 def loadTick(self, dbName, collectionName, days):359 """从数据库中读取Tick数据,startDate是datetime对象"""360 startDate = self.today - timedelta(days)361 362 d = {'datetime':{'$gte':startDate}}363 tickData = self.mainEngine.dbQuery(dbName, collectionName, d, 'datetime')364 365 l = []366 for d in tickData:367 tick = VtTickData()368 tick.__dict__ = d369 l.append(tick)370 return l 371 372 #----------------------------------------------------------------------373 def writeCtaLog(self, content):374 """快速发出CTA模块日志事件"""375 log = VtLogData()376 log.logContent = content377 log.gatewayName = 'CTA_STRATEGY'378 event = Event(type_=EVENT_CTA_LOG)379 event.dict_['data'] = log380 self.eventEngine.put(event) 381 382 #----------------------------------------------------------------------383 def loadStrategy(self, setting):384 """载入策略"""385 try:386 name = setting['name']387 className = setting['className']388 except Exception:389 msg = traceback.format_exc()390 self.writeCtaLog(u'载入策略出错:%s' %msg)391 return392 393 # 获取策略类394 strategyClass = STRATEGY_CLASS.get(className, None)395 if not strategyClass:396 self.writeCtaLog(u'找不到策略类:%s' %className)397 return398 399 # 防止策略重名400 if name in self.strategyDict:401 self.writeCtaLog(u'策略实例重名:%s' %name)402 else:403 # 创建策略实例404 strategy = strategyClass(self, setting) 405 self.strategyDict[name] = strategy406 407 # 创建委托号列表408 self.strategyOrderDict[name] = set()409 410 # 保存Tick映射关系411 if strategy.vtSymbol in self.tickStrategyDict:412 l = self.tickStrategyDict[strategy.vtSymbol]413 else:414 l = []415 self.tickStrategyDict[strategy.vtSymbol] = l416 l.append(strategy)417 418 #----------------------------------------------------------------------419 def subscribeMarketData(self, strategy):420 """订阅行情"""421 # 订阅合约422 contract = self.mainEngine.getContract(strategy.vtSymbol)423 if contract:424 req = VtSubscribeReq()425 req.symbol = contract.symbol426 req.exchange = contract.exchange427 428 # 对于IB接口订阅行情时所需的货币和产品类型,从策略属性中获取429 req.currency = strategy.currency430 req.productClass = strategy.productClass431 432 self.mainEngine.subscribe(req, contract.gatewayName)433 else:434 self.writeCtaLog(u'%s的交易合约%s无法找到' %(strategy.name, strategy.vtSymbol))435 #----------------------------------------------------------------------436 def initStrategy(self, name):437 """初始化策略"""438 if name in self.strategyDict:439 strategy = self.strategyDict[name]440 441 if not strategy.inited:442 strategy.inited = True443 self.callStrategyFunc(strategy, strategy.onInit)444 self.loadSyncData(strategy) # 初始化完成后加载同步数据445 self.subscribeMarketData(strategy) # 加载同步数据后再订阅行情446 else:447 self.writeCtaLog(u'请勿重复初始化策略实例:%s' %name)448 else:449 self.writeCtaLog(u'策略实例不存在:%s' %name) 450 #---------------------------------------------------------------------451 def startStrategy(self, name):452 """启动策略"""453 if name in self.strategyDict:454 strategy = self.strategyDict[name]455 456 if strategy.inited and not strategy.trading:457 strategy.trading = True458 self.callStrategyFunc(strategy, strategy.onStart)459 else:460 self.writeCtaLog(u'策略实例不存在:%s' %name)461 462 #----------------------------------------------------------------------463 def stopStrategy(self, name):464 """停止策略"""465 if name in self.strategyDict:466 strategy = self.strategyDict[name]467 468 if strategy.trading:469 #strategy.trading = False470 self.callStrategyFunc(strategy, strategy.onStop)471 472 # 对该策略发出的所有限价单进行撤单473 for vtOrderID, s in self.orderStrategyDict.items():474 if s is strategy:475 self.cancelOrder(vtOrderID)476 477 # 对该策略发出的所有本地停止单撤单478 for stopOrderID, so in self.workingStopOrderDict.items():479 if so.strategy is strategy:480 self.cancelStopOrder(stopOrderID)481 strategy.trading = False #RL move from front to here, let sync data working482 else:483 self.writeCtaLog(u'策略实例不存在:%s' %name) 484 485 #----------------------------------------------------------------------486 def initAll(self):487 """全部初始化"""488 for name in self.strategyDict.keys():489 self.initStrategy(name) 490 491 #----------------------------------------------------------------------492 def startAll(self):493 """全部启动"""494 for name in self.strategyDict.keys():495 self.startStrategy(name)496 497 #----------------------------------------------------------------------498 def stopAll(self):499 """全部停止"""500 for name in self.strategyDict.keys():501 self.stopStrategy(name) 502 503 #----------------------------------------------------------------------504 def saveSetting(self):505 """保存策略配置"""506 with open(self.settingfilePath, 'w') as f:507 l = []508 509 for strategy in self.strategyDict.values():510 setting = {}511 for param in strategy.paramList:512 setting[param] = strategy.__getattribute__(param)513 l.append(setting)514 515 jsonL = json.dumps(l, indent=4)516 f.write(jsonL)517 518 #----------------------------------------------------------------------519 def loadSetting(self):520 """读取策略配置"""521 with open(self.settingfilePath) as f:522 l = json.load(f)523 524 for setting in l:525 self.loadStrategy(setting)526 527 #----------------------------------------------------------------------528 def getStrategyVar(self, name):529 """获取策略当前的变量字典"""530 if name in self.strategyDict:531 strategy = self.strategyDict[name]532 varDict = OrderedDict()533 534 for key in strategy.varList:535 varDict[key] = strategy.__getattribute__(key)536 537 return varDict538 else:539 self.writeCtaLog(u'策略实例不存在:' + name) 540 return None541 542 #----------------------------------------------------------------------543 def getStrategyParam(self, name):544 """获取策略的参数字典"""545 if name in self.strategyDict:546 strategy = self.strategyDict[name]547 paramDict = OrderedDict()548 549 for key in strategy.paramList: 550 paramDict[key] = strategy.__getattribute__(key)551 552 return paramDict553 else:554 self.writeCtaLog(u'策略实例不存在:' + name) 555 return None556 557 #----------------------------------------------------------------------558 def getStrategyNames(self):559 """查询所有策略名称"""560 return self.strategyDict.keys() 561 562 #----------------------------------------------------------------------563 def putStrategyEvent(self, name):564 """触发策略状态变化事件(通常用于通知GUI更新)"""565 strategy = self.strategyDict[name]566 d = {k:strategy.__getattribute__(k) for k in strategy.varList}567 568 event = Event(EVENT_CTA_STRATEGY+name)569 event.dict_['data'] = d570 self.eventEngine.put(event)571 572 d2 = {k:str(v) for k,v in d.items()}573 d2['name'] = name574 event2 = Event(EVENT_CTA_STRATEGY)575 event2.dict_['data'] = d2576 self.eventEngine.put(event2) 577 578 #----------------------------------------------------------------------579 def callStrategyFunc(self, strategy, func, params=None):580 """调用策略的函数,若触发异常则捕捉"""581 try:582 if params:583 func(params)584 else:585 func()586 except Exception:587 # 停止策略,修改状态为未初始化588 strategy.trading = False589 strategy.inited = False590 591 # 发出日志592 content = '\n'.join([u'策略%s触发异常已停止' %strategy.name,593 traceback.format_exc()])594 self.writeCtaLog(content)595 596 #----------------------------------------------------------------------597 def saveSyncData(self, strategy):598 """保存策略的持仓情况到数据库"""599 flt = {'name': strategy.name,600 'vtSymbol': strategy.vtSymbol}601 602 d = copy(flt)603 for key in strategy.syncList:604 d[key] = strategy.__getattribute__(key)605 606 self.mainEngine.dbUpdate(POSITION_DB_NAME, strategy.className,607 d, flt, True)608 609 content = u'策略%s同步数据保存成功,当前持仓%s' %(strategy.name, strategy.pos)610 #self.writeCtaLog(content)611 612 #----------------------------------------------------------------------613 def loadSyncData(self, strategy):614 """从数据库载入策略的持仓情况"""615 flt = {'name': strategy.name,616 'vtSymbol': strategy.vtSymbol}617 syncData = self.mainEngine.dbQuery(POSITION_DB_NAME, strategy.className, flt)618 619 if not syncData:620 return621 622 d = syncData[0]623 624 for key in strategy.syncList:625 if key in d:626 strategy.__setattr__(key, d[key])627 628 #----------------------------------------------------------------------629 def roundToPriceTick(self, priceTick, price):630 """取整价格到合约最小价格变动"""631 if not priceTick:632 return price633 634 newPrice = round(price/priceTick, 0) * priceTick635 return newPrice 636 637 #----------------------------------------------------------------------638 def stop(self):639 """停止"""640 pass641 642 #----------------------------------------------------------------------643 def cancelAll(self, name):644 """全部撤单"""645 s = self.strategyOrderDict[name]646 647 # 遍历列表,全部撤单648 # 这里不能直接遍历集合s,因为撤单时会修改s中的内容,导致出错649 for orderID in list(s):650 if STOPORDERPREFIX in orderID:651 self.cancelStopOrder(orderID)652 else:...

Full Screen

Full Screen

credentials.test.js

Source:credentials.test.js Github

copy

Full Screen

1"use strict";2const rewire = require("rewire");3const should = require("should");4const sinon = require("sinon");5const {6 BadRequestError,7 Request,8 SizeLimitError,9} = require("../../../../index");10const KuzzleMock = require("../../../mocks/kuzzle.mock");11const SecurityController = rewire(12 "../../../../lib/api/controllers/securityController"13);14describe("Test: security controller - credentials", () => {15 let kuzzle;16 let request;17 let securityController;18 beforeEach(() => {19 kuzzle = new KuzzleMock();20 securityController = new SecurityController();21 kuzzle.pluginsManager.listStrategies.returns(["someStrategy"]);22 });23 describe("#createCredentials", () => {24 it("should call the plugin create method", () => {25 const methodStub = sinon.stub().resolves({ foo: "bar" });26 request = new Request({27 controller: "security",28 action: "createCredentials",29 strategy: "someStrategy",30 body: {31 some: "credentials",32 },33 _id: "someUserId",34 });35 kuzzle.pluginsManager.getStrategyMethod.returns(methodStub);36 return securityController.createCredentials(request).then((result) => {37 should(result).be.deepEqual({ foo: "bar" });38 should(kuzzle.pluginsManager.getStrategyMethod).be.calledTwice();39 should(40 kuzzle.pluginsManager.getStrategyMethod.firstCall.args[0]41 ).be.eql("someStrategy");42 should(43 kuzzle.pluginsManager.getStrategyMethod.firstCall.args[1]44 ).be.eql("validate");45 should(46 kuzzle.pluginsManager.getStrategyMethod.secondCall.args[0]47 ).be.eql("someStrategy");48 should(49 kuzzle.pluginsManager.getStrategyMethod.secondCall.args[1]50 ).be.eql("create");51 should(methodStub).be.calledTwice();52 should(methodStub.firstCall.args[0]).be.eql(request);53 should(methodStub.firstCall.args[1]).be.deepEqual({54 some: "credentials",55 });56 should(methodStub.firstCall.args[2]).be.eql("someUserId");57 should(methodStub.firstCall.args[3]).be.eql("someStrategy");58 should(methodStub.secondCall.args[0]).be.eql(request);59 should(methodStub.secondCall.args[1]).be.deepEqual({60 some: "credentials",61 });62 should(methodStub.secondCall.args[2]).be.eql("someUserId");63 should(methodStub.secondCall.args[3]).be.eql("someStrategy");64 });65 });66 it("should throw if the user does not already exist", () => {67 request = new Request({68 controller: "security",69 action: "createCredentials",70 strategy: "someStrategy",71 body: {72 some: "credentials",73 },74 _id: "someUserId",75 });76 const error = new Error("foo");77 kuzzle.ask78 .withArgs("core:security:user:get", "someUserId")79 .rejects(error);80 return should(securityController.createCredentials(request)).rejectedWith(81 error82 );83 });84 });85 describe("#updateCredentials", () => {86 it("should call the plugin update method", () => {87 const methodStub = sinon.stub().resolves({ foo: "bar" });88 request = new Request({89 controller: "security",90 action: "createCredentials",91 strategy: "someStrategy",92 body: {93 some: "credentials",94 },95 _id: "someUserId",96 });97 kuzzle.pluginsManager.getStrategyMethod.returns(methodStub);98 return securityController.updateCredentials(request).then((result) => {99 should(result).be.deepEqual({ foo: "bar" });100 should(kuzzle.pluginsManager.getStrategyMethod).be.calledTwice();101 should(102 kuzzle.pluginsManager.getStrategyMethod.firstCall.args[0]103 ).be.eql("someStrategy");104 should(105 kuzzle.pluginsManager.getStrategyMethod.firstCall.args[1]106 ).be.eql("validate");107 should(108 kuzzle.pluginsManager.getStrategyMethod.secondCall.args[0]109 ).be.eql("someStrategy");110 should(111 kuzzle.pluginsManager.getStrategyMethod.secondCall.args[1]112 ).be.eql("update");113 should(methodStub).be.calledTwice();114 should(methodStub.firstCall.args[0]).be.eql(request);115 should(methodStub.firstCall.args[1]).be.deepEqual({116 some: "credentials",117 });118 should(methodStub.firstCall.args[2]).be.eql("someUserId");119 should(methodStub.firstCall.args[3]).be.eql("someStrategy");120 should(methodStub.secondCall.args[0]).be.eql(request);121 should(methodStub.secondCall.args[1]).be.deepEqual({122 some: "credentials",123 });124 should(methodStub.secondCall.args[2]).be.eql("someUserId");125 should(methodStub.secondCall.args[3]).be.eql("someStrategy");126 });127 });128 it("should throw if the user does not already exist", () => {129 request = new Request({130 controller: "security",131 action: "updateCredentials",132 strategy: "someStrategy",133 body: {134 some: "credentials",135 },136 _id: "someUserId",137 });138 const error = new Error("foo");139 kuzzle.ask140 .withArgs("core:security:user:get", "someUserId")141 .rejects(error);142 return should(securityController.updateCredentials(request)).rejectedWith(143 error144 );145 });146 });147 describe("#hasCredentials", () => {148 it("should call the plugin exists method", () => {149 const methodStub = sinon.stub().resolves({ foo: "bar" });150 request = new Request({151 controller: "security",152 action: "hasCredentials",153 strategy: "someStrategy",154 _id: "someUserId",155 });156 kuzzle.pluginsManager.getStrategyMethod.returns(methodStub);157 return securityController.hasCredentials(request).then((result) => {158 should(result).be.deepEqual({ foo: "bar" });159 should(kuzzle.pluginsManager.getStrategyMethod).be.calledOnce();160 should(161 kuzzle.pluginsManager.getStrategyMethod.firstCall.args[0]162 ).be.eql("someStrategy");163 should(164 kuzzle.pluginsManager.getStrategyMethod.firstCall.args[1]165 ).be.eql("exists");166 should(methodStub).be.calledOnce();167 should(methodStub.firstCall.args[0]).be.eql(request);168 should(methodStub.firstCall.args[1]).be.eql("someUserId");169 should(methodStub.firstCall.args[2]).be.eql("someStrategy");170 });171 });172 });173 describe("#validateCredentials", () => {174 it("should call the plugin validate method", () => {175 const methodStub = sinon.stub().resolves({ foo: "bar" });176 request = new Request({177 controller: "security",178 action: "validateCredentials",179 strategy: "someStrategy",180 body: {181 some: "credentials",182 },183 _id: "someUserId",184 });185 kuzzle.pluginsManager.getStrategyMethod.returns(methodStub);186 return securityController.validateCredentials(request).then((result) => {187 should(result).be.deepEqual({ foo: "bar" });188 should(kuzzle.pluginsManager.getStrategyMethod).be.calledOnce();189 should(190 kuzzle.pluginsManager.getStrategyMethod.firstCall.args[0]191 ).be.eql("someStrategy");192 should(193 kuzzle.pluginsManager.getStrategyMethod.firstCall.args[1]194 ).be.eql("validate");195 should(methodStub).be.calledOnce();196 should(methodStub.firstCall.args[0]).be.eql(request);197 should(methodStub.firstCall.args[1]).be.deepEqual({198 some: "credentials",199 });200 should(methodStub.firstCall.args[2]).be.eql("someUserId");201 should(methodStub.firstCall.args[3]).be.eql("someStrategy");202 });203 });204 });205 describe("#deleteCredentials", () => {206 it("should call the plugin delete method", () => {207 const methodStub = sinon.stub().resolves({ foo: "bar" });208 request = new Request({209 controller: "security",210 action: "deleteCredentials",211 strategy: "someStrategy",212 _id: "someUserId",213 });214 kuzzle.pluginsManager.getStrategyMethod.returns(methodStub);215 return securityController.deleteCredentials(request).then((result) => {216 should(result).be.deepEqual({ acknowledged: true });217 should(kuzzle.pluginsManager.getStrategyMethod).be.calledOnce();218 should(219 kuzzle.pluginsManager.getStrategyMethod.firstCall.args[0]220 ).be.eql("someStrategy");221 should(222 kuzzle.pluginsManager.getStrategyMethod.firstCall.args[1]223 ).be.eql("delete");224 should(methodStub).be.calledOnce();225 should(methodStub.firstCall.args[0]).be.eql(request);226 should(methodStub.firstCall.args[1]).be.eql("someUserId");227 should(methodStub.firstCall.args[2]).be.eql("someStrategy");228 });229 });230 });231 describe("#searchUsersByCredentials", () => {232 let query;233 let methodStub;234 let stubResult;235 beforeEach(() => {236 query = {237 bool: {238 must: [239 {240 match: {241 credentials: "test@test.com",242 },243 },244 ],245 },246 };247 request = new Request({248 controller: "security",249 action: "searchUsersByCredentials",250 strategy: "someStrategy",251 body: { query },252 });253 securityController.translateKoncorde = sinon.stub().resolves();254 stubResult = {255 hits: [{ credentials: "test@test.com", kuid: "kuid" }],256 total: 1,257 };258 methodStub = sinon.stub().resolves(stubResult);259 kuzzle.pluginsManager.getStrategyMethod.returns(methodStub);260 });261 it("should return the result of the appropriate method from the right strategy plugin", async () => {262 const result = await securityController.searchUsersByCredentials(request);263 should(securityController.translateKoncorde).not.be.called();264 should(kuzzle.pluginsManager.getStrategyMethod).be.calledOnce();265 should(kuzzle.pluginsManager.getStrategyMethod).be.calledWith(266 "someStrategy",267 "search"268 );269 should(methodStub).be.calledOnce();270 should(methodStub.firstCall.args[0]).be.eql({ query });271 should(result).be.deepEqual(stubResult);272 });273 it("should throw if the optional method search has not been implemented", () => {274 kuzzle.pluginsManager.getStrategyMethod.returns(undefined);275 return should(276 securityController.searchUsersByCredentials(request)277 ).rejectedWith({ id: "plugin.strategy.missing_optional_method" });278 });279 it("should reject if the size argument exceeds server configuration", () => {280 kuzzle.config.limits.documentsFetchCount = 1;281 request.input.args.size = 10;282 return should(283 securityController.searchUsersByCredentials(request)284 ).rejectedWith(SizeLimitError, {285 id: "services.storage.get_limit_exceeded",286 });287 });288 it('should reject if the "lang" is not supported', () => {289 request.input.args.lang = "turkish";290 return should(291 securityController.searchUsersByCredentials(request)292 ).rejectedWith(BadRequestError, { id: "api.assert.invalid_argument" });293 });294 it('should call the "translateKoncorde" method if "lang" is "koncorde"', async () => {295 request.input.body = {296 query: { equals: { credentials: "test@test.com" } },297 };298 request.input.args.lang = "koncorde";299 await securityController.searchUsersByCredentials(request);300 should(securityController.translateKoncorde).be.calledWith({301 equals: { credentials: "test@test.com" },302 });303 });304 });305 describe("#getCredentials", () => {306 it("should call the plugin getInfo method if it is provided", () => {307 const methodStub = sinon.stub().resolves({ foo: "bar" });308 request = new Request({309 controller: "security",310 action: "getCredentials",311 strategy: "someStrategy",312 _id: "someUserId",313 });314 kuzzle.pluginsManager.hasStrategyMethod.returns(true);315 kuzzle.pluginsManager.getStrategyMethod.returns(methodStub);316 return securityController.getCredentials(request).then((result) => {317 should(result).be.deepEqual({ foo: "bar" });318 should(kuzzle.pluginsManager.hasStrategyMethod).be.calledOnce();319 should(320 kuzzle.pluginsManager.hasStrategyMethod.firstCall.args[0]321 ).be.eql("someStrategy");322 should(323 kuzzle.pluginsManager.hasStrategyMethod.firstCall.args[1]324 ).be.eql("getInfo");325 should(kuzzle.pluginsManager.getStrategyMethod).be.calledOnce();326 should(327 kuzzle.pluginsManager.getStrategyMethod.firstCall.args[0]328 ).be.eql("someStrategy");329 should(330 kuzzle.pluginsManager.getStrategyMethod.firstCall.args[1]331 ).be.eql("getInfo");332 should(methodStub).be.calledOnce();333 should(methodStub.firstCall.args[0]).be.eql(request);334 should(methodStub.firstCall.args[1]).be.eql("someUserId");335 should(methodStub.firstCall.args[2]).be.eql("someStrategy");336 });337 });338 it("should resolve to an empty object if getInfo method is not provided", () => {339 const methodStub = sinon.stub().resolves({ foo: "bar" });340 request = new Request({341 controller: "security",342 action: "getCredentials",343 strategy: "someStrategy",344 _id: "someUserId",345 });346 kuzzle.pluginsManager.hasStrategyMethod.returns(false);347 kuzzle.pluginsManager.getStrategyMethod.returns(methodStub);348 return securityController.getCredentials(request).then((result) => {349 should(result).be.deepEqual({});350 should(kuzzle.pluginsManager.hasStrategyMethod).be.calledOnce();351 should(352 kuzzle.pluginsManager.hasStrategyMethod.firstCall.args[0]353 ).be.eql("someStrategy");354 should(355 kuzzle.pluginsManager.hasStrategyMethod.firstCall.args[1]356 ).be.eql("getInfo");357 should(kuzzle.pluginsManager.getStrategyMethod.callCount).be.eql(0);358 });359 });360 });361 describe("#getCredentialsById", () => {362 it("should call the plugin getById method if it is provided", () => {363 const methodStub = sinon.stub().resolves({ foo: "bar" });364 request = new Request({365 controller: "security",366 action: "getCredentials",367 strategy: "someStrategy",368 _id: "someUserId",369 });370 kuzzle.pluginsManager.hasStrategyMethod.returns(true);371 kuzzle.pluginsManager.getStrategyMethod.returns(methodStub);372 return securityController.getCredentialsById(request).then((result) => {373 should(result).be.deepEqual({ foo: "bar" });374 should(kuzzle.pluginsManager.hasStrategyMethod).be.calledOnce();375 should(376 kuzzle.pluginsManager.hasStrategyMethod.firstCall.args[0]377 ).be.eql("someStrategy");378 should(379 kuzzle.pluginsManager.hasStrategyMethod.firstCall.args[1]380 ).be.eql("getById");381 should(kuzzle.pluginsManager.getStrategyMethod).be.calledOnce();382 should(383 kuzzle.pluginsManager.getStrategyMethod.firstCall.args[0]384 ).be.eql("someStrategy");385 should(386 kuzzle.pluginsManager.getStrategyMethod.firstCall.args[1]387 ).be.eql("getById");388 should(methodStub).be.calledOnce();389 should(methodStub.firstCall.args[0]).be.eql(request);390 should(methodStub.firstCall.args[1]).be.eql("someUserId");391 should(methodStub.firstCall.args[2]).be.eql("someStrategy");392 });393 });394 it("should resolve to an empty object if getById method is not provided", () => {395 const methodStub = sinon.stub().resolves({ foo: "bar" });396 request = new Request({397 controller: "security",398 action: "getCredentials",399 strategy: "someStrategy",400 _id: "someUserId",401 });402 kuzzle.pluginsManager.hasStrategyMethod.returns(false);403 kuzzle.pluginsManager.getStrategyMethod.returns(methodStub);404 return securityController.getCredentialsById(request).then((result) => {405 should(result).be.deepEqual({});406 should(kuzzle.pluginsManager.hasStrategyMethod).be.calledOnce();407 should(408 kuzzle.pluginsManager.hasStrategyMethod.firstCall.args[0]409 ).be.eql("someStrategy");410 should(411 kuzzle.pluginsManager.hasStrategyMethod.firstCall.args[1]412 ).be.eql("getById");413 should(kuzzle.pluginsManager.getStrategyMethod.callCount).be.eql(0);414 });415 });416 });417 describe("#getCredentialFields", () => {418 it("should return the list of a strategy's fields", () => {419 request = new Request({420 controller: "security",421 action: "getCredentialFields",422 strategy: "someStrategy",423 });424 kuzzle.pluginsManager.getStrategyFields.returns([425 "aField",426 "anotherField",427 ]);428 return securityController.getCredentialFields(request).then((result) => {429 should(result).be.deepEqual(["aField", "anotherField"]);430 should(kuzzle.pluginsManager.getStrategyFields).be.calledOnce();431 should(432 kuzzle.pluginsManager.getStrategyFields.firstCall.args[0]433 ).be.eql("someStrategy");434 });435 });436 });437 describe("#getAllCredentialFields", () => {438 it("should return the list of all strategies' fields", () => {439 request = new Request({440 controller: "security",441 action: "getAllCredentialFields",442 strategy: "someStrategy",443 });444 kuzzle.pluginsManager.listStrategies.returns([445 "someStrategy",446 "someOtherStrategy",447 ]);448 kuzzle.pluginsManager.getStrategyFields.returns([449 "aField",450 "anotherField",451 ]);452 return securityController453 .getAllCredentialFields(request)454 .then((result) => {455 should(result).be.deepEqual({456 someStrategy: ["aField", "anotherField"],457 someOtherStrategy: ["aField", "anotherField"],458 });459 should(kuzzle.pluginsManager.getStrategyFields).be.calledTwice();460 should(461 kuzzle.pluginsManager.getStrategyFields.firstCall.args[0]462 ).be.eql("someStrategy");463 should(464 kuzzle.pluginsManager.getStrategyFields.secondCall.args[0]465 ).be.eql("someOtherStrategy");466 });467 });468 });...

Full Screen

Full Screen

10.py

Source:10.py Github

copy

Full Screen

...5 {6 'cases': [7 {8 'code': r"""9 >>> bacon_strategy(0, 9, cutoff=8, num_rolls=5)10 511 """,12 'hidden': False,13 'locked': False14 },15 {16 'code': r"""17 >>> bacon_strategy(9, 0, cutoff=6, num_rolls=5)18 019 """,20 'hidden': False,21 'locked': False22 },23 {24 'code': r"""25 >>> bacon_strategy(50, 2, cutoff=7, num_rolls=5)26 027 """,28 'hidden': False,29 'locked': False30 },31 {32 'code': r"""33 >>> bacon_strategy(32, 0, cutoff=8, num_rolls=4)34 435 """,36 'hidden': False,37 'locked': False38 },39 {40 'code': r"""41 >>> bacon_strategy(20, 1, cutoff=1, num_rolls=4)42 043 """,44 'hidden': False,45 'locked': False46 },47 {48 'code': r"""49 >>> from tests.check_strategy import check_strategy50 >>> check_strategy(bacon_strategy)51 """,52 'hidden': False,53 'locked': False54 }55 ],56 'scored': True,57 'setup': r"""58 >>> from hog import *59 """,60 'teardown': '',61 'type': 'doctest'62 },63 {64 'cases': [65 {66 'code': r"""67 >>> bacon_strategy(44, 47, 0, 4)68 069 """,70 'hidden': False,71 'locked': False72 },73 {74 'code': r"""75 >>> bacon_strategy(37, 12, 8, 10)76 077 """,78 'hidden': False,79 'locked': False80 },81 {82 'code': r"""83 >>> bacon_strategy(40, 15, 13, 9)84 985 """,86 'hidden': False,87 'locked': False88 },89 {90 'code': r"""91 >>> bacon_strategy(24, 3, 8, 1)92 193 """,94 'hidden': False,95 'locked': False96 },97 {98 'code': r"""99 >>> bacon_strategy(46, 55, 5, 2)100 0101 """,102 'hidden': False,103 'locked': False104 },105 {106 'code': r"""107 >>> bacon_strategy(99, 78, 15, 7)108 7109 """,110 'hidden': False,111 'locked': False112 },113 {114 'code': r"""115 >>> bacon_strategy(10, 73, 3, 5)116 0117 """,118 'hidden': False,119 'locked': False120 },121 {122 'code': r"""123 >>> bacon_strategy(47, 68, 3, 4)124 0125 """,126 'hidden': False,127 'locked': False128 },129 {130 'code': r"""131 >>> bacon_strategy(67, 84, 17, 10)132 10133 """,134 'hidden': False,135 'locked': False136 },137 {138 'code': r"""139 >>> bacon_strategy(92, 54, 1, 7)140 0141 """,142 'hidden': False,143 'locked': False144 },145 {146 'code': r"""147 >>> bacon_strategy(9, 15, 0, 2)148 0149 """,150 'hidden': False,151 'locked': False152 },153 {154 'code': r"""155 >>> bacon_strategy(25, 63, 16, 2)156 2157 """,158 'hidden': False,159 'locked': False160 },161 {162 'code': r"""163 >>> bacon_strategy(75, 27, 6, 2)164 0165 """,166 'hidden': False,167 'locked': False168 },169 {170 'code': r"""171 >>> bacon_strategy(82, 48, 10, 1)172 1173 """,174 'hidden': False,175 'locked': False176 },177 {178 'code': r"""179 >>> bacon_strategy(88, 12, 7, 10)180 0181 """,182 'hidden': False,183 'locked': False184 },185 {186 'code': r"""187 >>> bacon_strategy(72, 12, 5, 8)188 0189 """,190 'hidden': False,191 'locked': False192 },193 {194 'code': r"""195 >>> bacon_strategy(41, 69, 1, 5)196 0197 """,198 'hidden': False,199 'locked': False200 },201 {202 'code': r"""203 >>> bacon_strategy(15, 6, 16, 7)204 7205 """,206 'hidden': False,207 'locked': False208 },209 {210 'code': r"""211 >>> bacon_strategy(42, 19, 5, 2)212 0213 """,214 'hidden': False,215 'locked': False216 },217 {218 'code': r"""219 >>> bacon_strategy(93, 98, 8, 4)220 0221 """,222 'hidden': False,223 'locked': False224 },225 {226 'code': r"""227 >>> bacon_strategy(99, 90, 15, 10)228 10229 """,230 'hidden': False,231 'locked': False232 },233 {234 'code': r"""235 >>> bacon_strategy(73, 79, 4, 1)236 0237 """,238 'hidden': False,239 'locked': False240 },241 {242 'code': r"""243 >>> bacon_strategy(4, 44, 0, 5)244 0245 """,246 'hidden': False,247 'locked': False248 },249 {250 'code': r"""251 >>> bacon_strategy(83, 40, 9, 7)252 7253 """,254 'hidden': False,255 'locked': False256 },257 {258 'code': r"""259 >>> bacon_strategy(34, 3, 0, 8)260 0261 """,262 'hidden': False,263 'locked': False264 },265 {266 'code': r"""267 >>> bacon_strategy(4, 62, 15, 7)268 7269 """,270 'hidden': False,271 'locked': False272 },273 {274 'code': r"""275 >>> bacon_strategy(53, 62, 6, 1)276 0277 """,278 'hidden': False,279 'locked': False280 },281 {282 'code': r"""283 >>> bacon_strategy(19, 56, 8, 9)284 0285 """,286 'hidden': False,287 'locked': False288 },289 {290 'code': r"""291 >>> bacon_strategy(1, 5, 0, 4)292 0293 """,294 'hidden': False,295 'locked': False296 },297 {298 'code': r"""299 >>> bacon_strategy(85, 34, 8, 1)300 0301 """,302 'hidden': False,303 'locked': False304 },305 {306 'code': r"""307 >>> bacon_strategy(37, 37, 13, 5)308 5309 """,310 'hidden': False,311 'locked': False312 },313 {314 'code': r"""315 >>> bacon_strategy(82, 87, 16, 3)316 3317 """,318 'hidden': False,319 'locked': False320 },321 {322 'code': r"""323 >>> bacon_strategy(87, 43, 5, 7)324 0325 """,326 'hidden': False,327 'locked': False328 },329 {330 'code': r"""331 >>> bacon_strategy(20, 7, 2, 3)332 0333 """,334 'hidden': False,335 'locked': False336 },337 {338 'code': r"""339 >>> bacon_strategy(33, 85, 4, 4)340 4341 """,342 'hidden': False,343 'locked': False344 },345 {346 'code': r"""347 >>> bacon_strategy(73, 15, 12, 8)348 8349 """,350 'hidden': False,351 'locked': False352 },353 {354 'code': r"""355 >>> bacon_strategy(5, 98, 8, 2)356 0357 """,358 'hidden': False,359 'locked': False360 },361 {362 'code': r"""363 >>> bacon_strategy(15, 76, 3, 4)364 0365 """,366 'hidden': False,367 'locked': False368 },369 {370 'code': r"""371 >>> bacon_strategy(33, 75, 19, 5)372 5373 """,374 'hidden': False,375 'locked': False376 },377 {378 'code': r"""379 >>> bacon_strategy(9, 41, 0, 5)380 0381 """,382 'hidden': False,383 'locked': False384 },385 {386 'code': r"""387 >>> bacon_strategy(70, 91, 7, 6)388 6389 """,390 'hidden': False,391 'locked': False392 },393 {394 'code': r"""395 >>> bacon_strategy(64, 35, 12, 3)396 3397 """,398 'hidden': False,399 'locked': False400 },401 {402 'code': r"""403 >>> bacon_strategy(51, 92, 14, 8)404 8405 """,406 'hidden': False,407 'locked': False408 },409 {410 'code': r"""411 >>> bacon_strategy(68, 64, 17, 6)412 6413 """,414 'hidden': False,415 'locked': False416 },417 {418 'code': r"""419 >>> bacon_strategy(20, 35, 17, 4)420 4421 """,422 'hidden': False,423 'locked': False424 },425 {426 'code': r"""427 >>> bacon_strategy(75, 30, 3, 1)428 0429 """,430 'hidden': False,431 'locked': False432 },433 {434 'code': r"""435 >>> bacon_strategy(61, 69, 8, 5)436 0437 """,438 'hidden': False,439 'locked': False440 },441 {442 'code': r"""443 >>> bacon_strategy(7, 6, 7, 9)444 9445 """,446 'hidden': False,447 'locked': False448 },449 {450 'code': r"""451 >>> bacon_strategy(0, 51, 17, 5)452 5453 """,454 'hidden': False,455 'locked': False456 },457 {458 'code': r"""459 >>> bacon_strategy(42, 45, 8, 6)460 0461 """,462 'hidden': False,463 'locked': False464 },465 {466 'code': r"""467 >>> bacon_strategy(48, 96, 11, 2)468 2469 """,470 'hidden': False,471 'locked': False472 },473 {474 'code': r"""475 >>> bacon_strategy(57, 96, 9, 6)476 0477 """,478 'hidden': False,479 'locked': False480 },481 {482 'code': r"""483 >>> bacon_strategy(28, 11, 13, 8)484 8485 """,486 'hidden': False,487 'locked': False488 },489 {490 'code': r"""491 >>> bacon_strategy(25, 29, 5, 7)492 0493 """,494 'hidden': False,495 'locked': False496 },497 {498 'code': r"""499 >>> bacon_strategy(69, 2, 15, 8)500 8501 """,502 'hidden': False,503 'locked': False504 },505 {506 'code': r"""507 >>> bacon_strategy(77, 26, 7, 9)508 0509 """,510 'hidden': False,511 'locked': False512 },513 {514 'code': r"""515 >>> bacon_strategy(85, 15, 0, 3)516 0517 """,518 'hidden': False,519 'locked': False520 },521 {522 'code': r"""523 >>> bacon_strategy(79, 86, 5, 7)524 0525 """,526 'hidden': False,527 'locked': False528 },529 {530 'code': r"""531 >>> bacon_strategy(35, 32, 14, 6)532 6533 """,534 'hidden': False,535 'locked': False536 },537 {538 'code': r"""539 >>> bacon_strategy(49, 44, 13, 8)540 8541 """,542 'hidden': False,543 'locked': False544 },545 {546 'code': r"""547 >>> bacon_strategy(77, 65, 6, 4)548 4549 """,550 'hidden': False,551 'locked': False552 },553 {554 'code': r"""555 >>> bacon_strategy(99, 18, 2, 1)556 0557 """,558 'hidden': False,559 'locked': False560 },561 {562 'code': r"""563 >>> bacon_strategy(18, 24, 17, 10)564 10565 """,566 'hidden': False,567 'locked': False568 },569 {570 'code': r"""571 >>> bacon_strategy(44, 11, 18, 1)572 1573 """,574 'hidden': False,575 'locked': False576 },577 {578 'code': r"""579 >>> bacon_strategy(68, 38, 17, 5)580 5581 """,582 'hidden': False,583 'locked': False584 },585 {586 'code': r"""587 >>> bacon_strategy(46, 63, 8, 6)588 6589 """,590 'hidden': False,591 'locked': False592 },593 {594 'code': r"""595 >>> bacon_strategy(20, 60, 19, 6)596 6597 """,598 'hidden': False,599 'locked': False600 },601 {602 'code': r"""603 >>> bacon_strategy(67, 53, 10, 6)604 6605 """,606 'hidden': False,607 'locked': False608 },609 {610 'code': r"""611 >>> bacon_strategy(63, 39, 4, 1)612 0613 """,614 'hidden': False,615 'locked': False616 },617 {618 'code': r"""619 >>> bacon_strategy(54, 75, 9, 8)620 0621 """,622 'hidden': False,623 'locked': False624 },625 {626 'code': r"""627 >>> bacon_strategy(78, 86, 18, 9)628 9629 """,630 'hidden': False,631 'locked': False632 },633 {634 'code': r"""635 >>> bacon_strategy(45, 11, 8, 9)636 0637 """,638 'hidden': False,639 'locked': False640 },641 {642 'code': r"""643 >>> bacon_strategy(88, 19, 14, 6)644 6645 """,646 'hidden': False,647 'locked': False648 },649 {650 'code': r"""651 >>> bacon_strategy(22, 18, 14, 1)652 1653 """,654 'hidden': False,655 'locked': False656 },657 {658 'code': r"""659 >>> bacon_strategy(30, 91, 9, 10)660 10661 """,662 'hidden': False,663 'locked': False664 },665 {666 'code': r"""667 >>> bacon_strategy(19, 81, 8, 1)668 0669 """,670 'hidden': False,671 'locked': False672 },673 {674 'code': r"""675 >>> bacon_strategy(33, 7, 0, 2)676 0677 """,678 'hidden': False,679 'locked': False680 },681 {682 'code': r"""683 >>> bacon_strategy(87, 95, 11, 6)684 6685 """,686 'hidden': False,687 'locked': False688 },689 {690 'code': r"""691 >>> bacon_strategy(69, 86, 8, 10)692 10693 """,694 'hidden': False,695 'locked': False696 },697 {698 'code': r"""699 >>> bacon_strategy(87, 61, 10, 4)700 4701 """,702 'hidden': False,703 'locked': False704 },705 {706 'code': r"""707 >>> bacon_strategy(47, 60, 6, 4)708 0709 """,710 'hidden': False,711 'locked': False712 },713 {714 'code': r"""715 >>> bacon_strategy(67, 65, 14, 4)716 4717 """,718 'hidden': False,719 'locked': False720 },721 {722 'code': r"""723 >>> bacon_strategy(3, 66, 3, 7)724 0725 """,726 'hidden': False,727 'locked': False728 },729 {730 'code': r"""731 >>> bacon_strategy(82, 23, 8, 8)732 8733 """,734 'hidden': False,735 'locked': False736 },737 {738 'code': r"""739 >>> bacon_strategy(42, 89, 14, 1)740 1741 """,742 'hidden': False,743 'locked': False744 },745 {746 'code': r"""747 >>> bacon_strategy(32, 13, 4, 4)748 0749 """,750 'hidden': False,751 'locked': False752 },753 {754 'code': r"""755 >>> bacon_strategy(20, 96, 12, 4)756 4757 """,758 'hidden': False,759 'locked': False760 },761 {762 'code': r"""763 >>> bacon_strategy(77, 59, 15, 7)764 7765 """,766 'hidden': False,767 'locked': False768 },769 {770 'code': r"""771 >>> bacon_strategy(88, 32, 15, 2)772 2773 """,774 'hidden': False,775 'locked': False776 },777 {778 'code': r"""779 >>> bacon_strategy(19, 30, 4, 7)780 0781 """,782 'hidden': False,783 'locked': False784 },785 {786 'code': r"""787 >>> bacon_strategy(91, 29, 18, 4)788 4789 """,790 'hidden': False,791 'locked': False792 },793 {794 'code': r"""795 >>> bacon_strategy(50, 46, 10, 3)796 3797 """,798 'hidden': False,799 'locked': False800 },801 {802 'code': r"""803 >>> bacon_strategy(42, 67, 18, 7)804 7805 """,806 'hidden': False,807 'locked': False808 },809 {810 'code': r"""811 >>> bacon_strategy(37, 91, 4, 9)812 0813 """,814 'hidden': False,815 'locked': False816 },817 {818 'code': r"""819 >>> bacon_strategy(59, 82, 0, 6)820 0821 """,822 'hidden': False,823 'locked': False824 },825 {826 'code': r"""827 >>> bacon_strategy(22, 41, 19, 7)828 7829 """,830 'hidden': False,831 'locked': False832 },833 {834 'code': r"""835 >>> bacon_strategy(84, 90, 6, 5)836 0837 """,838 'hidden': False,839 'locked': False840 },841 {842 'code': r"""843 >>> bacon_strategy(90, 35, 9, 4)844 0845 """,846 'hidden': False,847 'locked': False848 },849 {850 'code': r"""851 >>> bacon_strategy(90, 42, 1, 5)852 0853 """,854 'hidden': False,855 'locked': False856 },857 {858 'code': r"""859 >>> bacon_strategy(1, 35, 8, 10)860 0861 """,862 'hidden': False,863 'locked': False864 }865 ],866 'scored': True,867 'setup': r"""868 >>> from hog import *869 """,870 'teardown': '',871 'type': 'doctest'872 }873 ]...

Full Screen

Full Screen

test_strategy.py

Source:test_strategy.py Github

copy

Full Screen

...9from freqtrade.strategy import import_strategy10from freqtrade.strategy.default_strategy import DefaultStrategy11from freqtrade.strategy.interface import IStrategy12from freqtrade.resolvers import StrategyResolver13def test_import_strategy(caplog):14 caplog.set_level(logging.DEBUG)15 default_config = {}16 strategy = DefaultStrategy(default_config)17 strategy.some_method = lambda *args, **kwargs: 4218 assert strategy.__module__ == 'freqtrade.strategy.default_strategy'19 assert strategy.some_method() == 4220 imported_strategy = import_strategy(strategy, default_config)21 assert dir(strategy) == dir(imported_strategy)22 assert imported_strategy.__module__ == 'freqtrade.strategy'23 assert imported_strategy.some_method() == 4224 assert (25 'freqtrade.strategy',26 logging.DEBUG,27 'Imported strategy freqtrade.strategy.default_strategy.DefaultStrategy '28 'as freqtrade.strategy.DefaultStrategy',29 ) in caplog.record_tuples30def test_search_strategy():31 default_config = {}32 default_location = Path(__file__).parent.parent.joinpath('strategy').resolve()33 assert isinstance(34 StrategyResolver._search_object(35 directory=default_location,36 object_type=IStrategy,37 kwargs={'config': default_config},38 object_name='DefaultStrategy'39 ),40 IStrategy41 )42 assert StrategyResolver._search_object(43 directory=default_location,44 object_type=IStrategy,45 kwargs={'config': default_config},46 object_name='NotFoundStrategy'47 ) is None48def test_load_strategy(result):49 resolver = StrategyResolver({'strategy': 'TestStrategy'})50 metadata = {'pair': 'ETH/BTC'}51 assert 'adx' in resolver.strategy.advise_indicators(result, metadata=metadata)52def test_load_strategy_byte64(result):53 with open("freqtrade/tests/strategy/test_strategy.py", "r") as file:54 encoded_string = urlsafe_b64encode(file.read().encode("utf-8")).decode("utf-8")55 resolver = StrategyResolver({'strategy': 'TestStrategy:{}'.format(encoded_string)})56 assert 'adx' in resolver.strategy.advise_indicators(result, 'ETH/BTC')57def test_load_strategy_invalid_directory(result, caplog):58 resolver = StrategyResolver()59 extra_dir = path.join('some', 'path')60 resolver._load_strategy('TestStrategy', config={}, extra_dir=extra_dir)61 assert (62 'freqtrade.resolvers.strategy_resolver',63 logging.WARNING,64 'Path "{}" does not exist'.format(extra_dir),65 ) in caplog.record_tuples66 assert 'adx' in resolver.strategy.advise_indicators(result, {'pair': 'ETH/BTC'})67def test_load_not_found_strategy():68 strategy = StrategyResolver()69 with pytest.raises(ImportError,70 match=r"Impossible to load Strategy 'NotFoundStrategy'."71 r" This class does not exist or contains Python code errors"):72 strategy._load_strategy(strategy_name='NotFoundStrategy', config={})73def test_strategy(result):74 config = {'strategy': 'DefaultStrategy'}75 resolver = StrategyResolver(config)76 metadata = {'pair': 'ETH/BTC'}77 assert resolver.strategy.minimal_roi[0] == 0.0478 assert config["minimal_roi"]['0'] == 0.0479 assert resolver.strategy.stoploss == -0.1080 assert config['stoploss'] == -0.1081 assert resolver.strategy.ticker_interval == '5m'82 assert config['ticker_interval'] == '5m'83 df_indicators = resolver.strategy.advise_indicators(result, metadata=metadata)84 assert 'adx' in df_indicators85 dataframe = resolver.strategy.advise_buy(df_indicators, metadata=metadata)86 assert 'buy' in dataframe.columns87 dataframe = resolver.strategy.advise_sell(df_indicators, metadata=metadata)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run pandera automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful