How to use lock_server method in tempest

Best Python code snippet using tempest_python

paxos_proposers.py

Source: paxos_proposers.py (GitHub)

copy

Full Screen

#!/usr/bin/env python3
"""Paxos-based replicated lock server: proposer/replica node.

Each node runs several cooperating services on localhost ports derived from
its server id: a heartbeat receiver (leader liveness), a heartbeat sender
(leader only), a fetch service (clients query lock ownership), a redirect
service (forward queued commands to the leader), and the main LockHandler
TCP server that drives the Paxos message flow (prepare/accept/decide) plus
leader election and crash recovery.

Reconstructed from a scraper-garbled copy (fused line numbers stripped).
Concrete fixes over the original:
  * shebang was missing the '!'
  * VERBOSE was read by LockServer.log but never defined (NameError)
  * LockServer.send_prepare could hit UnboundLocalError on socket timeout
  * LockServer.send_elect_accept's timeout handler referenced undefined
    clientId/cmdId (NameError)
  * LockServer.electLeader's accept loop tested the stale loop variable `i`
    instead of `accId` when skipping failed acceptors
  * LockServer.processClient compared string (clientId, cmdId) keys against
    the int keys stored by processDecide, so duplicate detection never hit
  * LockServer.processDecide could leave reply_msg unbound on a duplicate
Large regions of commented-out dead code (learn/on_learn/execute drafts)
were removed.
"""
import random
import time
import signal
import sys
import json
import socket
import threading
import numpy as np
from typing import List, Any, Dict, Set
from socketserver import TCPServer, BaseRequestHandler, ThreadingMixIn

# ANSI terminal colors for log highlighting
OKGREEN = '\033[92m'
YELLOW = '\033[93m'
ENDC = '\033[0m'

PORT_ACCEPTOR = 10200   # group port range start, port = PORT + server_id
PORT_PROPOSER = 10210
PORT_REDIRECT = 10220
PORT_FETCH = 10230
PORT_HEARTBEAT = 10240  # group heartbeat port range start
WAITING_TIMEOUT = 3     # time to wait before resending potentially lost msgs
VERBOSE = False         # fix: read by LockServer.log but was never defined

# enums for state machine command sequence
S_UNSURE = 'undefined'
S_NO_OP = 'no-op'
S_ELECT_LEADER_PREFIX = 'S_ELECT_LEADER_'


def S_ELECT_LEADER(leader_id):
    """Return the state-machine command string electing `leader_id`."""
    return S_ELECT_LEADER_PREFIX + str(leader_id)


# enums for paxos message types
T_PREP = 'prepare'
T_PREP_R = 'prepare-reply'
T_ACC = 'accept'
T_ACC_R = 'accept-reply'
T_LEARN = 'learn'
# phases
P_PREP = 0
P_ACC = 1
P_LEARN = 2
# waiting status, used in resend logic to handle network outage
W_OK = 0
W_WAITING_PREPARE_REPLIES = 1
W_WAITING_ACCEPT_REPLIES = 2


def INIT_PROGRESS():
    """Return a fresh per-Paxos-instance bookkeeping record."""
    return {
        'phase': P_PREP,
        'base_n': 0,               # proposal number base
        'highest_proposed_v': '',
        'highest_proposed_n': -1,  # highest proposal number I've promised
        'prepare': {},             # stores prepare responses
        'accepted_reply': {}       # stores accept responses
    }


class HeartBeatRecvHandler(BaseRequestHandler):
    """Per-connection handler: forward one heartbeat datagram to the server."""

    def handle(self):
        data = self.request.recv(1024).strip().decode('utf-8')
        try:
            self.server.recv_heartbeat(data)
        except ValueError:
            self.server.lock_server.log('[HeartbeatRecv] Could not parse the data as JSON: {}'.format(data))
        finally:
            # close the connection because everything is async
            self.request.close()


class HeartBeatRecvServer(TCPServer):
    """TCP server that tracks the leader's heartbeats for one proposer."""

    allow_reuse_address = True
    last_heartbeat = None

    def __init__(self, sid: int, lock_server):
        addr = ('localhost', PORT_HEARTBEAT + sid)
        self.heartrecvaddr = addr
        self.sid = sid
        self.lock_server = lock_server
        # (leader_id, wall-clock time of last heartbeat); (-1, None) = none yet
        self.leaderHeartBeat = (-1, None)
        TCPServer.__init__(self, addr, HeartBeatRecvHandler)

    def recv_heartbeat(self, data):
        """Parse 'heart_<leader>_<slot>' and update leader/slot bookkeeping."""
        infos = data.split('_')
        leader_id = int(infos[1])
        self.lock_server.pastSlotId = int(infos[2])
        if self.lock_server.leader == -1 and leader_id >= 0:
            # first heartbeat ever seen: adopt the advertised leader
            self.lock_server.log(OKGREEN + "Change leader to {}".format(leader_id) + ENDC)
            self.lock_server.leader = leader_id
            self.leaderHeartBeat = (self.lock_server.leader, time.time())
            if self.lock_server.recover is True:
                self.request_decide()
        elif self.lock_server.leader == leader_id:
            # same leader: just refresh the timestamp
            self.leaderHeartBeat = (self.lock_server.leader, time.time())
        elif self.lock_server.leader != leader_id:
            # leadership changed under us
            self.lock_server.log(OKGREEN + "Change leader to {}".format(leader_id) + ENDC)
            self.lock_server.leader = leader_id
            self.leaderHeartBeat = (self.lock_server.leader, time.time())
            if self.lock_server.recover is True:
                self.request_decide()

    def request_decide(self):
        """Ask the current leader to replay decided commands (recovery)."""
        send_msg = "requestDecide_%d" % (self.lock_server.server_id)
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        addr = ('localhost', PORT_PROPOSER + self.lock_server.leader)
        sock.settimeout(0.5)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        try:
            sock.connect(addr)
            sock.sendall(send_msg.encode('utf-8'))
        except ConnectionRefusedError:
            self.lock_server.log("Recover {} recover fail, please kill the process".format(self.lock_server.server_id))
        except socket.timeout:
            pass  # best-effort: recovery is retried on the next heartbeat
        finally:
            sock.close()


class HeartBeatRecvThread(threading.Thread):
    """Thread wrapper running a HeartBeatRecvServer forever."""

    def __init__(self, sid: int, lock_server):
        threading.Thread.__init__(self)
        self.server = HeartBeatRecvServer(sid, lock_server)

    def run(self):
        self.server.lock_server.log("Proposor %d start to run heartbeat receiver at PORT %s" % (self.server.sid, self.server.heartrecvaddr))
        self.server.serve_forever()


class HeartBeatCheckerThread(threading.Thread):
    """Watchdog: if the leader's heartbeat goes stale, trigger an election."""

    def __init__(self, heartbeat_server, lock_server):
        threading.Thread.__init__(self)
        self.heartbeat_server = heartbeat_server
        self.lock_server = lock_server
        self.stopped = False

    def run(self):
        while not self.stopped:
            if self.lock_server.is_leader() is False:
                if self.heartbeat_server.leaderHeartBeat[0] >= 0 and self.heartbeat_server.leaderHeartBeat[1] is not None:
                    # staggered timeout so nodes don't all start elections at once
                    wait_time = (self.lock_server.server_id) * 1 + 1
                    if float(self.heartbeat_server.leaderHeartBeat[1]) + wait_time < time.time():
                        self.lock_server.log('heartbeat check failed, trigger election')
                        print("TimeOut = ", time.time() - self.heartbeat_server.leaderHeartBeat[1] + wait_time)
                        self.lock_server.failed_proposers.add(self.lock_server.leader)
                        self.lock_server.leader = -1
                        self.lock_server.electLeader(increaseProposalNum=True)
                        while self.lock_server.leader == -1:
                            time.sleep(self.lock_server.abortWaitTime)
                            self.lock_server.electLeader(increaseProposalNum=True)
                    else:
                        # NOTE(review): sets the thread's attribute, not
                        # lock_server.abortWaitTime — looks like an intended
                        # backoff reset that never takes effect; confirm.
                        self.abortWaitTime = 1 + self.lock_server.server_id * 0.2
                else:
                    self.abortWaitTime = 1 + self.lock_server.server_id * 0.2
            time.sleep(1.0)


class HeartBeatSenderThread(threading.Thread):
    """Leader-side heartbeat broadcaster; also drives this node's recovery."""

    def __init__(self, lock_server):
        threading.Thread.__init__(self)
        self.lock_server = lock_server
        self.stopped = False

    def run(self):
        time.sleep(0.5)
        self.lock_server.log("Proposor %d start to serves heartbeat service" % (self.lock_server.server_id))
        while not self.stopped:
            self.lock_server.send_heartbeat()
            if self.lock_server.recover is True and self.lock_server.leader >= 0:
                # pull decided commands from the leader one at a time until 'end'
                data = self.send_requestRecover()
                if data.split('_')[-1].lower() == "end":
                    self.lock_server.log("Recover {}, recover success!".format(self.lock_server.server_id))
                    self.lock_server.recover = False
                else:
                    Cliend_Id, CmdId, SlotId, Cmd = [x for x in data.split('_')[1:]]
                    Cliend_Id = int(Cliend_Id)
                    CmdId = int(CmdId)
                    SlotId = int(SlotId)
                    self.lock_server.pastCommands[(Cliend_Id, CmdId)] = (SlotId, Cmd)
            time.sleep(0.5)

    def send_requestRecover(self):
        """Request one recovered command from the leader; return its reply."""
        send_msg = 'requestRecover_{}'.format(self.lock_server.server_id)
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        addr = ('localhost', PORT_PROPOSER + self.lock_server.leader)
        self.lock_server.log('Recover {} sending "{}" to Leader {}'.format(
            self.lock_server.server_id, send_msg, self.lock_server.leader))
        try:
            sock.connect(addr)
            sock.sendall(send_msg.encode('utf-8'))
            rtMessage = sock.recv(1024).decode('utf-8')
        except ConnectionResetError as e:
            self.lock_server.log("Recover {} recover fail, please kill the process".format(self.lock_server.server_id))
            raise e
        except socket.timeout as e:
            print("Recover node connect to leader Node {} is Time Out!, please kill the process".format(self.lock_server.server_id))
            raise e
        except ConnectionRefusedError as e:
            self.lock_server.log("Recover {} recover fail, please kill the process".format(self.lock_server.server_id))
            raise e
        finally:
            sock.close()
        return rtMessage


class FetchRecvHandler(BaseRequestHandler):
    """Per-connection handler answering a client's lock-status query."""

    def handle(self):
        data = self.request.recv(1024).strip().decode('utf-8')
        infos = data.split('_')
        clientId = int(infos[1])
        lockId = int(infos[2])
        rtMessage = self.server.fetchData(clientId, lockId)
        try:
            self.server.lock_server.log('Sendding result "{}" to client {}'.format(rtMessage, clientId))
            self.request.sendall(rtMessage.encode('utf-8'))
        except ValueError:
            self.server.lock_server.log('[FetchRecv] Could not understand the fetch')
        finally:
            # close the connection because everything is async
            self.request.close()


class FetchRecvServer(TCPServer):
    """TCP server that replays decided commands to answer lock queries."""

    allow_reuse_address = True
    last_heartbeat = None

    def __init__(self, sid: int, lock_server):
        addr = ('localhost', PORT_FETCH + sid)
        self.sid = sid
        self.lock_server = lock_server
        TCPServer.__init__(self, addr, FetchRecvHandler)

    def fetchData(self, clientId, lockId):
        """Rebuild lock state from pastCommands and report lockId's owner.

        Replays all decided commands ordered by (SlotId, ClientId, CmdId)
        onto a cleared lock table, then resets the table again on exit.
        """
        self.lock_server.log('Client {} fetching data "{},lock No.{}" from Proposor {}'.format(clientId, clientId, lockId, self.lock_server.server_id))
        pastCmdList = []
        printStr = '{'
        for k, v in self.lock_server.pastCommands.items():
            printStr += '(Client ID: %d, Lamport TimeStamp: %d, Request: %s)' % (k[0], k[1], v[1])
        printStr += '}'
        self.lock_server.log('Client %d check status of lock %d' % (clientId, lockId))
        self.lock_server.log(OKGREEN + 'Current Processed Sequence Request: {}'.format(printStr) + ENDC)
        for k, v in self.lock_server.pastCommands.items():
            pastCmdList.append((k[0], k[1], v[0], v[1]))  # (Cliend Id, CmdId, SlotId, Cmd)
        pastCmdList.sort(key=lambda x: (x[2], x[0], x[1]))  # (SlotId, Cliend Id, CmdId)
        self.lock_server.log('Print Past Command = {}'.format(pastCmdList))
        self.lock_server.locks = [None for _ in self.lock_server.locks]
        for cmd in pastCmdList:
            self.execute(cmd[0], cmd[1], cmd[3])
        # assumes 0 <= lockId < len(locks) — TODO confirm client-side validation
        if self.lock_server.locks[lockId] is None:
            rtMessage = "Lock %d is free." % lockId
        else:
            rtMessage = "Lock %d is owned by %d." % (lockId, self.lock_server.locks[lockId])
        self.lock_server.locks = [None for _ in self.lock_server.locks]
        return rtMessage

    def execute(self, clientId: int, cmdId: int, command: str):
        """Apply one lock ('l-<n>') or unlock ('u-<n>') command for clientId."""
        if "l" == command[0]:
            lockNode = int(command.split('-')[-1])
            if self.lock_server.locks[lockNode] is None:
                print("Lock %d is locked by %d" % (lockNode, clientId))
                self.lock_server.locks[lockNode] = clientId
            elif self.lock_server.locks[lockNode] == clientId:
                print("Lock %d is already locked by %d" % (lockNode, clientId))
            elif self.lock_server.locks[lockNode] != clientId:
                print("Lock %d cannot be locked by %d" % (lockNode, clientId))
        elif "u" == command[0]:
            unlockNode = int(command.split('-')[-1])
            if self.lock_server.locks[unlockNode] is None:
                print("Lock %d is already freed" % unlockNode)
            elif self.lock_server.locks[unlockNode] == clientId:
                self.lock_server.locks[unlockNode] = None
                print("Lock %d is unlocked by %d" % (unlockNode, clientId))
            elif self.lock_server.locks[unlockNode] != clientId:
                print("Lock %d cannot be unlocked by %d" % (unlockNode, clientId))


class FetchRecvThread(threading.Thread):
    """Thread wrapper running a FetchRecvServer forever."""

    def __init__(self, sid: int, lock_server):
        threading.Thread.__init__(self)
        self.server = FetchRecvServer(sid, lock_server)

    def run(self):
        self.server.lock_server.log("Proposor %d start to serves fetch service" % (self.server.sid))
        self.server.serve_forever()


class RedirectSenderThread(threading.Thread):
    """Periodically forwards queued client commands toward the leader."""

    def __init__(self, lock_server):
        threading.Thread.__init__(self)
        self.lock_server = lock_server
        self.stopped = False

    def run(self):
        time.sleep(1.0)
        self.lock_server.log("Proposor %d start to serves redirect service" % (self.lock_server.server_id))
        while not self.stopped:
            self.lock_server.redirect()
            time.sleep(1.0)


class LockHandler(BaseRequestHandler):
    '''
    Override the handle method to handle each request
    '''

    def handle(self):
        """Dispatch one incoming message by its 'type_' prefix."""
        self.server.log("acquiring...")
        data = self.request.recv(1024).strip().decode('utf-8')
        self.server.log('Received message {}'.format(data), verbose=True)
        try:
            infos = data.split('_')
            self.server.log('Infos = {}'.format(infos), verbose=True)
            if infos[0] == 'client':
                self.server.processClient(infos)
                reply_msg = \
                    'Request (Client ID: %d, Lamport TimeStamp: %d, Request: %s) has been processed' % \
                    (int(infos[1]), int(infos[2]), infos[3])
                self.request.sendall(reply_msg.encode('utf-8'))
            elif infos[0] == 'promise':
                # NOTE(review): processPromise is not defined on LockServer
                # in this file — confirm it exists elsewhere or is dead.
                self.server.processPromise(infos)
            elif infos[0] == 'accepted':
                # NOTE(review): processAccepted is likewise not defined here.
                self.server.processAccepted(infos)
            elif infos[0] == 'decide':
                reply_msg = self.server.processDecide(infos)
                self.request.sendall(reply_msg.encode('utf-8'))
            elif infos[0] == 'leader':
                reply_msg = self.server.processLeader(infos)
                self.request.sendall(reply_msg.encode('utf-8'))
            elif infos[0] == 'requestDecide':
                self.server.generateRecoverCommands(int(infos[1]))
                self.server.sendDecideProposers.add(int(infos[1]))
            elif infos[0] == 'requestRecover':
                reply_msg = self.server.processRecoverCommands(int(infos[1]))
                self.request.settimeout(1)
                try:
                    self.request.sendall(reply_msg.encode('utf-8'))
                except ConnectionRefusedError:
                    print("Connection to recover Node {} is Refused!".format(int(infos[1])))
                except socket.timeout:
                    print("Connection to recover Node {} is Time Out!".format(int(infos[1])))
                self.request.settimeout(None)
                if reply_msg.split('_')[-1].lower() == 'end':
                    self.server.sendDecideProposers.discard(int(infos[1]))
            elif infos[0] == 'recover':
                self.server.failed_proposers.discard(int(infos[1]))
                self.server.failed_acceptors.discard(int(infos[1]))
        except ValueError:
            self.server.log('Could not parse the data as String: {}'.format(data))
        finally:
            # close the connection because everything is async
            self.request.close()
        self.server.log("LockHandler.handle done")


class LockServer(TCPServer):
    """Main proposer/replica node of the replicated lock service."""

    # whether the server will allow the reuse of an address
    allow_reuse_address = True
    leader = -1          # initial leader (-1 = unknown)
    is_electing = False  # if this node is during the election period
    # state machine
    next_exe = 0         # the next executable command index
    sendDecideProposers = set()  # recovering nodes that still need decides
    # an array of command sequence; undecided commands are marked by S_UNSURE
    states = []  # type: List[str]
    # NOTE: these are class-level mutables, shared by all instances in one
    # process; the design assumes one LockServer per process.
    pastCommands = {}    # Key = (clientId, cmdId), Value = (SlotId, Command)
    recoverCommands = {} # Key = RecoverId, Value = list of [ClientId, CmdId, SlotId, Cmd]
    futureCommands = {}  # Key = (clientId, cmdId), Value = raw command message
    locks = [None for _ in range(11)]  # locked by client n or None
    proposerNumber = None
    msg_count = 0
    slotId = 0
    pastSlotId = -1
    # per-Paxos-instance bookkeeping (see INIT_PROGRESS)
    progress = {}  # type: Dict[int, Any]
    failed_proposers = set()  # type: Set[int]
    failed_acceptors = set()  # type: Set[int]
    handler_lock = threading.Lock()
    waiting = W_OK
    waiting_settime = 0
    wait_args = None

    def __init__(self, sid: int, total: int, recover=False):
        """sid: this node's id; total: cluster size; recover: rejoin mode."""
        self.server_id = sid
        self.total_nodes = total
        # disjoint proposal-number spaces per node
        self.proposal_Number = total - sid
        self.abortWaitTime = 1 + self.server_id * 0.2
        self.recover = recover
        addr = ('localhost', PORT_PROPOSER + sid)
        TCPServer.__init__(self, addr, LockHandler)
        self.waitTime = 1 + sid * 0.2
        if recover is False:
            # node 0 bootstraps the first election
            while self.leader == -1 and self.server_id == 0:
                time.sleep(self.abortWaitTime)
                self.electLeader()

    def generateRecoverCommands(self, recoverId):
        """Snapshot pastCommands into a replay queue for a recovering node."""
        if len(self.recoverCommands.get(recoverId, [])) == 0:
            self.log("Generate Recover Command for Recover Node %d" % recoverId)
            self.recoverCommands[recoverId] = []
            for xx in self.pastCommands.items():  # ((Cliend Id, CmdId), (SlotId, Cmd))
                print("Recover Command Generated = ", xx)
                self.recoverCommands[recoverId].append([str(xx[0][0]), str(xx[0][1]), str(xx[1][0]), str(xx[1][1])])

    def send_heartbeat(self):
        """If leader, broadcast 'heart_<id>_<slot>' to every other node."""
        if self.server_id == self.leader:
            send_msg = "heart_%d_%d" % (self.server_id, self.slotId)
            for i in range(self.total_nodes):
                if not i == self.server_id:
                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    addr = ('localhost', PORT_HEARTBEAT + i)
                    try:
                        sock.connect(addr)
                        sock.sendall(send_msg.encode('utf-8'))
                    except ConnectionRefusedError:
                        pass  # best-effort: dead peers are detected elsewhere
                    finally:
                        sock.close()

    def processRecoverCommands(self, recoverId):
        """Pop the next queued recover command for recoverId, or 'end'."""
        # assumes generateRecoverCommands ran first (via 'requestDecide')
        if len(self.recoverCommands[recoverId]) > 0:
            send_msg = "recoverCommand_%s" % '_'.join(self.recoverCommands[recoverId].pop())
        else:
            send_msg = "recoverCommand_end"
        return send_msg

    def redirect(self):
        """Forward one still-pending queued command toward the leader."""
        removeCmds = []
        for k in self.futureCommands.keys():
            if self.pastCommands.get(k) is not None:
                removeCmds.append(k)
        for rmk in removeCmds:
            self.futureCommands.pop(rmk)
        for k in self.futureCommands.keys():
            if self.pastCommands.get(k) is None:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.settimeout(1.0)
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                # assumes at least one live non-leader proposer exists —
                # random.choice raises IndexError otherwise; TODO confirm
                other_node = random.choice([xx for xx in range(self.total_nodes) if xx not in self.failed_proposers and xx != self.leader])
                if self.leader == self.server_id or self.leader == -1:
                    addr = ('localhost', PORT_PROPOSER + other_node)
                    self.log('Leader {} redirect "{}" to Proposer {}'.format(
                        self.server_id, self.futureCommands[k], other_node))
                else:
                    addr = ('localhost', PORT_PROPOSER + self.leader)
                    self.log('Proposer {} redirect "{}" to Leader {}'.format(
                        self.server_id, self.futureCommands[k], self.leader))
                try:
                    sock.connect(addr)
                    sock.sendall(self.futureCommands[k].encode('utf-8'))
                except ConnectionRefusedError:
                    pass  # best-effort; retried on the next redirect tick
                except socket.timeout as e:
                    self.log("socket timeout {}\nRedirect from Proposer{}: to Leader: {}".format(e, self.server_id, self.leader))
                finally:
                    sock.close()
                break  # only one command per tick

    def recv_heartbeat(self, data):
        """Legacy heartbeat path (HeartBeatRecvServer handles the real one)."""
        leader_id = int(data.split('_')[1])
        timestamp = float(data.split('_')[2])  # parsed but unused
        if self.leader == -1 and leader_id >= 0:
            self.leader = leader_id
            self.leaderHeartBeat = (self.leader, time.time())
        elif self.leader == leader_id:
            self.leaderHeartBeat = (self.leader, time.time())
        elif self.leader != leader_id:
            self.leader = leader_id
            self.leaderHeartBeat = (self.leader, time.time())
        return

    def log(self, msg, verbose=False):
        """Print a tagged log line; verbose lines require VERBOSE=True."""
        if (not VERBOSE) and verbose:
            return
        print('[S{}] {}'.format(self.server_id, msg))

    def is_leader(self):
        return self.leader == self.server_id

    def electLeader(self, increaseProposalNum=False):
        """Run one round of Paxos leader election (prepare/accept/decide)."""
        self.abortWaitTime *= 1.5  # exponential backoff between rounds
        if increaseProposalNum is True:
            self.proposal_Number += self.total_nodes
        send_msg = 'prepare_%d_%d' % (self.proposal_Number, self.server_id)
        tmpRecvs = []
        if self.leader != -1:
            return
        # phase 1: prepare to every live acceptor
        for i in range(self.total_nodes):
            if i in self.failed_acceptors:
                continue
            tmpRecvs.append(None)
            tmpRecvs[-1] = self.send_prepare(defaultPORT=PORT_ACCEPTOR, acceptorId=i, clientId=-1, cmdId=-1, payload=send_msg)
            if "abort" in tmpRecvs[-1]:
                return  # a higher proposal is in flight
        validRecvs = [(int(xx.split('_')[-1]), xx) for xx in tmpRecvs if "promise" in xx]
        if len(validRecvs) >= (self.total_nodes // 2) + 1:
            # adopt the highest-numbered previously accepted value, if any
            recvNoVal = {}
            for _, recv in validRecvs:
                recvNo = int(recv.split('_')[1])
                recvVal = int(recv.split('_')[2])
                if recvVal >= 0:
                    recvNoVal[recvNo] = recvVal
            if len(recvNoVal.keys()) > 0:
                self.log("Receive Accepted Value = {}".format(int(recvNoVal[sorted(recvNoVal.keys())[-1]])))
            self.log("FAILED_PROPOSERS = {}".format(self.failed_proposers))
            if len(recvNoVal.keys()) == 0:
                myVal = self.server_id
            elif int(recvNoVal[sorted(recvNoVal.keys())[-1]]) in self.failed_proposers:
                myVal = self.server_id
            else:
                myVal = recvNoVal[sorted(recvNoVal.keys())[-1]]
            # phase 2: accept
            send_msg = 'accept_' + str(self.proposal_Number) + '_' + str(myVal) + '_' + str(self.server_id)
            self.log("Sendding Accept Message = {}".format(send_msg))
            tmpRecvs = []
            validAccepted = 0
            for accId, _ in validRecvs:
                if accId in self.failed_acceptors:  # fix: was stale `i`
                    continue
                tmpRecvs.append(None)
                tmpRecvs[-1] = self.send_elect_accept(defaultPORT=PORT_ACCEPTOR, acceptorId=accId, payload=send_msg)
                if 'accepted' in tmpRecvs[-1]:
                    validAccepted += 1
            if validAccepted >= self.total_nodes // 2 + 1:
                # phase 3: announce the leader (plus known-failed proposers)
                append_msg = ''
                if len(self.failed_proposers) > 0:
                    append_msg += '_' + ('_').join([str(x) for x in self.failed_proposers])
                send_msg = 'leader_' + str(myVal) + append_msg
                for i in range(self.total_nodes):
                    if i in self.failed_proposers or i == self.server_id:
                        continue
                    reply_msg = self.send_elect_decide(defaultPORT=PORT_PROPOSER, proposerId=i, payload=send_msg, toWhom='proposer')
                    if "leader" not in reply_msg or int(reply_msg.split('_')[1]) != myVal:
                        break
                self.leader = myVal
                self.slotId = self.pastSlotId + 1
                return
        self.proposal_Number = self.proposal_Number + self.total_nodes

    def processLeader(self, infos):
        """Accept a leader announcement; learn failed proposers; echo back."""
        leaderId = int(infos[1])
        if len(infos) > 2:
            for xx in infos[2:]:
                self.failed_proposers.add(int(xx))
        self.leader = leaderId
        self.log(OKGREEN + "Change leader to {}".format(self.leader) + ENDC)
        reply_msg = 'leader_' + str(self.leader)
        return reply_msg

    def send_prepare(self, defaultPORT: int, acceptorId: int, clientId: int, cmdId: int, payload):
        """Send a prepare message to one acceptor and return its raw reply."""
        if random.random() > 1.00:  # message-drop fault injection (disabled)
            self.log("^" * 80 + "\ndropping {} to {}\n".format(payload, acceptorId) + "^" * 80)
            return "TimeOut"
        rtMessage = "TimeOut"  # fix: was unbound if recv timed out
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        addr = ('localhost', defaultPORT + acceptorId)
        try:
            sock.connect(addr)
            sock.sendall((payload).encode('utf-8'))
            self.log('Proposer #{} Sending prepare_{} to Acceptor #{}'.format(
                self.server_id, payload.split('_')[1], acceptorId))
            rtMessage = sock.recv(1024).decode('utf-8')
        except ConnectionResetError as e:
            self.log(">>>>>>\nacceptorId: {}\npayload: {}\n<<<<<<<".format(acceptorId, payload))
            raise e
        except ConnectionRefusedError:
            self.log("Acceptor #{} is down, removing it".format(acceptorId))
            self.failed_acceptors.add(acceptorId)
            rtMessage = "NoAcceptor"
        except socket.timeout as e:
            self.log("socket timeout {}\nAcceptor_id: {}\nCommandInfo: ({}, {})".format(e, acceptorId, clientId, cmdId))
        finally:
            sock.close()
        return rtMessage

    def send_elect_decide(self, defaultPORT: int, proposerId: int, payload, toWhom):
        """Announce the elected leader to one peer; return its raw reply."""
        if random.random() > 1.00:  # message-drop fault injection (disabled)
            self.log("^" * 80 + "\ndropping {} to {}\n".format(payload, proposerId) + "^" * 80)
            time.sleep(1)
            return "TimeOut"
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        addr = ('localhost', defaultPORT + proposerId)
        try:
            sock.connect(addr)
            sock.sendall((payload).encode('utf-8'))
            self.log('Proposer #{} Sending decide Leader #{} to {} #{}'.format(
                self.server_id, self.server_id, toWhom, proposerId))
            return sock.recv(1024).decode('utf-8')
        except ConnectionResetError as e:
            self.log(">>>>>>\n{}}Id: {}\npayload: {}\n<<<<<<<".format(toWhom, proposerId, payload))
            raise e
        except ConnectionRefusedError:
            self.log("{} #{} is down, removing it".format(toWhom, proposerId))
            if toWhom.lower() == "proposer":
                self.failed_proposers.add(proposerId)
                return "NoProposer"
            elif toWhom.lower() == "acceptor":
                self.failed_acceptors.add(proposerId)
                return "Acceptor"
        except socket.timeout as e:
            self.log("socket timeout {}\n To {}_id #{} Elect Leader #{}".format(e, toWhom, proposerId, self.server_id))
            return "TimeOut"
        finally:
            sock.close()

    def send_elect_accept(self, defaultPORT: int, acceptorId: int, payload):
        """Send an election accept message to one acceptor; return reply."""
        if random.random() > 1.00:  # message-drop fault injection (disabled)
            self.log("^" * 80 + "\ndropping {} to {}\n".format(payload, acceptorId) + "^" * 80)
            return "TimeOut"
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        addr = ('localhost', defaultPORT + acceptorId)
        try:
            sock.connect(addr)
            sock.sendall((payload).encode('utf-8'))
            self.log('Proposer #{} Sending elect accept to Acceptor #{}'.format(
                self.server_id, acceptorId))
            return sock.recv(1024).decode('utf-8')
        except ConnectionResetError as e:
            self.log(">>>>>>\nacceptorId: {}\npayload: {}\n<<<<<<<".format(acceptorId, payload))
            raise e
        except ConnectionRefusedError:
            self.log("Acceptor #{} is down, removing it".format(acceptorId))
            self.failed_acceptors.add(acceptorId)
            return "NoAcceptor"
        except socket.timeout as e:
            # fix: original logged undefined clientId/cmdId here (NameError)
            self.log("socket timeout {}\nAcceptor_id: {}".format(e, acceptorId))
            return "TimeOut"
        finally:
            sock.close()

    def send_command_accept(self, defaultPORT: int, acceptorId: int, clientId: int, cmdId: int, payload):
        """Send a command accept message to one acceptor; return reply."""
        if random.random() > 1.00:  # message-drop fault injection (disabled)
            self.log("^" * 80 + "\ndropping {} to {}\n".format(payload, acceptorId) + "^" * 80)
            return "TimeOut"
        rtMessage = ""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        addr = ('localhost', defaultPORT + acceptorId)
        try:
            sock.connect(addr)
            sock.sendall((payload).encode('utf-8'))
            self.log('Proposer #{} Sending command "{}" to Acceptor #{}'.format(
                self.server_id, payload, acceptorId))
            rtMessage = sock.recv(1024).decode('utf-8')
        except ConnectionResetError as e:
            self.log(">>>>>>\nacceptorId: {}\npayload: {}\n<<<<<<<".format(acceptorId, payload))
            rtMessage = "Failed"
            raise e
        except ConnectionRefusedError:
            self.log("Acceptor #{} is down, removing it".format(acceptorId))
            self.failed_acceptors.add(acceptorId)
            rtMessage = "Failed"
        except socket.timeout as e:
            self.log("socket timeout {}\nAcceptor_id: {}\nCommandInfo: ({}, {})".format(e, acceptorId, clientId, cmdId))
            rtMessage = "TimeOut"
        finally:
            sock.close()
        return rtMessage

    def send_command_decide(self, defaultPORT: int, proposerId: int, clientId: int, cmdId: int, payload):
        """Send a decide message for a command to one proposer; return reply."""
        if random.random() > 1.00:  # message-drop fault injection (disabled)
            self.log("^" * 80 + "\ndropping {} to {}\n".format(payload, proposerId) + "^" * 80)
            time.sleep(1)
            return "TimeOut"
        rtMessage = 'No Proposer'
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        addr = ('localhost', defaultPORT + proposerId)
        try:
            sock.connect(addr)
            sock.sendall((payload).encode('utf-8'))
            self.log('SlotID: #{}, Proposer #{} Sending decide command {} to Proposer #{}'.format(
                self.slotId, self.server_id, payload, proposerId))
            rtMessage = sock.recv(1024).decode('utf-8')
        except ConnectionResetError as e:
            self.log(">>>>>>\nproposerId: {}\npayload: {}\n<<<<<<<".format(proposerId, payload))
            raise e
        except ConnectionRefusedError:
            self.log("Proposer #{} is down, removing it".format(proposerId))
            self.failed_proposers.add(proposerId)
            rtMessage = 'No Proposer'
        except socket.timeout as e:
            self.log("socket timeout {}\nProposer_id: {}\nCommandInfo: ({}, {})".format(e, proposerId, clientId, cmdId))
            rtMessage = 'No Proposer'
        finally:
            sock.close()
        return rtMessage

    def processDecide(self, infos):
        """Record a decided command 'decide_<cid>_<cmdid>_<cmd>_<slot>'."""
        clientId = int(infos[1])
        cmdId = int(infos[2])
        cmdContent = infos[3]
        slotId = int(infos[4])
        if self.pastCommands.get((clientId, cmdId), False) is False:
            self.slotId = slotId
            self.pastCommands[(clientId, cmdId)] = (slotId, cmdContent)
        # fix: reply is built unconditionally so duplicates still get an ack
        reply_msg = 'stored_' + str(clientId) + '_' + str(cmdId) + '_' + cmdContent
        return reply_msg

    def processClient(self, infos):
        """Handle 'client_<cid>_<cmdid>_<cmd>': queue it, or (as leader)
        drive the accept/decide round for it."""
        # fix: ids arrive as strings but pastCommands is keyed by ints, so
        # the duplicate check below never matched without the casts
        clientId = int(infos[1])
        cmdId = int(infos[2])
        cmdContent = infos[3]
        if self.pastCommands.get((clientId, cmdId), False) is not False:
            self.log("Command ({}, {}, {}) is already executed".format(clientId, cmdId, cmdContent))
            return
        elif self.is_leader() is False:
            self.log("I am not leader, store Command ({}, {}, {}) in futureCommands".format(clientId, cmdId, cmdContent))
            self.futureCommands[(clientId, cmdId)] = ('_').join(infos)
            return
        else:
            self.log("I am leader, store Command ({}, {}, {}) in futureCommands".format(clientId, cmdId, cmdContent))
            self.futureCommands[(clientId, cmdId)] = ('_').join(infos)
            self.log("I am leader, send Command ({}, {}, {}) to Acceptors".format(clientId, cmdId, cmdContent))
            send_msg = 'accept_' + str(self.proposal_Number) + '_' + cmdContent + '_' + str(self.server_id)
            tmpRecvs = []
            for acceptorId in range(self.total_nodes):
                if acceptorId not in self.failed_acceptors:
                    tmpRecvs.append(None)
                    tmpRecvs[-1] = self.send_command_accept(
                        defaultPORT=PORT_ACCEPTOR,
                        acceptorId=acceptorId,
                        clientId=clientId,
                        cmdId=cmdId,
                        payload=send_msg,
                    )
                    self.log("Receive accepted message {} from Acceptor {}".format(tmpRecvs[-1], acceptorId))
            NumAccepted = len([xx for xx in tmpRecvs if isinstance(xx, str) and 'accepted' in xx])
            if NumAccepted < (self.total_nodes // 2) + 1:
                # no majority: leave queued for the redirect loop to retry
                self.log("Reply from acceptor is not enough({}), store Command ({}, {}, {}) to futureCommands".format(NumAccepted, clientId, cmdId, cmdContent))
                self.futureCommands[(clientId, cmdId)] = ('_').join(infos)
            else:
                decide_msg = 'decide_' + str(clientId) + '_' + str(cmdId) + '_' + str(cmdContent) + '_' + str(self.slotId)
                tmpRecvs = []
                self.log("I am leader, send Decide of Command ({}, {}, {}) to Proposers".format(clientId, cmdId, cmdContent))
                for proposerId in range(self.total_nodes):
                    if proposerId != self.server_id and proposerId not in self.failed_proposers:
                        tmpRecvs.append(None)
                        tmpRecvs[-1] = self.send_command_decide(
                            defaultPORT=PORT_PROPOSER,
                            proposerId=proposerId,
                            clientId=clientId,
                            cmdId=cmdId,
                            payload=decide_msg,
                        )
                self.processDecide(decide_msg.split('_'))
                # also replay to recovering nodes that asked for decides
                for proposerId in self.sendDecideProposers:
                    self.send_command_decide(
                        defaultPORT=PORT_PROPOSER,
                        proposerId=proposerId,
                        clientId=clientId,
                        cmdId=cmdId,
                        payload=decide_msg,
                    )
                self.slotId += 1

    def log_progress(self):
        # NOTE(review): self.progress_str is not defined anywhere in this
        # file — calling this raises AttributeError; confirm before use.
        print("*" * 80 + "\n" + self.progress_str() + "\n" + "*" * 80)
sock.settimeout(1)927 # sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)928 # sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)929 # addr = ('localhost', defaultPORT + target_id)930 # # payload['source'] = 'server'931 # # payload['server_id'] = self.server_id932 # try:933 # sock.connect(addr)934 # sock.sendall(json.dumps(payload).encode('utf-8'))935 # self.log('Paxos #{}, sending {} to {}'.format(payload['instance'],936 # payload, target_id))937 # if recv:938 # try:939 # return json.loads(sock.recv(1024).decode('utf-8'))940 # except ConnectionResetError as e:941 # self.log(">>>>>>\ntarget_id: {}\npayload: {}\n<<<<<<<".format(target_id, payload))942 # raise e943 # except ConnectionRefusedError:944 # self.log("peer {} is down, removing it".format(addr))945 # self.failed_nodes.add(target_id)946 # if target_id == self.leader:947 # self.log("send_msg to leader {} failed, trigger election".format(addr))948 # self.elect_leader()949 # except socket.timeout as e:950 # self.log("socket timeout {}\ntarget_id: {}\npayload: {}".format(e, target_id, payload))951 # raise e952 # finally:953 # sock.close()954 # send to all followers (every nodes except myself)955 # def send_all (self, payload):956 # self.log("send_all({})".format(payload))957 # replies = []958 # for i in range(1, self.total_nodes + 1):959 # if not i == self.server_id and not (i in self.failed_nodes):960 # self.log('send_all from {} to {}'.format(self.server_id, i))961 # reply = self.send_msg(i, payload)962 # if reply is not None:963 # replies.append((i, reply))964 # return replies965 # check whether this server is the current leader966 # def notify(self):967 # with self.handler_lock:968 # if self.waiting == W_OK:969 # return970 # if time.time() > self.waiting_settime + WAITING_TIMEOUT:971 # if self.waiting == W_WAITING_PREPARE_REPLIES:972 # self.send_prepare(*self.wait_args)973 # elif self.waiting == W_WAITING_ACCEPT_REPLIES:974 # self.send_accept(*self.wait_args)975 # else:976 # assert False, 
    # "invalid waiting: {}".format(self.waiting)
    # def elect_leader(self):
    #     with self.handler_lock:
    #         if self.leader not in self.failed_nodes:
    #             self.failed_nodes.add(self.leader)
    #         assert not self.is_electing
    #         self.is_electing = True
    #         idx = self.get_next_instance_idx()
    #         data = {
    #             'instance': idx,
    #             'command': S_ELECT_LEADER(self.server_id)
    #         }
    #         assert idx not in self.progress
    #         self.init_progress(idx)
    #         data['proposal_n'] = self.bump_next_proposal_n(idx)
    #         self.send_prepare(data)
# start server
def start(sid, total, recover=False):
    """Boot one lock-server node and block in its serve loop.

    sid     -- this node's integer server id (offsets the PORT_* constants)
    total   -- total number of nodes in the group
    recover -- passed through to LockServer; presumably re-fetches state
               from peers on restart -- TODO confirm in LockServer.__init__

    Starts the heartbeat receiver/sender, redirect sender, fetch server and
    heartbeat checker threads, installs SIGINT/SIGTERM handlers that tear
    everything down and exit(0), then blocks in serve_forever().
    """
    server = LockServer(sid, total, recover=recover)
    heartbeat_server_thr = HeartBeatRecvThread(sid, server)
    heartbeat_server_thr.start()
    heartbeat_sender_thr = HeartBeatSenderThread(server)
    heartbeat_sender_thr.start()
    redirect_sender_thr = RedirectSenderThread(server)
    redirect_sender_thr.start()
    fetch_server_thr = FetchRecvThread(sid, server)
    fetch_server_thr.start()
    heartbeat_checker_thr = HeartBeatCheckerThread(heartbeat_server_thr.server, server)
    heartbeat_checker_thr.start()

    # handle signal
    def sig_handler(signum, frame):
        # Shutdown order: flag the periodic worker threads as stopped, join
        # them, then shut down and close the blocking TCP servers, and
        # finally close the main server before exiting the process.
        heartbeat_checker_thr.stopped = True
        heartbeat_sender_thr.stopped = True
        redirect_sender_thr.stopped = True
        heartbeat_checker_thr.join()
        heartbeat_sender_thr.join()
        redirect_sender_thr.join()
        heartbeat_server_thr.server.shutdown()
        heartbeat_server_thr.server.server_close()
        heartbeat_server_thr.join()
        fetch_server_thr.server.shutdown()
        fetch_server_thr.server.server_close()
        fetch_server_thr.join()
        server.server_close()
        exit(0)

    # register signal handler
    signal.signal(signal.SIGINT, sig_handler)
    signal.signal(signal.SIGTERM, sig_handler)
    # serve until explicit shutdown
    ip, port = server.server_address
    server.log('Listening on {}:{} ...'.format(ip, port))
    server.serve_forever()


if __name__ == "__main__":
    # CLI: <script> <server_id> <total_nodes> [-v|-r]
    # NOTE(review): the usage string says "paxos_processor.py", which does not
    # match this module's apparent name (paxos_proposers.py) -- verify.
    if len(sys.argv) < 3:
        print('Usage:\npython paxos_processor.py [server_id] [total_nodes] [-v]')
        exit(0)
    if len(sys.argv) == 4 and "v" in sys.argv[3]:
        VERBOSE = True
    else:
        VERBOSE = False
    if len(sys.argv) == 4 and "r" in sys.argv[3]:
        RECOVER = True
    else:
        RECOVER = False
    # NOTE(review): in this capture the banner prints unconditionally after
    # flag parsing; it presumably belongs under the RECOVER branch -- confirm.
    # The chunk is truncated here: the expected start(sid, total, ...) call
    # is not visible.
    print("Start Recover Mode...")

Full Screen

Full Screen

test_lock_server.py

Source: test_lock_server.py (GitHub)

copy

Full Screen

# Copyright 2011 OpenStack Foundation
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import webob

import mock

from nova.api.openstack import common
from nova.api.openstack.compute.legacy_v2.contrib import admin_actions \
    as lock_server_v2
from nova.api.openstack.compute import lock_server as lock_server_v21
from nova import context
from nova import exception
from nova import test
from nova.tests.unit.api.openstack.compute import admin_only_action_common
from nova.tests.unit.api.openstack import fakes
from nova.tests.unit import fake_instance


class LockServerTestsV21(admin_only_action_common.CommonTests):
    """Tests for the v2.1 lock/unlock server API actions.

    The class attributes below parameterize CommonTests; the V2 subclass
    overrides them to exercise the legacy controller with the same tests.
    """
    lock_server = lock_server_v21
    controller_name = 'LockServerController'
    authorization_error = exception.PolicyNotAuthorized
    _api_version = '2.1'

    def setUp(self):
        super(LockServerTestsV21, self).setUp()
        # Instantiate the controller under test and stub the module so any
        # later construction returns this same instance.
        self.controller = getattr(self.lock_server, self.controller_name)()
        self.compute_api = self.controller.compute_api

        def _fake_controller(*args, **kwargs):
            return self.controller

        self.stubs.Set(self.lock_server, self.controller_name,
                       _fake_controller)
        self.mox.StubOutWithMock(self.compute_api, 'get')

    def test_lock_unlock(self):
        self._test_actions(['_lock', '_unlock'])

    def test_lock_unlock_with_non_existed_instance(self):
        self._test_actions_with_non_existed_instance(['_lock', '_unlock'])

    def test_unlock_not_authorized(self):
        # compute_api.unlock raising PolicyNotAuthorized must surface as the
        # API-version-specific authorization error on the controller call.
        self.mox.StubOutWithMock(self.compute_api, 'unlock')
        instance = self._stub_instance_get()
        self.compute_api.unlock(self.context, instance).AndRaise(
            exception.PolicyNotAuthorized(action='unlock'))
        self.mox.ReplayAll()
        body = {}
        self.assertRaises(self.authorization_error,
                          self.controller._unlock,
                          self.req, instance.uuid, body)


class LockServerTestsV2(LockServerTestsV21):
    """Re-run the same tests against the legacy v2 AdminActions controller."""
    lock_server = lock_server_v2
    controller_name = 'AdminActionsController'
    authorization_error = webob.exc.HTTPForbidden
    _api_version = '2'


class LockServerPolicyEnforcementV21(test.NoDBTestCase):
    """Policy-rule enforcement tests for the v2.1 lock/unlock actions."""

    def setUp(self):
        super(LockServerPolicyEnforcementV21, self).setUp()
        self.controller = lock_server_v21.LockServerController()
        self.req = fakes.HTTPRequest.blank('')

    def test_lock_policy_failed(self):
        # A non-matching policy rule must reject the lock action with the
        # rule name in the message.
        rule_name = "os_compute_api:os-lock-server:lock"
        self.policy.set_rules({rule_name: "project:non_fake"})
        exc = self.assertRaises(
            exception.PolicyNotAuthorized,
            self.controller._lock, self.req,
            fakes.FAKE_UUID,
            body={'lock': {}})
        self.assertEqual(
            "Policy doesn't allow %s to be performed." % rule_name,
            exc.format_message())

    def test_unlock_policy_failed(self):
        rule_name = "os_compute_api:os-lock-server:unlock"
        self.policy.set_rules({rule_name: "project:non_fake"})
        exc = self.assertRaises(
            exception.PolicyNotAuthorized,
            self.controller._unlock, self.req,
            fakes.FAKE_UUID,
            body={'unlock': {}})
        self.assertEqual(
            "Policy doesn't allow %s to be performed." % rule_name,
            exc.format_message())

    @mock.patch.object(common, 'get_instance')
    def test_unlock_policy_failed_with_unlock_override(self,
                                                       get_instance_mock):
        # Even when the base unlock rule allows everyone ("@"), the
        # unlock_override rule must still be enforced for an instance locked
        # by another user.
        ctxt = context.RequestContext('fake', 'fake')
        instance = fake_instance.fake_instance_obj(ctxt)
        instance.locked_by = "fake"
        get_instance_mock.return_value = instance
        rule_name = ("os_compute_api:os-lock-server:"
                     "unlock:unlock_override")
        rules = {"os_compute_api:os-lock-server:unlock": "@",
                 rule_name: "project:non_fake"}
        self.policy.set_rules(rules)
        exc = self.assertRaises(
            exception.PolicyNotAuthorized, self.controller._unlock,
            self.req, fakes.FAKE_UUID, body={'unlock': {}})
        self.assertEqual(
            "Policy doesn't allow %s to be performed." % rule_name,
            # NOTE(review): this capture is truncated mid-call; the closing
            # `exc.format_message())` argument is not visible here.

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful