Best JavaScript code snippet using mountebank
TalkAssistant.py
Source:TalkAssistant.py
from direct.directnotify import DirectNotifyGlobal
from direct.showbase import DirectObject
from pandac.PandaModules import *
import sys
import time
from otp.chat.ChatGlobals import *
from otp.chat.TalkGlobals import *
from otp.chat.TalkHandle import TalkHandle
from otp.chat.TalkMessage import TalkMessage
from otp.otpbase import OTPGlobals
from otp.otpbase import OTPLocalizer
from otp.speedchat import SCDecoders
from toontown.chat.ChatGlobals import *
from toontown.chat.TTWhiteList import TTWhiteList

# Messages whose text starts with this prefix are treated as "thoughts".
ThoughtPrefix = '.'


class TalkAssistant(DirectObject.DirectObject):
    """Client-side chat hub: records chat history and relays outgoing chat.

    Every ``receive*`` method wraps the incoming text in a ``TalkMessage``,
    appends it to the relevant history lists and broadcasts the
    'NewOpenMessage' event.  Every ``send*`` method forwards outgoing chat to
    the matching manager/avatar call.

    NOTE(review): ``globalClock``, ``base``, ``localAvatar``, ``messenger``
    and ``config`` are not defined or imported in this file; presumably they
    are injected as builtins by the engine -- confirm.
    """
    notify = DirectNotifyGlobal.directNotify.newCategory('TalkAssistant')

    def __init__(self):
        self.logWhispers = 1
        self.whiteList = None
        self.clearHistory()
        self.zeroTimeDay = time.time()
        # Reference point for stampTime().
        self.zeroTimeGame = globalClock.getRealTime()
        # Flood rating above which addToHistoryDoId() starts rejecting.
        self.floodThreshold = 10.0
        self.useWhiteListFilter = base.config.GetBool('white-list-filter-openchat', 0)
        self.lastWhisperDoId = None
        self.lastWhisperPlayerId = None
        self.lastWhisper = None
        self.SCDecoder = SCDecoders
        self.whiteList = TTWhiteList()

    def clearHistory(self):
        """Reset every history list, lookup table and counter."""
        self.historyComplete = []
        self.historyOpen = []
        self.historyUpdates = []
        self.historyGuild = []
        self.historyByDoId = {}
        self.historyByDISLId = {}
        # doId -> [floodRating, timestamp, lastMessage]
        self.floodDataByDoId = {}
        # doId -> 1 if the last message shown for that sender was the
        # anti-spam notice, else 0
        self.spamDictByDoId = {}
        self.labelGuild = OTPLocalizer.TalkGuild
        self.handleDict = {}
        self.messageCount = 0
        self.shownWhiteListWarning = 0

    def delete(self):
        """Stop listening for events and drop all stored history."""
        self.ignoreAll()
        self.clearHistory()

    def start(self):
        pass

    def stop(self):
        pass

    def countMessage(self):
        """Increment the message counter and return its previous value."""
        self.messageCount += 1
        return self.messageCount - 1

    def getOpenText(self, numLines, startPoint = 0):
        """Return up to numLines open-chat messages starting at startPoint."""
        return self.historyOpen[startPoint:startPoint + numLines]

    def getSizeOpenText(self):
        return len(self.historyOpen)

    def getCompleteText(self, numLines, startPoint = 0):
        """Return up to numLines messages of any kind starting at startPoint."""
        return self.historyComplete[startPoint:startPoint + numLines]

    def getCompleteTextFromRecent(self, numLines, startPoint = 0):
        """Return up to numLines messages, newest first.

        startPoint counts back from the newest message (0 = newest).
        """
        start = len(self.historyComplete) - startPoint
        if start < 0:
            start = 0
        backStart = max(start - numLines, 0)
        text = self.historyComplete[backStart:start]
        text.reverse()
        return text

    def getAllCompleteText(self):
        return self.historyComplete

    def getAllHistory(self):
        return self.historyComplete

    def getSizeCompleteText(self):
        return len(self.historyComplete)

    def getHandle(self, doId):
        """Return the TalkHandle stored for doId, or None."""
        return self.handleDict.get(doId)

    def doWhiteListWarning(self):
        pass

    def addToHistoryDoId(self, message, doId, scrubbed = 0):
        """Record message under the sender's doId and update the flood rating.

        Returns:
            0 if the message is accepted,
            1 if this message pushed the sender over floodThreshold,
            2 if the sender was already at or over floodThreshold.
        """
        if message.getTalkType() == TALK_WHISPER and doId != localAvatar.doId:
            self.lastWhisperDoId = doId
            self.lastWhisper = self.lastWhisperDoId
        if doId not in self.historyByDoId:
            self.historyByDoId[doId] = []
        self.historyByDoId[doId].append(message)
        if not self.shownWhiteListWarning and scrubbed and doId == localAvatar.doId:
            self.doWhiteListWarning()
            self.shownWhiteListWarning = 1
        if doId not in self.floodDataByDoId:
            self.floodDataByDoId[doId] = [0.0, self.stampTime(), message]
        else:
            oldTime = self.floodDataByDoId[doId][1]
            newTime = self.stampTime()
            timeDiff = newTime - oldTime
            oldRating = self.floodDataByDoId[doId][0]
            contentMult = 1.0
            # Short messages (< 6 chars) and exact repeats weigh more.
            if len(message.getBody()) < 6:
                contentMult += 0.2 * float(6 - len(message.getBody()))
            if self.floodDataByDoId[doId][2].getBody() == message.getBody():
                contentMult += 1.0
            # The rating decays by the time elapsed since the last message.
            floodRating = max(0, 3.0 * contentMult + oldRating - timeDiff)
            self.floodDataByDoId[doId] = [floodRating, self.stampTime(), message]
            if floodRating > self.floodThreshold:
                if oldRating < self.floodThreshold:
                    self.floodDataByDoId[doId] = [floodRating + 3.0, self.stampTime(), message]
                    return 1
                else:
                    self.floodDataByDoId[doId] = [oldRating - timeDiff, self.stampTime(), message]
                    return 2
        return 0

    def addToHistoryDISLId(self, message, dISLId, scrubbed = 0):
        """Record message under the sender's account (DISL) id."""
        if message.getTalkType() == TALK_ACCOUNT:
            self.lastWhisperPlayerId = dISLId
            self.lastWhisper = self.lastWhisperPlayerId
        if dISLId not in self.historyByDISLId:
            self.historyByDISLId[dISLId] = []
        self.historyByDISLId[dISLId].append(message)

    def addHandle(self, doId, message):
        """Create or update the TalkHandle for doId (never for the local avatar)."""
        if doId == localAvatar.doId:
            return
        handle = self.handleDict.get(doId)
        if not handle:
            handle = TalkHandle(doId, message)
            self.handleDict[doId] = handle
        else:
            handle.addMessageInfo(message)

    def stampTime(self):
        """Return real time elapsed since this assistant was created."""
        return globalClock.getRealTime() - self.zeroTimeGame

    def findName(self, id, isPlayer = 0):
        """Look up a player name if isPlayer is true, else an avatar name."""
        if isPlayer:
            return self.findPlayerName(id)
        else:
            return self.findAvatarName(id)

    def findAvatarName(self, id):
        """Return the avatar's name, or '' if the avatar cannot be identified."""
        info = base.cr.identifyAvatar(id)
        if info:
            return info.getName()
        else:
            return ''

    def findPlayerName(self, id):
        """Return the player friend's name, or '' if no friend info exists."""
        info = base.cr.playerFriendsManager.getFriendInfo(id)
        if info:
            return info.playerName
        else:
            return ''

    def whiteListFilterMessage(self, text):
        """Replace every non-whitelisted word with the whitelist's default word.

        Returns text unchanged when the filter is disabled, and the literal
        'no list' when base.whiteList is missing.
        """
        if not self.useWhiteListFilter:
            return text
        elif not base.whiteList:
            return 'no list'
        newwords = []
        for word in text.split(' '):
            if word == '' or base.whiteList.isWord(word):
                newwords.append(word)
            else:
                newwords.append(base.whiteList.defaultWord)
        return ' '.join(newwords)

    def colorMessageByWhiteListFilter(self, text):
        """Wrap every non-whitelisted word in the WLRed text-property markers."""
        if not base.whiteList:
            return text
        newwords = []
        for word in text.split(' '):
            if word == '' or base.whiteList.isWord(word):
                newwords.append(word)
            else:
                newwords.append('\x01WLRed\x01' + word + '\x02')
        return ' '.join(newwords)

    def isThought(self, message):
        """Return 1 if message starts with ThoughtPrefix, else 0."""
        if not message:
            return 0
        if message.startswith(ThoughtPrefix):
            return 1
        return 0

    def removeThoughtPrefix(self, message):
        """Strip the leading ThoughtPrefix from message, if present."""
        if self.isThought(message):
            return message[len(ThoughtPrefix):]
        else:
            return message

    def fillWithTestText(self):
        """Populate the history with canned messages (debug helper)."""
        hold = self.floodThreshold
        # Raise the threshold so the canned burst is not flagged as flooding.
        self.floodThreshold = 1000.0
        self.receiveOpenTalk(1001, 'Bob the Ghost', None, None, 'Hello from the machine')
        self.receiveOpenTalk(1001, 'Bob the Ghost', None, None, 'More text for ya!')
        self.receiveOpenTalk(1001, 'Bob the Ghost', None, None, 'Hope this makes life easier')
        self.receiveOpenTalk(1002, 'Doug the Spirit', None, None, 'Now we need some longer text that will spill over onto two lines')
        self.receiveOpenTalk(1002, 'Doug the Spirit', None, None, 'Maybe I will tell you')
        self.receiveOpenTalk(1001, 'Bob the Ghost', None, None, 'If you are seeing this text it is because you are cool')
        self.receiveOpenTalk(1002, 'Doug the Spirit', None, None, "That's right, there is no need to call tech support")
        # Fixed: getName must be called; the bound method itself used to be
        # passed as the avatar name.
        self.receiveOpenTalk(localAvatar.doId, localAvatar.getName(), None, None, "Okay I won't call tech support, because I am cool")
        self.receiveGMTalk(1003, 'God of Text', None, None, 'Good because I have seen it already')
        self.floodThreshold = hold

    def printHistoryComplete(self):
        """Print every message in the complete history to stdout."""
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('HISTORY COMPLETE')
        for message in self.historyComplete:
            print('%s %s %s\n%s\n' % (message.getTimeStamp(),
                                      message.getSenderAvatarName(),
                                      message.getSenderAccountName(),
                                      message.getBody()))

    def checkOpenTypedChat(self):
        if base.localAvatar.commonChatFlags & OTPGlobals.CommonChat:
            return True
        return False

    def checkAnyTypedChat(self):
        if base.localAvatar.commonChatFlags & OTPGlobals.CommonChat:
            return True
        if base.localAvatar.canChat():
            return True
        return False

    def checkOpenSpeedChat(self):
        return True

    def checkWhisperTypedChatAvatar(self, avatarId):
        """Return True if any of the checks below allows typed whispers to avatarId."""
        remoteAvatar = base.cr.doId2do.get(avatarId)
        if remoteAvatar:
            if remoteAvatar.isUnderstandable():
                return True
        if base.localAvatar.commonChatFlags & OTPGlobals.SuperChat:
            return True
        remoteAvatarOrHandleOrInfo = base.cr.identifyAvatar(avatarId)
        if remoteAvatarOrHandleOrInfo and hasattr(remoteAvatarOrHandleOrInfo, 'isUnderstandable'):
            if remoteAvatarOrHandleOrInfo.isUnderstandable():
                return True
        info = base.cr.playerFriendsManager.findPlayerInfoFromAvId(avatarId)
        if info:
            if info.understandableYesNo:
                return True
        info = base.cr.avatarFriendsManager.getFriendInfo(avatarId)
        if info:
            if info.understandableYesNo:
                return True
        if base.cr.getFriendFlags(avatarId) & OTPGlobals.FriendChat:
            return True
        return False

    def checkWhisperSpeedChatAvatar(self, avatarId):
        return True

    def checkWhisperTypedChatPlayer(self, playerId):
        info = base.cr.playerFriendsManager.getFriendInfo(playerId)
        if info:
            if info.understandableYesNo:
                return True
        return False

    def checkWhisperSpeedChatPlayer(self, playerId):
        if base.cr.playerFriendsManager.isPlayerFriend(playerId):
            return True
        return False

    # The original repeated checkOpenSpeedChat, checkWhisperSpeedChatAvatar
    # and checkWhisperSpeedChatPlayer here with identical bodies; the
    # redundant second definitions were removed.

    def checkGuildTypedChat(self):
        if localAvatar.guildId:
            return True
        return False

    def checkGuildSpeedChat(self):
        if localAvatar.guildId:
            return True
        return False

    def receiveOpenTalk(self, senderAvId, avatarName, accountId, accountName, message, scrubbed = 0):
        """Record an open-chat message, applying the flood/spam checks.

        Always returns None (the unused ``error`` placeholder).
        """
        error = None
        if not avatarName and senderAvId:
            localAvatar.sendUpdate('logSuspiciousEvent', ['receiveOpenTalk: invalid avatar name (%s)' % senderAvId])
            avatarName = self.findAvatarName(senderAvId)
        if not accountName and accountId:
            accountName = self.findPlayerName(accountId)
        newMessage = TalkMessage(self.countMessage(), self.stampTime(), message, senderAvId, avatarName, accountId, accountName, None, None, None, None, TALK_OPEN, None)
        if senderAvId != localAvatar.doId:
            self.addHandle(senderAvId, newMessage)
        reject = 0
        if senderAvId:
            reject = self.addToHistoryDoId(newMessage, senderAvId, scrubbed)
        if accountId:
            self.addToHistoryDISLId(newMessage, accountId)
        if reject == 1:
            # First message over the threshold: show the anti-spam notice.
            newMessage.setBody(OTPLocalizer.AntiSpamInChat)
        if reject != 2:
            # Do not show the anti-spam notice twice in a row for one sender.
            isSpam = self.spamDictByDoId.get(senderAvId) and reject
            if not isSpam:
                self.historyComplete.append(newMessage)
                self.historyOpen.append(newMessage)
                messenger.send('NewOpenMessage', [newMessage])
            if newMessage.getBody() == OTPLocalizer.AntiSpamInChat:
                self.spamDictByDoId[senderAvId] = 1
            else:
                self.spamDictByDoId[senderAvId] = 0
        return error

    def receiveWhisperTalk(self, avatarId, avatarName, accountId, accountName, toId, toName, message, scrubbed = 0):
        """Record a typed avatar whisper (sent or received)."""
        error = None
        if not avatarName and avatarId:
            avatarName = self.findAvatarName(avatarId)
        if not accountName and accountId:
            accountName = self.findPlayerName(accountId)
        newMessage = TalkMessage(self.countMessage(), self.stampTime(), message, avatarId, avatarName, accountId, accountName, toId, toName, None, None, TALK_WHISPER, None)
        if avatarId == localAvatar.doId:
            self.addHandle(toId, newMessage)
        else:
            self.addHandle(avatarId, newMessage)
        self.historyComplete.append(newMessage)
        if avatarId:
            self.addToHistoryDoId(newMessage, avatarId, scrubbed)
        if accountId:
            self.addToHistoryDISLId(newMessage, accountId)
        messenger.send('NewOpenMessage', [newMessage])
        return error

    def receiveAccountTalk(self, avatarId, avatarName, accountId, accountName, toId, toName, message, scrubbed = 0):
        """Record a typed account-level (player) whisper."""
        if not accountName and base.cr.playerFriendsManager.playerId2Info.get(accountId):
            accountName = base.cr.playerFriendsManager.playerId2Info.get(accountId).playerName
        error = None
        if not avatarName and avatarId:
            avatarName = self.findAvatarName(avatarId)
        if not accountName and accountId:
            accountName = self.findPlayerName(accountId)
        newMessage = TalkMessage(self.countMessage(), self.stampTime(), message, avatarId, avatarName, accountId, accountName, None, None, toId, toName, TALK_ACCOUNT, None)
        self.historyComplete.append(newMessage)
        if avatarId:
            self.addToHistoryDoId(newMessage, avatarId, scrubbed)
        if accountId:
            self.addToHistoryDISLId(newMessage, accountId, scrubbed)
        messenger.send('NewOpenMessage', [newMessage])
        return error

    def receiveGuildTalk(self, senderAvId, fromAC, avatarName, message, scrubbed = 0):
        """Record a guild chat message (thoughts are ignored), with spam checks."""
        error = None
        if not self.isThought(message):
            accountName = self.findName(fromAC, 1)
            newMessage = TalkMessage(self.countMessage(), self.stampTime(), message, senderAvId, avatarName, fromAC, accountName, None, None, None, None, TALK_GUILD, None)
            reject = self.addToHistoryDoId(newMessage, senderAvId)
            if reject == 1:
                newMessage.setBody(OTPLocalizer.AntiSpamInChat)
            if reject != 2:
                isSpam = self.spamDictByDoId.get(senderAvId) and reject
                if not isSpam:
                    self.historyComplete.append(newMessage)
                    self.historyGuild.append(newMessage)
                    messenger.send('NewOpenMessage', [newMessage])
                if newMessage.getBody() == OTPLocalizer.AntiSpamInChat:
                    self.spamDictByDoId[senderAvId] = 1
                else:
                    self.spamDictByDoId[senderAvId] = 0
        return error

    def receiveGMTalk(self, avatarId, avatarName, accountId, accountName, message, scrubbed = 0):
        """Record a TALK_GM message in the complete and open histories."""
        error = None
        if not avatarName and avatarId:
            avatarName = self.findAvatarName(avatarId)
        if not accountName and accountId:
            accountName = self.findPlayerName(accountId)
        newMessage = TalkMessage(self.countMessage(), self.stampTime(), message, avatarId, avatarName, accountId, accountName, None, None, None, None, TALK_GM, None)
        self.historyComplete.append(newMessage)
        self.historyOpen.append(newMessage)
        if avatarId:
            self.addToHistoryDoId(newMessage, avatarId)
        if accountId:
            self.addToHistoryDISLId(newMessage, accountId)
        messenger.send('NewOpenMessage', [newMessage])
        return error

    def receiveThought(self, avatarId, avatarName, accountId, accountName, message, scrubbed = 0):
        """Record an avatar thought, applying the flood check."""
        error = None
        if not avatarName and avatarId:
            avatarName = self.findAvatarName(avatarId)
        if not accountName and accountId:
            accountName = self.findPlayerName(accountId)
        newMessage = TalkMessage(self.countMessage(), self.stampTime(), message, avatarId, avatarName, accountId, accountName, None, None, None, None, AVATAR_THOUGHT, None)
        if avatarId != localAvatar.doId:
            self.addHandle(avatarId, newMessage)
        reject = 0
        if avatarId:
            reject = self.addToHistoryDoId(newMessage, avatarId, scrubbed)
        if accountId:
            self.addToHistoryDISLId(newMessage, accountId)
        if reject == 1:
            newMessage.setBody(OTPLocalizer.AntiSpamInChat)
        if reject != 2:
            self.historyComplete.append(newMessage)
            self.historyOpen.append(newMessage)
            messenger.send('NewOpenMessage', [newMessage])
        return error

    # NOTE(review): in the receivers below that are guarded by isThought(),
    # the source's indentation was lost; the 'NewOpenMessage' send is kept
    # inside the guard so newMessage is always bound when it is used.
    # Confirm against the upstream file.

    def receiveGameMessage(self, message):
        """Record an INFO_GAME message addressed to the local avatar."""
        error = None
        if not self.isThought(message):
            newMessage = TalkMessage(self.countMessage(), self.stampTime(), message, None, None, None, None, localAvatar.doId, localAvatar.getName(), localAvatar.DISLid, localAvatar.DISLname, INFO_GAME, None)
            self.historyComplete.append(newMessage)
            self.historyUpdates.append(newMessage)
            messenger.send('NewOpenMessage', [newMessage])
        return error

    def receiveSystemMessage(self, message):
        """Record an INFO_SYSTEM message addressed to the local avatar."""
        error = None
        if not self.isThought(message):
            newMessage = TalkMessage(self.countMessage(), self.stampTime(), message, None, None, None, None, localAvatar.doId, localAvatar.getName(), localAvatar.DISLid, localAvatar.DISLname, INFO_SYSTEM, None)
            self.historyComplete.append(newMessage)
            self.historyUpdates.append(newMessage)
            messenger.send('NewOpenMessage', [newMessage])
        return error

    def receiveDeveloperMessage(self, message):
        """Record an INFO_DEV message addressed to the local avatar."""
        error = None
        newMessage = TalkMessage(self.countMessage(), self.stampTime(), message, None, None, None, None, localAvatar.doId, localAvatar.getName(), localAvatar.DISLid, localAvatar.DISLname, INFO_DEV, None)
        self.historyComplete.append(newMessage)
        self.historyUpdates.append(newMessage)
        messenger.send('NewOpenMessage', [newMessage])
        return error

    def receiveGuildMessage(self, message, senderAvId, senderName):
        """Record a guild message without flood checks (thoughts are ignored)."""
        error = None
        if not self.isThought(message):
            newMessage = TalkMessage(self.countMessage(), self.stampTime(), message, senderAvId, senderName, None, None, None, None, None, None, TALK_GUILD, None)
            self.historyComplete.append(newMessage)
            self.historyGuild.append(newMessage)
            messenger.send('NewOpenMessage', [newMessage])
        return error

    def receiveGuildUpdateMessage(self, message, senderId, senderName, receiverId, receiverName, extraInfo = None):
        """Record an INFO_GUILD message, passing extraInfo through to TalkMessage."""
        error = None
        if not self.isThought(message):
            newMessage = TalkMessage(self.countMessage(), self.stampTime(), message, senderId, senderName, None, None, receiverId, receiverName, None, None, INFO_GUILD, extraInfo)
            self.historyComplete.append(newMessage)
            self.historyGuild.append(newMessage)
            messenger.send('NewOpenMessage', [newMessage])
        return error

    def receiveFriendUpdate(self, friendId, friendName, isOnline):
        """Record an avatar friend coming online or going offline."""
        if isOnline:
            onlineMessage = OTPLocalizer.FriendOnline
        else:
            onlineMessage = OTPLocalizer.FriendOffline
        newMessage = TalkMessage(self.countMessage(), self.stampTime(), onlineMessage, friendId, friendName, None, None, localAvatar.doId, localAvatar.getName(), localAvatar.DISLid, localAvatar.DISLname, UPDATE_FRIEND, None)
        self.addHandle(friendId, newMessage)
        self.historyComplete.append(newMessage)
        self.historyUpdates.append(newMessage)
        messenger.send('NewOpenMessage', [newMessage])

    def receiveFriendAccountUpdate(self, friendId, friendName, isOnline):
        """Record a player (account) friend coming online or going offline."""
        if isOnline:
            onlineMessage = OTPLocalizer.FriendOnline
        else:
            onlineMessage = OTPLocalizer.FriendOffline
        newMessage = TalkMessage(self.countMessage(), self.stampTime(), onlineMessage, None, None, friendId, friendName, localAvatar.doId, localAvatar.getName(), localAvatar.DISLid, localAvatar.DISLname, UPDATE_FRIEND, None)
        self.historyComplete.append(newMessage)
        self.historyUpdates.append(newMessage)
        messenger.send('NewOpenMessage', [newMessage])

    def receiveGuildUpdate(self, memberId, memberName, isOnline):
        """Record a guild member's online/offline change.

        Skipped when base.cr.identifyFriend() returns something for the
        member.
        """
        if base.cr.identifyFriend(memberId) is None:
            if isOnline:
                onlineMessage = OTPLocalizer.GuildMemberOnline
            else:
                onlineMessage = OTPLocalizer.GuildMemberOffline
            newMessage = TalkMessage(self.countMessage(), self.stampTime(), onlineMessage, memberId, memberName, None, None, None, None, None, None, UPDATE_GUILD, None)
            self.addHandle(memberId, newMessage)
            self.historyComplete.append(newMessage)
            self.historyUpdates.append(newMessage)
            self.historyGuild.append(newMessage)
            messenger.send('NewOpenMessage', [newMessage])

    def _decodeSpeedChat(self, scType, messageIndex, name = None):
        """Decode a SpeedChat index into text; None for an unknown scType."""
        if scType == SPEEDCHAT_NORMAL:
            return self.SCDecoder.decodeSCStaticTextMsg(messageIndex)
        elif scType == SPEEDCHAT_EMOTE:
            return self.SCDecoder.decodeSCEmoteWhisperMsg(messageIndex, name)
        elif scType == SPEEDCHAT_CUSTOM:
            return self.SCDecoder.decodeSCCustomMsg(messageIndex)
        return None

    def receiveOpenSpeedChat(self, type, messageIndex, senderAvId, name = None):
        """Decode and record an open SpeedChat message; empty decodes are dropped."""
        error = None
        if not name and senderAvId:
            name = self.findName(senderAvId, 0)
        message = self._decodeSpeedChat(type, messageIndex, name)
        if message in (None, ''):
            return
        newMessage = TalkMessage(self.countMessage(), self.stampTime(), message, senderAvId, name, None, None, None, None, None, None, TALK_OPEN, None)
        self.historyComplete.append(newMessage)
        self.historyOpen.append(newMessage)
        self.addToHistoryDoId(newMessage, senderAvId)
        messenger.send('NewOpenMessage', [newMessage])
        return error

    def receiveAvatarWhisperSpeedChat(self, type, messageIndex, senderAvId, name = None):
        """Decode and record a SpeedChat whisper from an avatar."""
        error = None
        if not name and senderAvId:
            name = self.findName(senderAvId, 0)
        message = self._decodeSpeedChat(type, messageIndex, name)
        # Same guard as receiveOpenSpeedChat: a None body would break the
        # len() calls in addToHistoryDoId's flood check.
        if message in (None, ''):
            return error
        newMessage = TalkMessage(self.countMessage(), self.stampTime(), message, senderAvId, name, None, None, localAvatar.doId, localAvatar.getName(), localAvatar.DISLid, localAvatar.DISLname, TALK_WHISPER, None)
        self.historyComplete.append(newMessage)
        self.historyOpen.append(newMessage)
        self.addToHistoryDoId(newMessage, senderAvId)
        messenger.send('NewOpenMessage', [newMessage])
        return error

    def receivePlayerWhisperSpeedChat(self, type, messageIndex, senderAvId, name = None):
        """Decode and record a SpeedChat whisper from a player (account).

        Here senderAvId is used as the sender's account id (see the
        TalkMessage arguments and addToHistoryDISLId call).
        """
        error = None
        if not name and senderAvId:
            name = self.findName(senderAvId, 1)
        message = self._decodeSpeedChat(type, messageIndex, name)
        if message in (None, ''):
            return error
        newMessage = TalkMessage(self.countMessage(), self.stampTime(), message, None, None, senderAvId, name, localAvatar.doId, localAvatar.getName(), localAvatar.DISLid, localAvatar.DISLname, TALK_WHISPER, None)
        self.historyComplete.append(newMessage)
        self.historyOpen.append(newMessage)
        self.addToHistoryDISLId(newMessage, senderAvId)
        messenger.send('NewOpenMessage', [newMessage])
        return error

    def sendOpenTalk(self, message):
        """Send typed open chat, or dispatch a '~' magic word when enabled."""
        error = None
        doId = base.localAvatar.doId
        if base.config.GetBool('want-talkative-tyler', False):
            if base.localAvatar.zoneId == 2000:
                tyler = base.cr.doFind('Talkative Tyler')
                if tyler:
                    tyler.sendUpdate('talkMessage', [doId, message])
        if base.cr.wantMagicWords and len(message) > 0 and message[0] == '~':
            messenger.send('magicWord', [message])
            self.receiveDeveloperMessage(message)
        else:
            chatFlags = CFSpeech | CFTimeout
            if self.isThought(message):
                chatFlags = CFThought
            base.cr.chatAgent.sendChatMessage(message)
            messenger.send('chatUpdate', [message, chatFlags])
        return error

    def sendWhisperTalk(self, message, receiverAvId):
        """Star out non-whitelisted words, scrub, and send a typed whisper."""
        modifications = []
        words = message.split(' ')
        offset = 0
        WantWhitelist = config.GetBool('want-whitelist', 1)
        for word in words:
            if word and not self.whiteList.isWord(word) and WantWhitelist:
                # Inclusive (start, stop) character span of the word.
                modifications.append((offset, offset+len(word)-1))
            offset += len(word) + 1
        cleanMessage = message
        for modStart, modStop in modifications:
            cleanMessage = cleanMessage[:modStart] + '*'*(modStop-modStart+1) + cleanMessage[modStop+1:]
        message, scrubbed = base.localAvatar.scrubTalk(cleanMessage, modifications)
        base.cr.ttiFriendsManager.sendUpdate('sendTalkWhisper', [receiverAvId, message])

    def sendAccountTalk(self, message, receiverAccount):
        """Send a typed whisper to a player account."""
        error = None
        base.cr.playerFriendsManager.sendUpdate('setTalkAccount', [receiverAccount,
         0,
         '',
         message,
         [],
         0])
        return error

    def sendGuildTalk(self, message):
        """Send typed guild chat; returns ERROR_NO_GUILD_CHAT when not in a guild."""
        error = None
        if self.checkGuildTypedChat():
            base.cr.guildManager.sendTalk(message)
        else:
            print('Guild chat error')
            error = ERROR_NO_GUILD_CHAT
        return error

    def sendOpenSpeedChat(self, type, messageIndex):
        """Broadcast an open SpeedChat phrase of the given type."""
        error = None
        if type == SPEEDCHAT_NORMAL:
            messenger.send(SCChatEvent)
            messenger.send('chatUpdateSC', [messageIndex])
            base.localAvatar.b_setSC(messageIndex)
        elif type == SPEEDCHAT_EMOTE:
            messenger.send('chatUpdateSCEmote', [messageIndex])
            messenger.send(SCEmoteChatEvent)
            base.localAvatar.b_setSCEmote(messageIndex)
        elif type == SPEEDCHAT_CUSTOM:
            messenger.send('chatUpdateSCCustom', [messageIndex])
            messenger.send(SCCustomChatEvent)
            base.localAvatar.b_setSCCustom(messageIndex)
        return error

    def sendAvatarWhisperSpeedChat(self, type, messageIndex, receiverId):
        """Whisper a SpeedChat phrase to an avatar and log it if logWhispers is set."""
        error = None
        # Stays None for an unrecognised type so the logging step is skipped
        # instead of raising UnboundLocalError.
        message = None
        if type == SPEEDCHAT_NORMAL:
            base.localAvatar.whisperSCTo(messageIndex, receiverId, 0)
            message = self.SCDecoder.decodeSCStaticTextMsg(messageIndex)
        elif type == SPEEDCHAT_EMOTE:
            base.localAvatar.whisperSCEmoteTo(messageIndex, receiverId, 0)
            message = self.SCDecoder.decodeSCEmoteWhisperMsg(messageIndex, localAvatar.getName())
        elif type == SPEEDCHAT_CUSTOM:
            base.localAvatar.whisperSCCustomTo(messageIndex, receiverId, 0)
            message = self.SCDecoder.decodeSCCustomMsg(messageIndex)
        if self.logWhispers and message not in (None, ''):
            avatarName = None
            avatar = base.cr.identifyAvatar(receiverId)
            if avatar:
                avatarName = avatar.getName()
            newMessage = TalkMessage(self.countMessage(), self.stampTime(), message, localAvatar.doId, localAvatar.getName(), localAvatar.DISLid, localAvatar.DISLname, receiverId, avatarName, None, None, TALK_WHISPER, None)
            self.historyComplete.append(newMessage)
            self.addToHistoryDoId(newMessage, localAvatar.doId)
            messenger.send('NewOpenMessage', [newMessage])
        return error

    def sendPlayerWhisperSpeedChat(self, type, messageIndex, receiverId):
        """Whisper a SpeedChat phrase to a player and log it if logWhispers is set."""
        error = None
        # Stays None for an unrecognised type (see sendAvatarWhisperSpeedChat).
        message = None
        if type == SPEEDCHAT_NORMAL:
            base.cr.speedchatRelay.sendSpeedchat(receiverId, messageIndex)
            message = self.SCDecoder.decodeSCStaticTextMsg(messageIndex)
        elif type == SPEEDCHAT_EMOTE:
            base.cr.speedchatRelay.sendSpeedchatEmote(receiverId, messageIndex)
            message = self.SCDecoder.decodeSCEmoteWhisperMsg(messageIndex, localAvatar.getName())
            # NOTE(review): emote whispers return here, before logging, and
            # the decoded text is discarded.  Preserved from the original --
            # confirm whether this is intentional.
            return
        elif type == SPEEDCHAT_CUSTOM:
            base.cr.speedchatRelay.sendSpeedchatCustom(receiverId, messageIndex)
            message = self.SCDecoder.decodeSCCustomMsg(messageIndex)
        if self.logWhispers and message not in (None, ''):
            receiverName = self.findName(receiverId, 1)
            newMessage = TalkMessage(self.countMessage(), self.stampTime(), message, localAvatar.doId, localAvatar.getName(), localAvatar.DISLid, localAvatar.DISLname, None, None, receiverId, receiverName, TALK_ACCOUNT, None)
            self.historyComplete.append(newMessage)
            self.addToHistoryDoId(newMessage, localAvatar.doId)
            messenger.send('NewOpenMessage', [newMessage])
        return error

    def sendGuildSpeedChat(self, type, msgIndex):
        """Send a guild SpeedChat phrase; returns ERROR_NO_GUILD_CHAT when not in a guild."""
        error = None
        if self.checkGuildSpeedChat():
            base.cr.guildManager.sendSC(msgIndex)
        else:
            print('Guild Speedchat error')
            error = ERROR_NO_GUILD_CHAT
        return error

    def getWhisperReplyId(self):
        """Return (lastWhisperId, toPlayer) for the most recent whisper.

        toPlayer is 1 when the last whisper came from a player account and 0
        when it came from an avatar.
        """
        if self.lastWhisper:
            toPlayer = 0
            if self.lastWhisper == self.lastWhisperPlayerId:
                toPlayer = 1
            return (self.lastWhisper, toPlayer)
        # NOTE(review): the source excerpt is truncated here ("..."); any
        # remaining lines of this method are not visible.
test_util.py
Source:test_util.py
# Protocol Buffers - Google's data interchange format
# Copyright 2008 Google Inc.  All rights reserved.
# http://code.google.com/p/protobuf/
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Utilities for Python proto2 tests.

This is intentionally modeled on C++ code in
//google/protobuf/test_util.*.
"""

__author__ = 'robinson@google.com (Will Robinson)'

import os.path

from google.protobuf import unittest_import_pb2
from google.protobuf import unittest_pb2


def SetAllNonLazyFields(message):
    """Sets every non-lazy field in the message to a unique value.

    Args:
      message: A unittest_pb2.TestAllTypes instance.
    """

    #
    # Optional fields.
    #

    message.optional_int32 = 101
    message.optional_int64 = 102
    message.optional_uint32 = 103
    message.optional_uint64 = 104
    message.optional_sint32 = 105
    message.optional_sint64 = 106
    message.optional_fixed32 = 107
    message.optional_fixed64 = 108
    message.optional_sfixed32 = 109
    message.optional_sfixed64 = 110
    message.optional_float = 111
    message.optional_double = 112
    message.optional_bool = True
    # TODO(robinson): Firmly spec out and test how
    # protos interact with unicode.  One specific example:
    # what happens if we change the literal below to
    # u'115'?  What *should* happen?  Still some discussion
    # to finish with Kenton about bytes vs. strings
    # and forcing everything to be utf8. :-/
    message.optional_string = '115'
    message.optional_bytes = '116'

    message.optionalgroup.a = 117
    message.optional_nested_message.bb = 118
    message.optional_foreign_message.c = 119
    message.optional_import_message.d = 120
    message.optional_public_import_message.e = 126

    message.optional_nested_enum = unittest_pb2.TestAllTypes.BAZ
    message.optional_foreign_enum = unittest_pb2.FOREIGN_BAZ
    message.optional_import_enum = unittest_import_pb2.IMPORT_BAZ

    message.optional_string_piece = '124'
    message.optional_cord = '125'

    #
    # Repeated fields.
    #

    message.repeated_int32.append(201)
    message.repeated_int64.append(202)
    message.repeated_uint32.append(203)
    message.repeated_uint64.append(204)
    message.repeated_sint32.append(205)
    message.repeated_sint64.append(206)
    message.repeated_fixed32.append(207)
    message.repeated_fixed64.append(208)
    message.repeated_sfixed32.append(209)
    message.repeated_sfixed64.append(210)
    message.repeated_float.append(211)
    message.repeated_double.append(212)
    message.repeated_bool.append(True)
    message.repeated_string.append('215')
    message.repeated_bytes.append('216')

    message.repeatedgroup.add().a = 217
    message.repeated_nested_message.add().bb = 218
    message.repeated_foreign_message.add().c = 219
    message.repeated_import_message.add().d = 220
    message.repeated_lazy_message.add().bb = 227

    message.repeated_nested_enum.append(unittest_pb2.TestAllTypes.BAR)
    message.repeated_foreign_enum.append(unittest_pb2.FOREIGN_BAR)
    message.repeated_import_enum.append(unittest_import_pb2.IMPORT_BAR)

    message.repeated_string_piece.append('224')
    message.repeated_cord.append('225')

    # Add a second one of each field.
    message.repeated_int32.append(301)
    message.repeated_int64.append(302)
    message.repeated_uint32.append(303)
    message.repeated_uint64.append(304)
    message.repeated_sint32.append(305)
    message.repeated_sint64.append(306)
    message.repeated_fixed32.append(307)
    message.repeated_fixed64.append(308)
    message.repeated_sfixed32.append(309)
    message.repeated_sfixed64.append(310)
    message.repeated_float.append(311)
    message.repeated_double.append(312)
    message.repeated_bool.append(False)
    message.repeated_string.append('315')
    message.repeated_bytes.append('316')

    message.repeatedgroup.add().a = 317
    message.repeated_nested_message.add().bb = 318
    message.repeated_foreign_message.add().c = 319
    message.repeated_import_message.add().d = 320
    message.repeated_lazy_message.add().bb = 327

    message.repeated_nested_enum.append(unittest_pb2.TestAllTypes.BAZ)
    message.repeated_foreign_enum.append(unittest_pb2.FOREIGN_BAZ)
    message.repeated_import_enum.append(unittest_import_pb2.IMPORT_BAZ)

    message.repeated_string_piece.append('324')
    message.repeated_cord.append('325')

    #
    # Fields that have defaults.
    #

    message.default_int32 = 401
    message.default_int64 = 402
    message.default_uint32 = 403
    message.default_uint64 = 404
    message.default_sint32 = 405
    message.default_sint64 = 406
    message.default_fixed32 = 407
    message.default_fixed64 = 408
    message.default_sfixed32 = 409
    message.default_sfixed64 = 410
    message.default_float = 411
    message.default_double = 412
    message.default_bool = False
    message.default_string = '415'
    message.default_bytes = '416'

    message.default_nested_enum = unittest_pb2.TestAllTypes.FOO
    message.default_foreign_enum = unittest_pb2.FOREIGN_FOO
    message.default_import_enum = unittest_import_pb2.IMPORT_FOO

    message.default_string_piece = '424'
    message.default_cord = '425'


def SetAllFields(message):
    """Sets every field, including optional_lazy_message, to a unique value.

    Args:
      message: A unittest_pb2.TestAllTypes instance.
    """
    SetAllNonLazyFields(message)
    message.optional_lazy_message.bb = 127


def SetAllExtensions(message):
    """Sets every extension in the message to a unique value.

    Args:
      message: A unittest_pb2.TestAllExtensions instance.
    """

    extensions = message.Extensions
    pb2 = unittest_pb2
    import_pb2 = unittest_import_pb2

    #
    # Optional fields.
    #

    extensions[pb2.optional_int32_extension] = 101
    extensions[pb2.optional_int64_extension] = 102
    extensions[pb2.optional_uint32_extension] = 103
    extensions[pb2.optional_uint64_extension] = 104
    extensions[pb2.optional_sint32_extension] = 105
    extensions[pb2.optional_sint64_extension] = 106
    extensions[pb2.optional_fixed32_extension] = 107
    extensions[pb2.optional_fixed64_extension] = 108
    extensions[pb2.optional_sfixed32_extension] = 109
    extensions[pb2.optional_sfixed64_extension] = 110
    extensions[pb2.optional_float_extension] = 111
    extensions[pb2.optional_double_extension] = 112
    extensions[pb2.optional_bool_extension] = True
    extensions[pb2.optional_string_extension] = '115'
    extensions[pb2.optional_bytes_extension] = '116'

    extensions[pb2.optionalgroup_extension].a = 117
    extensions[pb2.optional_nested_message_extension].bb = 118
    extensions[pb2.optional_foreign_message_extension].c = 119
    extensions[pb2.optional_import_message_extension].d = 120
    extensions[pb2.optional_public_import_message_extension].e = 126
    extensions[pb2.optional_lazy_message_extension].bb = 127

    # The original assigned optional_nested_enum_extension twice in a row
    # with the same value; the redundant copy was dropped.
    extensions[pb2.optional_nested_enum_extension] = pb2.TestAllTypes.BAZ
    extensions[pb2.optional_foreign_enum_extension] = pb2.FOREIGN_BAZ
    extensions[pb2.optional_import_enum_extension] = import_pb2.IMPORT_BAZ

    extensions[pb2.optional_string_piece_extension] = '124'
    extensions[pb2.optional_cord_extension] = '125'

    #
    # Repeated fields.
    #

    extensions[pb2.repeated_int32_extension].append(201)
    extensions[pb2.repeated_int64_extension].append(202)
    extensions[pb2.repeated_uint32_extension].append(203)
    extensions[pb2.repeated_uint64_extension].append(204)
    extensions[pb2.repeated_sint32_extension].append(205)
    extensions[pb2.repeated_sint64_extension].append(206)
    extensions[pb2.repeated_fixed32_extension].append(207)
    extensions[pb2.repeated_fixed64_extension].append(208)
    extensions[pb2.repeated_sfixed32_extension].append(209)
    extensions[pb2.repeated_sfixed64_extension].append(210)
    extensions[pb2.repeated_float_extension].append(211)
    extensions[pb2.repeated_double_extension].append(212)
    extensions[pb2.repeated_bool_extension].append(True)
    extensions[pb2.repeated_string_extension].append('215')
    extensions[pb2.repeated_bytes_extension].append('216')

    extensions[pb2.repeatedgroup_extension].add().a = 217
    extensions[pb2.repeated_nested_message_extension].add().bb = 218
    extensions[pb2.repeated_foreign_message_extension].add().c = 219
    extensions[pb2.repeated_import_message_extension].add().d = 220
    extensions[pb2.repeated_lazy_message_extension].add().bb = 227

    extensions[pb2.repeated_nested_enum_extension].append(pb2.TestAllTypes.BAR)
    extensions[pb2.repeated_foreign_enum_extension].append(pb2.FOREIGN_BAR)
    extensions[pb2.repeated_import_enum_extension].append(import_pb2.IMPORT_BAR)

    extensions[pb2.repeated_string_piece_extension].append('224')
    extensions[pb2.repeated_cord_extension].append('225')

    # Append a second one of each field.
    extensions[pb2.repeated_int32_extension].append(301)
    extensions[pb2.repeated_int64_extension].append(302)
    extensions[pb2.repeated_uint32_extension].append(303)
    extensions[pb2.repeated_uint64_extension].append(304)
    extensions[pb2.repeated_sint32_extension].append(305)
    extensions[pb2.repeated_sint64_extension].append(306)
    extensions[pb2.repeated_fixed32_extension].append(307)
    extensions[pb2.repeated_fixed64_extension].append(308)
    extensions[pb2.repeated_sfixed32_extension].append(309)
    extensions[pb2.repeated_sfixed64_extension].append(310)
    extensions[pb2.repeated_float_extension].append(311)
    extensions[pb2.repeated_double_extension].append(312)
    extensions[pb2.repeated_bool_extension].append(False)
    extensions[pb2.repeated_string_extension].append('315')
extensions[pb2.repeated_bytes_extension].append('316')239 extensions[pb2.repeatedgroup_extension].add().a = 317240 extensions[pb2.repeated_nested_message_extension].add().bb = 318241 extensions[pb2.repeated_foreign_message_extension].add().c = 319242 extensions[pb2.repeated_import_message_extension].add().d = 320243 extensions[pb2.repeated_lazy_message_extension].add().bb = 327244 extensions[pb2.repeated_nested_enum_extension].append(pb2.TestAllTypes.BAZ)245 extensions[pb2.repeated_foreign_enum_extension].append(pb2.FOREIGN_BAZ)246 extensions[pb2.repeated_import_enum_extension].append(import_pb2.IMPORT_BAZ)247 extensions[pb2.repeated_string_piece_extension].append('324')248 extensions[pb2.repeated_cord_extension].append('325')249 #250 # Fields with defaults.251 #252 extensions[pb2.default_int32_extension] = 401253 extensions[pb2.default_int64_extension] = 402254 extensions[pb2.default_uint32_extension] = 403255 extensions[pb2.default_uint64_extension] = 404256 extensions[pb2.default_sint32_extension] = 405257 extensions[pb2.default_sint64_extension] = 406258 extensions[pb2.default_fixed32_extension] = 407259 extensions[pb2.default_fixed64_extension] = 408260 extensions[pb2.default_sfixed32_extension] = 409261 extensions[pb2.default_sfixed64_extension] = 410262 extensions[pb2.default_float_extension] = 411263 extensions[pb2.default_double_extension] = 412264 extensions[pb2.default_bool_extension] = False265 extensions[pb2.default_string_extension] = '415'266 extensions[pb2.default_bytes_extension] = '416'267 extensions[pb2.default_nested_enum_extension] = pb2.TestAllTypes.FOO268 extensions[pb2.default_foreign_enum_extension] = pb2.FOREIGN_FOO269 extensions[pb2.default_import_enum_extension] = import_pb2.IMPORT_FOO270 extensions[pb2.default_string_piece_extension] = '424'271 extensions[pb2.default_cord_extension] = '425'272def SetAllFieldsAndExtensions(message):273 """Sets every field and extension in the message to a unique value.274 Args:275 message: A 
unittest_pb2.TestAllExtensions message.276 """277 message.my_int = 1278 message.my_string = 'foo'279 message.my_float = 1.0280 message.Extensions[unittest_pb2.my_extension_int] = 23281 message.Extensions[unittest_pb2.my_extension_string] = 'bar'282def ExpectAllFieldsAndExtensionsInOrder(serialized):283 """Ensures that serialized is the serialization we expect for a message284 filled with SetAllFieldsAndExtensions(). (Specifically, ensures that the285 serialization is in canonical, tag-number order).286 """287 my_extension_int = unittest_pb2.my_extension_int288 my_extension_string = unittest_pb2.my_extension_string289 expected_strings = []290 message = unittest_pb2.TestFieldOrderings()291 message.my_int = 1 # Field 1.292 expected_strings.append(message.SerializeToString())293 message.Clear()294 message.Extensions[my_extension_int] = 23 # Field 5.295 expected_strings.append(message.SerializeToString())296 message.Clear()297 message.my_string = 'foo' # Field 11.298 expected_strings.append(message.SerializeToString())299 message.Clear()300 message.Extensions[my_extension_string] = 'bar' # Field 50.301 expected_strings.append(message.SerializeToString())302 message.Clear()303 message.my_float = 1.0304 expected_strings.append(message.SerializeToString())305 message.Clear()306 expected = ''.join(expected_strings)307 if expected != serialized:308 raise ValueError('Expected %r, found %r' % (expected, serialized))309def ExpectAllFieldsSet(test_case, message):310 """Check all fields for correct values have after Set*Fields() is called."""311 test_case.assertTrue(message.HasField('optional_int32'))312 test_case.assertTrue(message.HasField('optional_int64'))313 test_case.assertTrue(message.HasField('optional_uint32'))314 test_case.assertTrue(message.HasField('optional_uint64'))315 test_case.assertTrue(message.HasField('optional_sint32'))316 test_case.assertTrue(message.HasField('optional_sint64'))317 test_case.assertTrue(message.HasField('optional_fixed32'))318 
test_case.assertTrue(message.HasField('optional_fixed64'))319 test_case.assertTrue(message.HasField('optional_sfixed32'))320 test_case.assertTrue(message.HasField('optional_sfixed64'))321 test_case.assertTrue(message.HasField('optional_float'))322 test_case.assertTrue(message.HasField('optional_double'))323 test_case.assertTrue(message.HasField('optional_bool'))324 test_case.assertTrue(message.HasField('optional_string'))325 test_case.assertTrue(message.HasField('optional_bytes'))326 test_case.assertTrue(message.HasField('optionalgroup'))327 test_case.assertTrue(message.HasField('optional_nested_message'))328 test_case.assertTrue(message.HasField('optional_foreign_message'))329 test_case.assertTrue(message.HasField('optional_import_message'))330 test_case.assertTrue(message.optionalgroup.HasField('a'))331 test_case.assertTrue(message.optional_nested_message.HasField('bb'))332 test_case.assertTrue(message.optional_foreign_message.HasField('c'))333 test_case.assertTrue(message.optional_import_message.HasField('d'))334 test_case.assertTrue(message.HasField('optional_nested_enum'))335 test_case.assertTrue(message.HasField('optional_foreign_enum'))336 test_case.assertTrue(message.HasField('optional_import_enum'))337 test_case.assertTrue(message.HasField('optional_string_piece'))338 test_case.assertTrue(message.HasField('optional_cord'))339 test_case.assertEqual(101, message.optional_int32)340 test_case.assertEqual(102, message.optional_int64)341 test_case.assertEqual(103, message.optional_uint32)342 test_case.assertEqual(104, message.optional_uint64)343 test_case.assertEqual(105, message.optional_sint32)344 test_case.assertEqual(106, message.optional_sint64)345 test_case.assertEqual(107, message.optional_fixed32)346 test_case.assertEqual(108, message.optional_fixed64)347 test_case.assertEqual(109, message.optional_sfixed32)348 test_case.assertEqual(110, message.optional_sfixed64)349 test_case.assertEqual(111, message.optional_float)350 test_case.assertEqual(112, 
message.optional_double)351 test_case.assertEqual(True, message.optional_bool)352 test_case.assertEqual('115', message.optional_string)353 test_case.assertEqual('116', message.optional_bytes)354 test_case.assertEqual(117, message.optionalgroup.a)355 test_case.assertEqual(118, message.optional_nested_message.bb)356 test_case.assertEqual(119, message.optional_foreign_message.c)357 test_case.assertEqual(120, message.optional_import_message.d)358 test_case.assertEqual(126, message.optional_public_import_message.e)359 test_case.assertEqual(127, message.optional_lazy_message.bb)360 test_case.assertEqual(unittest_pb2.TestAllTypes.BAZ,361 message.optional_nested_enum)362 test_case.assertEqual(unittest_pb2.FOREIGN_BAZ,363 message.optional_foreign_enum)364 test_case.assertEqual(unittest_import_pb2.IMPORT_BAZ,365 message.optional_import_enum)366 # -----------------------------------------------------------------367 test_case.assertEqual(2, len(message.repeated_int32))368 test_case.assertEqual(2, len(message.repeated_int64))369 test_case.assertEqual(2, len(message.repeated_uint32))370 test_case.assertEqual(2, len(message.repeated_uint64))371 test_case.assertEqual(2, len(message.repeated_sint32))372 test_case.assertEqual(2, len(message.repeated_sint64))373 test_case.assertEqual(2, len(message.repeated_fixed32))374 test_case.assertEqual(2, len(message.repeated_fixed64))375 test_case.assertEqual(2, len(message.repeated_sfixed32))376 test_case.assertEqual(2, len(message.repeated_sfixed64))377 test_case.assertEqual(2, len(message.repeated_float))378 test_case.assertEqual(2, len(message.repeated_double))379 test_case.assertEqual(2, len(message.repeated_bool))380 test_case.assertEqual(2, len(message.repeated_string))381 test_case.assertEqual(2, len(message.repeated_bytes))382 test_case.assertEqual(2, len(message.repeatedgroup))383 test_case.assertEqual(2, len(message.repeated_nested_message))384 test_case.assertEqual(2, len(message.repeated_foreign_message))385 
test_case.assertEqual(2, len(message.repeated_import_message))386 test_case.assertEqual(2, len(message.repeated_nested_enum))387 test_case.assertEqual(2, len(message.repeated_foreign_enum))388 test_case.assertEqual(2, len(message.repeated_import_enum))389 test_case.assertEqual(2, len(message.repeated_string_piece))390 test_case.assertEqual(2, len(message.repeated_cord))391 test_case.assertEqual(201, message.repeated_int32[0])392 test_case.assertEqual(202, message.repeated_int64[0])393 test_case.assertEqual(203, message.repeated_uint32[0])394 test_case.assertEqual(204, message.repeated_uint64[0])395 test_case.assertEqual(205, message.repeated_sint32[0])396 test_case.assertEqual(206, message.repeated_sint64[0])397 test_case.assertEqual(207, message.repeated_fixed32[0])398 test_case.assertEqual(208, message.repeated_fixed64[0])399 test_case.assertEqual(209, message.repeated_sfixed32[0])400 test_case.assertEqual(210, message.repeated_sfixed64[0])401 test_case.assertEqual(211, message.repeated_float[0])402 test_case.assertEqual(212, message.repeated_double[0])403 test_case.assertEqual(True, message.repeated_bool[0])404 test_case.assertEqual('215', message.repeated_string[0])405 test_case.assertEqual('216', message.repeated_bytes[0])406 test_case.assertEqual(217, message.repeatedgroup[0].a)407 test_case.assertEqual(218, message.repeated_nested_message[0].bb)408 test_case.assertEqual(219, message.repeated_foreign_message[0].c)409 test_case.assertEqual(220, message.repeated_import_message[0].d)410 test_case.assertEqual(227, message.repeated_lazy_message[0].bb)411 test_case.assertEqual(unittest_pb2.TestAllTypes.BAR,412 message.repeated_nested_enum[0])413 test_case.assertEqual(unittest_pb2.FOREIGN_BAR,414 message.repeated_foreign_enum[0])415 test_case.assertEqual(unittest_import_pb2.IMPORT_BAR,416 message.repeated_import_enum[0])417 test_case.assertEqual(301, message.repeated_int32[1])418 test_case.assertEqual(302, message.repeated_int64[1])419 test_case.assertEqual(303, 
message.repeated_uint32[1])420 test_case.assertEqual(304, message.repeated_uint64[1])421 test_case.assertEqual(305, message.repeated_sint32[1])422 test_case.assertEqual(306, message.repeated_sint64[1])423 test_case.assertEqual(307, message.repeated_fixed32[1])424 test_case.assertEqual(308, message.repeated_fixed64[1])425 test_case.assertEqual(309, message.repeated_sfixed32[1])426 test_case.assertEqual(310, message.repeated_sfixed64[1])427 test_case.assertEqual(311, message.repeated_float[1])428 test_case.assertEqual(312, message.repeated_double[1])429 test_case.assertEqual(False, message.repeated_bool[1])430 test_case.assertEqual('315', message.repeated_string[1])431 test_case.assertEqual('316', message.repeated_bytes[1])432 test_case.assertEqual(317, message.repeatedgroup[1].a)433 test_case.assertEqual(318, message.repeated_nested_message[1].bb)434 test_case.assertEqual(319, message.repeated_foreign_message[1].c)435 test_case.assertEqual(320, message.repeated_import_message[1].d)436 test_case.assertEqual(327, message.repeated_lazy_message[1].bb)437 test_case.assertEqual(unittest_pb2.TestAllTypes.BAZ,438 message.repeated_nested_enum[1])439 test_case.assertEqual(unittest_pb2.FOREIGN_BAZ,440 message.repeated_foreign_enum[1])441 test_case.assertEqual(unittest_import_pb2.IMPORT_BAZ,442 message.repeated_import_enum[1])443 # -----------------------------------------------------------------444 test_case.assertTrue(message.HasField('default_int32'))445 test_case.assertTrue(message.HasField('default_int64'))446 test_case.assertTrue(message.HasField('default_uint32'))447 test_case.assertTrue(message.HasField('default_uint64'))448 test_case.assertTrue(message.HasField('default_sint32'))449 test_case.assertTrue(message.HasField('default_sint64'))450 test_case.assertTrue(message.HasField('default_fixed32'))451 test_case.assertTrue(message.HasField('default_fixed64'))452 test_case.assertTrue(message.HasField('default_sfixed32'))453 
test_case.assertTrue(message.HasField('default_sfixed64'))454 test_case.assertTrue(message.HasField('default_float'))455 test_case.assertTrue(message.HasField('default_double'))456 test_case.assertTrue(message.HasField('default_bool'))457 test_case.assertTrue(message.HasField('default_string'))458 test_case.assertTrue(message.HasField('default_bytes'))459 test_case.assertTrue(message.HasField('default_nested_enum'))460 test_case.assertTrue(message.HasField('default_foreign_enum'))461 test_case.assertTrue(message.HasField('default_import_enum'))462 test_case.assertEqual(401, message.default_int32)463 test_case.assertEqual(402, message.default_int64)464 test_case.assertEqual(403, message.default_uint32)465 test_case.assertEqual(404, message.default_uint64)466 test_case.assertEqual(405, message.default_sint32)467 test_case.assertEqual(406, message.default_sint64)468 test_case.assertEqual(407, message.default_fixed32)469 test_case.assertEqual(408, message.default_fixed64)470 test_case.assertEqual(409, message.default_sfixed32)471 test_case.assertEqual(410, message.default_sfixed64)472 test_case.assertEqual(411, message.default_float)473 test_case.assertEqual(412, message.default_double)474 test_case.assertEqual(False, message.default_bool)475 test_case.assertEqual('415', message.default_string)476 test_case.assertEqual('416', message.default_bytes)477 test_case.assertEqual(unittest_pb2.TestAllTypes.FOO,478 message.default_nested_enum)479 test_case.assertEqual(unittest_pb2.FOREIGN_FOO,480 message.default_foreign_enum)481 test_case.assertEqual(unittest_import_pb2.IMPORT_FOO,482 message.default_import_enum)483def GoldenFile(filename):484 """Finds the given golden file and returns a file object representing it."""485 # Search up the directory tree looking for the C++ protobuf source code.486 path = '.'487 while os.path.exists(path):488 if os.path.exists(os.path.join(path, 'src/google/protobuf')):489 # Found it. 
Load the golden file from the testdata directory.490 full_path = os.path.join(path, 'src/google/protobuf/testdata', filename)491 return open(full_path, 'rb')492 path = os.path.join(path, '..')493 raise RuntimeError(494 'Could not find golden files. This test must be run from within the '495 'protobuf source package so that it can read test data files from the '496 'C++ source tree.')497def SetAllPackedFields(message):498 """Sets every field in the message to a unique value.499 Args:500 message: A unittest_pb2.TestPackedTypes instance.501 """502 message.packed_int32.extend([601, 701])503 message.packed_int64.extend([602, 702])504 message.packed_uint32.extend([603, 703])505 message.packed_uint64.extend([604, 704])506 message.packed_sint32.extend([605, 705])507 message.packed_sint64.extend([606, 706])508 message.packed_fixed32.extend([607, 707])509 message.packed_fixed64.extend([608, 708])510 message.packed_sfixed32.extend([609, 709])511 message.packed_sfixed64.extend([610, 710])512 message.packed_float.extend([611.0, 711.0])513 message.packed_double.extend([612.0, 712.0])514 message.packed_bool.extend([True, False])515 message.packed_enum.extend([unittest_pb2.FOREIGN_BAR,516 unittest_pb2.FOREIGN_BAZ])517def SetAllPackedExtensions(message):518 """Sets every extension in the message to a unique value.519 Args:520 message: A unittest_pb2.TestPackedExtensions instance.521 """522 extensions = message.Extensions523 pb2 = unittest_pb2524 extensions[pb2.packed_int32_extension].extend([601, 701])525 extensions[pb2.packed_int64_extension].extend([602, 702])526 extensions[pb2.packed_uint32_extension].extend([603, 703])527 extensions[pb2.packed_uint64_extension].extend([604, 704])528 extensions[pb2.packed_sint32_extension].extend([605, 705])529 extensions[pb2.packed_sint64_extension].extend([606, 706])530 extensions[pb2.packed_fixed32_extension].extend([607, 707])531 extensions[pb2.packed_fixed64_extension].extend([608, 708])532 
extensions[pb2.packed_sfixed32_extension].extend([609, 709])533 extensions[pb2.packed_sfixed64_extension].extend([610, 710])534 extensions[pb2.packed_float_extension].extend([611.0, 711.0])535 extensions[pb2.packed_double_extension].extend([612.0, 712.0])536 extensions[pb2.packed_bool_extension].extend([True, False])537 extensions[pb2.packed_enum_extension].extend([unittest_pb2.FOREIGN_BAR,538 unittest_pb2.FOREIGN_BAZ])539def SetAllUnpackedFields(message):540 """Sets every field in the message to a unique value.541 Args:542 message: A unittest_pb2.TestUnpackedTypes instance.543 """544 message.unpacked_int32.extend([601, 701])545 message.unpacked_int64.extend([602, 702])546 message.unpacked_uint32.extend([603, 703])547 message.unpacked_uint64.extend([604, 704])548 message.unpacked_sint32.extend([605, 705])549 message.unpacked_sint64.extend([606, 706])550 message.unpacked_fixed32.extend([607, 707])551 message.unpacked_fixed64.extend([608, 708])552 message.unpacked_sfixed32.extend([609, 709])553 message.unpacked_sfixed64.extend([610, 710])554 message.unpacked_float.extend([611.0, 711.0])555 message.unpacked_double.extend([612.0, 712.0])556 message.unpacked_bool.extend([True, False])557 message.unpacked_enum.extend([unittest_pb2.FOREIGN_BAR,...
text_format_test.py
Source:text_format_test.py
#! /usr/bin/python
#
# Protocol Buffers - Google's data interchange format
# Copyright 2008 Google Inc.  All rights reserved.
# http://code.google.com/p/protobuf/
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Test for google.protobuf.text_format."""

__author__ = 'kenton@google.com (Kenton Varda)'

import difflib
import re
import unittest

from google.protobuf import text_format
from google.protobuf.internal import test_util
from google.protobuf import unittest_pb2
from google.protobuf import unittest_mset_pb2


class TextFormatTest(unittest.TestCase):
  """Printing (MessageToString) and parsing (Merge) tests for text_format."""

  def ReadGolden(self, golden_filename):
    """Returns the lines of the named golden file as a list.

    The file object is obtained from test_util.GoldenFile() and is closed
    before returning, even when reading raises.
    """
    f = test_util.GoldenFile(golden_filename)
    try:
      return f.readlines()
    finally:
      f.close()

  def CompareToGoldenFile(self, text, golden_filename):
    """Asserts that text matches the content of the named golden file."""
    golden_lines = self.ReadGolden(golden_filename)
    self.CompareToGoldenLines(text, golden_lines)

  def CompareToGoldenText(self, text, golden_text):
    """Asserts that text matches golden_text, compared line by line."""
    # splitlines(1) keeps the line terminators so they are compared too.
    self.CompareToGoldenLines(text, golden_text.splitlines(1))

  def CompareToGoldenLines(self, text, golden_lines):
    """Asserts that the lines of text equal golden_lines.

    On mismatch the failure message carries a difflib.ndiff of both sides.
    """
    actual_lines = text.splitlines(1)
    self.assertEqual(golden_lines, actual_lines,
                     "Text doesn't match golden. Diff:\n" +
                     ''.join(difflib.ndiff(golden_lines, actual_lines)))

  def testPrintAllFields(self):
    # A message filled by SetAllFields() must print as the golden file.
    message = unittest_pb2.TestAllTypes()
    test_util.SetAllFields(message)
    self.CompareToGoldenFile(
        self.RemoveRedundantZeros(text_format.MessageToString(message)),
        'text_format_unittest_data.txt')

  def testPrintAllExtensions(self):
    # Same as testPrintAllFields, for a message made only of extensions.
    message = unittest_pb2.TestAllExtensions()
    test_util.SetAllExtensions(message)
    self.CompareToGoldenFile(
        self.RemoveRedundantZeros(text_format.MessageToString(message)),
        'text_format_unittest_extensions_data.txt')

  def testPrintMessageSet(self):
    message = unittest_mset_pb2.TestMessageSetContainer()
    ext1 = unittest_mset_pb2.TestMessageSetExtension1.message_set_extension
    ext2 = unittest_mset_pb2.TestMessageSetExtension2.message_set_extension
    message.message_set.Extensions[ext1].i = 23
    message.message_set.Extensions[ext2].str = 'foo'
    # Multi-line output indents each nesting level by two spaces, so the
    # golden text must carry that indentation to compare equal.
    self.CompareToGoldenText(
        text_format.MessageToString(message),
        'message_set {\n'
        '  [protobuf_unittest.TestMessageSetExtension1] {\n'
        '    i: 23\n'
        '  }\n'
        '  [protobuf_unittest.TestMessageSetExtension2] {\n'
        '    str: \"foo\"\n'
        '  }\n'
        '}\n')

  def testPrintBadEnumValue(self):
    # Enum fields holding numbers with no declared name print as numbers.
    message = unittest_pb2.TestAllTypes()
    message.optional_nested_enum = 100
    message.optional_foreign_enum = 101
    message.optional_import_enum = 102
    self.CompareToGoldenText(
        text_format.MessageToString(message),
        'optional_nested_enum: 100\n'
        'optional_foreign_enum: 101\n'
        'optional_import_enum: 102\n')

  def testPrintBadEnumValueExtensions(self):
    # Extension counterpart of testPrintBadEnumValue.
    message = unittest_pb2.TestAllExtensions()
    message.Extensions[unittest_pb2.optional_nested_enum_extension] = 100
    message.Extensions[unittest_pb2.optional_foreign_enum_extension] = 101
    message.Extensions[unittest_pb2.optional_import_enum_extension] = 102
    self.CompareToGoldenText(
        text_format.MessageToString(message),
        '[protobuf_unittest.optional_nested_enum_extension]: 100\n'
        '[protobuf_unittest.optional_foreign_enum_extension]: 101\n'
        '[protobuf_unittest.optional_import_enum_extension]: 102\n')

  def testPrintExotic(self):
    # Extreme integers, large/small doubles, control characters and
    # non-ASCII text.
    message = unittest_pb2.TestAllTypes()
    message.repeated_int64.append(-9223372036854775808)
    message.repeated_uint64.append(18446744073709551615)
    message.repeated_double.append(123.456)
    message.repeated_double.append(1.23e22)
    message.repeated_double.append(1.23e-18)
    message.repeated_string.append('\000\001\a\b\f\n\r\t\v\\\'"')
    message.repeated_string.append(u'\u00fc\ua71f')
    self.CompareToGoldenText(
        self.RemoveRedundantZeros(text_format.MessageToString(message)),
        'repeated_int64: -9223372036854775808\n'
        'repeated_uint64: 18446744073709551615\n'
        'repeated_double: 123.456\n'
        'repeated_double: 1.23e+22\n'
        'repeated_double: 1.23e-18\n'
        'repeated_string: '
        '"\\000\\001\\007\\010\\014\\n\\r\\t\\013\\\\\\\'\\""\n'
        'repeated_string: "\\303\\274\\352\\234\\237"\n')

  def testPrintNestedMessageAsOneLine(self):
    message = unittest_pb2.TestAllTypes()
    msg = message.repeated_nested_message.add()
    msg.bb = 42
    self.CompareToGoldenText(
        text_format.MessageToString(message, as_one_line=True),
        'repeated_nested_message { bb: 42 }')

  def testPrintRepeatedFieldsAsOneLine(self):
    message = unittest_pb2.TestAllTypes()
    message.repeated_int32.append(1)
    message.repeated_int32.append(1)
    message.repeated_int32.append(3)
    message.repeated_string.append("Google")
    message.repeated_string.append("Zurich")
    self.CompareToGoldenText(
        text_format.MessageToString(message, as_one_line=True),
        'repeated_int32: 1 repeated_int32: 1 repeated_int32: 3 '
        'repeated_string: "Google" repeated_string: "Zurich"')

  def testPrintNestedNewLineInStringAsOneLine(self):
    # Newlines inside a string value must be escaped in one-line output.
    message = unittest_pb2.TestAllTypes()
    message.optional_string = "a\nnew\nline"
    self.CompareToGoldenText(
        text_format.MessageToString(message, as_one_line=True),
        'optional_string: "a\\nnew\\nline"')

  def testPrintMessageSetAsOneLine(self):
    message = unittest_mset_pb2.TestMessageSetContainer()
    ext1 = unittest_mset_pb2.TestMessageSetExtension1.message_set_extension
    ext2 = unittest_mset_pb2.TestMessageSetExtension2.message_set_extension
    message.message_set.Extensions[ext1].i = 23
    message.message_set.Extensions[ext2].str = 'foo'
    self.CompareToGoldenText(
        text_format.MessageToString(message, as_one_line=True),
        'message_set {'
        ' [protobuf_unittest.TestMessageSetExtension1] {'
        ' i: 23'
        ' }'
        ' [protobuf_unittest.TestMessageSetExtension2] {'
        ' str: \"foo\"'
        ' }'
        ' }')

  def testPrintExoticAsOneLine(self):
    # One-line counterpart of testPrintExotic.
    message = unittest_pb2.TestAllTypes()
    message.repeated_int64.append(-9223372036854775808)
    message.repeated_uint64.append(18446744073709551615)
    message.repeated_double.append(123.456)
    message.repeated_double.append(1.23e22)
    message.repeated_double.append(1.23e-18)
    message.repeated_string.append('\000\001\a\b\f\n\r\t\v\\\'"')
    message.repeated_string.append(u'\u00fc\ua71f')
    self.CompareToGoldenText(
        self.RemoveRedundantZeros(
            text_format.MessageToString(message, as_one_line=True)),
        'repeated_int64: -9223372036854775808'
        ' repeated_uint64: 18446744073709551615'
        ' repeated_double: 123.456'
        ' repeated_double: 1.23e+22'
        ' repeated_double: 1.23e-18'
        ' repeated_string: '
        '"\\000\\001\\007\\010\\014\\n\\r\\t\\013\\\\\\\'\\""'
        ' repeated_string: "\\303\\274\\352\\234\\237"')

  def testRoundTripExoticAsOneLine(self):
    # Printing and re-parsing must give back an equal message, both with
    # and without as_utf8.
    message = unittest_pb2.TestAllTypes()
    message.repeated_int64.append(-9223372036854775808)
    message.repeated_uint64.append(18446744073709551615)
    message.repeated_double.append(123.456)
    message.repeated_double.append(1.23e22)
    message.repeated_double.append(1.23e-18)
    message.repeated_string.append('\000\001\a\b\f\n\r\t\v\\\'"')
    message.repeated_string.append(u'\u00fc\ua71f')

    # Test as_utf8 = False.
    wire_text = text_format.MessageToString(
        message, as_one_line=True, as_utf8=False)
    parsed_message = unittest_pb2.TestAllTypes()
    text_format.Merge(wire_text, parsed_message)
    self.assertEqual(message, parsed_message)

    # Test as_utf8 = True.
    wire_text = text_format.MessageToString(
        message, as_one_line=True, as_utf8=True)
    parsed_message = unittest_pb2.TestAllTypes()
    text_format.Merge(wire_text, parsed_message)
    self.assertEqual(message, parsed_message)

  def testPrintRawUtf8String(self):
    # With as_utf8=True the string is emitted as raw UTF-8 instead of
    # octal escapes, and must still parse back to an equal message.
    message = unittest_pb2.TestAllTypes()
    message.repeated_string.append(u'\u00fc\ua71f')
    text = text_format.MessageToString(message, as_utf8 = True)
    self.CompareToGoldenText(text, 'repeated_string: "\303\274\352\234\237"\n')
    parsed_message = unittest_pb2.TestAllTypes()
    text_format.Merge(text, parsed_message)
    self.assertEqual(message, parsed_message)

  def testMessageToString(self):
    # str(message) yields the text-format rendering.
    message = unittest_pb2.ForeignMessage()
    message.c = 123
    self.assertEqual('c: 123\n', str(message))

  def RemoveRedundantZeros(self, text):
    """Returns text with platform-dependent float formatting normalised.

    Strips up to two leading zeros from exponents ('e+005' -> 'e+5') and
    drops a '.0' suffix at the end of each line.
    """
    # Some platforms print 1e+5 as 1e+005.  This is fine, but we need to remove
    # these zeros in order to match the golden file.
    text = text.replace('e+0','e+').replace('e+0','e+') \
               .replace('e-0','e-').replace('e-0','e-')
    # Floating point fields are printed with .0 suffix even if they are
    # actually integer numbers.
    text = re.compile(r'\.0$', re.MULTILINE).sub('', text)
    return text

  def testMergeGolden(self):
    # Parsing the golden file must give the message SetAllFields() builds.
    golden_text = '\n'.join(self.ReadGolden('text_format_unittest_data.txt'))
    parsed_message = unittest_pb2.TestAllTypes()
    text_format.Merge(golden_text, parsed_message)

    message = unittest_pb2.TestAllTypes()
    test_util.SetAllFields(message)
    self.assertEqual(message, parsed_message)

  def testMergeGoldenExtensions(self):
    # Extension counterpart of testMergeGolden.
    golden_text = '\n'.join(self.ReadGolden(
        'text_format_unittest_extensions_data.txt'))
    parsed_message = unittest_pb2.TestAllExtensions()
    text_format.Merge(golden_text, parsed_message)

    message = unittest_pb2.TestAllExtensions()
    test_util.SetAllExtensions(message)
    self.assertEqual(message, parsed_message)

  def testMergeAllFields(self):
    # Print-then-parse round trip of a fully populated message.
    message = unittest_pb2.TestAllTypes()
    test_util.SetAllFields(message)
    ascii_text = text_format.MessageToString(message)

    parsed_message = unittest_pb2.TestAllTypes()
    text_format.Merge(ascii_text, parsed_message)
    self.assertEqual(message, parsed_message)
    test_util.ExpectAllFieldsSet(self, message)

  def testMergeAllExtensions(self):
    # Print-then-parse round trip of a message made only of extensions.
    message = unittest_pb2.TestAllExtensions()
    test_util.SetAllExtensions(message)
    ascii_text = text_format.MessageToString(message)

    parsed_message = unittest_pb2.TestAllExtensions()
    text_format.Merge(ascii_text, parsed_message)
    self.assertEqual(message, parsed_message)

  def testMergeMessageSet(self):
    # First a plain repeated field, then a message set with two extensions.
    message = unittest_pb2.TestAllTypes()
    text = ('repeated_uint64: 1\n'
            'repeated_uint64: 2\n')
    text_format.Merge(text, message)
    self.assertEqual(1, message.repeated_uint64[0])
    self.assertEqual(2, message.repeated_uint64[1])

    message = unittest_mset_pb2.TestMessageSetContainer()
    text = ('message_set {\n'
            ' [protobuf_unittest.TestMessageSetExtension1] {\n'
            ' i: 23\n'
            ' }\n'
            ' [protobuf_unittest.TestMessageSetExtension2] {\n'
            ' str: \"foo\"\n'
            ' }\n'
            '}\n')
    text_format.Merge(text, message)
    ext1 = unittest_mset_pb2.TestMessageSetExtension1.message_set_extension
    ext2 = unittest_mset_pb2.TestMessageSetExtension2.message_set_extension
    self.assertEqual(23, message.message_set.Extensions[ext1].i)
    self.assertEqual('foo', message.message_set.Extensions[ext2].str)

  def testMergeExotic(self):
    # Parsing counterpart of testPrintExotic; also covers a value on the
    # line after its field name, adjacent string pieces being joined, and
    # octal / hex / raw spellings of the same UTF-8 bytes.
    message = unittest_pb2.TestAllTypes()
    text = ('repeated_int64: -9223372036854775808\n'
            'repeated_uint64: 18446744073709551615\n'
            'repeated_double: 123.456\n'
            'repeated_double: 1.23e+22\n'
            'repeated_double: 1.23e-18\n'
            'repeated_string: \n'
            '"\\000\\001\\007\\010\\014\\n\\r\\t\\013\\\\\\\'\\""\n'
            'repeated_string: "foo" \'corge\' "grault"\n'
            'repeated_string: "\\303\\274\\352\\234\\237"\n'
            'repeated_string: "\\xc3\\xbc"\n'
            'repeated_string: "\xc3\xbc"\n')
    text_format.Merge(text, message)

    self.assertEqual(-9223372036854775808, message.repeated_int64[0])
    self.assertEqual(18446744073709551615, message.repeated_uint64[0])
    self.assertEqual(123.456, message.repeated_double[0])
    self.assertEqual(1.23e22, message.repeated_double[1])
    self.assertEqual(1.23e-18, message.repeated_double[2])
    self.assertEqual(
        '\000\001\a\b\f\n\r\t\v\\\'"', message.repeated_string[0])
    self.assertEqual('foocorgegrault', message.repeated_string[1])
    self.assertEqual(u'\u00fc\ua71f', message.repeated_string[2])
    self.assertEqual(u'\u00fc', message.repeated_string[3])

  def testMergeEmptyText(self):
    # Merging empty text leaves the message equal to a fresh one.
    message = unittest_pb2.TestAllTypes()
    text = ''
    text_format.Merge(text, message)
    self.assertEqual(unittest_pb2.TestAllTypes(), message)

  def testMergeInvalidUtf8(self):
    # Escaped bytes that are not valid UTF-8 must raise ParseError.
    message = unittest_pb2.TestAllTypes()
    text = 'repeated_string: "\\xc3\\xc3"'
    self.assertRaises(text_format.ParseError, text_format.Merge, text, message)

  def testMergeSingleWord(self):
    message = unittest_pb2.TestAllTypes()
    text = 'foo'
    self.assertRaisesWithMessage(
        text_format.ParseError,
        ('1:1 : Message type "protobuf_unittest.TestAllTypes" has no field named '
         '"foo".'),
        text_format.Merge, text, message)

  def testMergeUnknownField(self):
    message = unittest_pb2.TestAllTypes()
    text = 'unknown_field: 8\n'
    self.assertRaisesWithMessage(
        text_format.ParseError,
        ('1:1 : Message type "protobuf_unittest.TestAllTypes" has no field named '
         '"unknown_field".'),
        text_format.Merge, text, message)

  def testMergeBadExtension(self):
    # An unregistered extension name, then extension syntax on a message
    # type that has no extensions.
    message = unittest_pb2.TestAllExtensions()
    text = '[unknown_extension]: 8\n'
    self.assertRaisesWithMessage(
        text_format.ParseError,
        '1:2 : Extension "unknown_extension" not registered.',
        text_format.Merge, text, message)
    message = unittest_pb2.TestAllTypes()
    self.assertRaisesWithMessage(
        text_format.ParseError,
        ('1:2 : Message type "protobuf_unittest.TestAllTypes" does not have '
         'extensions.'),
        text_format.Merge, text, message)

  def testMergeGroupNotClosed(self):
    # Both delimiter styles ('<' and '{') must report the missing closer.
    message = unittest_pb2.TestAllTypes()
    text = 'RepeatedGroup: <'
    self.assertRaisesWithMessage(
        text_format.ParseError, '1:16 : Expected ">".',
        text_format.Merge, text, message)

    text = 'RepeatedGroup: {'
    self.assertRaisesWithMessage(
        text_format.ParseError, '1:16 : Expected "}".',
        text_format.Merge, text, message)

  def testMergeEmptyGroup(self):
    # An empty group body still marks the group field as present, with
    # either delimiter style.
    message = unittest_pb2.TestAllTypes()
    text = 'OptionalGroup: {}'
    text_format.Merge(text, message)
    self.assertTrue(message.HasField('optionalgroup'))

    message.Clear()

    message = unittest_pb2.TestAllTypes()
    text = 'OptionalGroup: <>'
    text_format.Merge(text, message)
    self.assertTrue(message.HasField('optionalgroup'))
def testMergeBadEnumValue(self):359 message = unittest_pb2.TestAllTypes()360 text = 'optional_nested_enum: BARR'361 self.assertRaisesWithMessage(362 text_format.ParseError,363 ('1:23 : Enum type "protobuf_unittest.TestAllTypes.NestedEnum" '364 'has no value named BARR.'),365 text_format.Merge, text, message)366 message = unittest_pb2.TestAllTypes()367 text = 'optional_nested_enum: 100'368 self.assertRaisesWithMessage(369 text_format.ParseError,370 ('1:23 : Enum type "protobuf_unittest.TestAllTypes.NestedEnum" '371 'has no value with number 100.'),372 text_format.Merge, text, message)373 def testMergeBadIntValue(self):374 message = unittest_pb2.TestAllTypes()375 text = 'optional_int32: bork'376 self.assertRaisesWithMessage(377 text_format.ParseError,378 ('1:17 : Couldn\'t parse integer: bork'),379 text_format.Merge, text, message)380 def testMergeStringFieldUnescape(self):381 message = unittest_pb2.TestAllTypes()382 text = r'''repeated_string: "\xf\x62"383 repeated_string: "\\xf\\x62"384 repeated_string: "\\\xf\\\x62"385 repeated_string: "\\\\xf\\\\x62"386 repeated_string: "\\\\\xf\\\\\x62"387 repeated_string: "\x5cx20"'''388 text_format.Merge(text, message)389 SLASH = '\\'390 self.assertEqual('\x0fb', message.repeated_string[0])391 self.assertEqual(SLASH + 'xf' + SLASH + 'x62', message.repeated_string[1])392 self.assertEqual(SLASH + '\x0f' + SLASH + 'b', message.repeated_string[2])393 self.assertEqual(SLASH + SLASH + 'xf' + SLASH + SLASH + 'x62',394 message.repeated_string[3])395 self.assertEqual(SLASH + SLASH + '\x0f' + SLASH + SLASH + 'b',396 message.repeated_string[4])397 self.assertEqual(SLASH + 'x20', message.repeated_string[5])398 def assertRaisesWithMessage(self, e_class, e, func, *args, **kwargs):399 """Same as assertRaises, but also compares the exception message."""400 if hasattr(e_class, '__name__'):401 exc_name = e_class.__name__402 else:403 exc_name = str(e_class)404 try:405 func(*args, **kwargs)406 except e_class as expr:407 if str(expr) != e:408 msg 
= '%s raised, but with wrong message: "%s" instead of "%s"'409 raise self.failureException(msg % (exc_name,410 str(expr).encode('string_escape'),411 e.encode('string_escape')))412 return413 else:414 raise self.failureException('%s not raised' % exc_name)415class TokenizerTest(unittest.TestCase):416 def testSimpleTokenCases(self):417 text = ('identifier1:"string1"\n \n\n'418 'identifier2 : \n \n123 \n identifier3 :\'string\'\n'419 'identifiER_4 : 1.1e+2 ID5:-0.23 ID6:\'aaaa\\\'bbbb\'\n'420 'ID7 : "aa\\"bb"\n\n\n\n ID8: {A:inf B:-inf C:true D:false}\n'421 'ID9: 22 ID10: -111111111111111111 ID11: -22\n'422 'ID12: 2222222222222222222 ID13: 1.23456f ID14: 1.2e+2f '423 'false_bool: 0 true_BOOL:t \n true_bool1: 1 false_BOOL1:f ' )424 tokenizer = text_format._Tokenizer(text)425 methods = [(tokenizer.ConsumeIdentifier, 'identifier1'),426 ':',427 (tokenizer.ConsumeString, 'string1'),428 (tokenizer.ConsumeIdentifier, 'identifier2'),429 ':',430 (tokenizer.ConsumeInt32, 123),431 (tokenizer.ConsumeIdentifier, 'identifier3'),432 ':',433 (tokenizer.ConsumeString, 'string'),434 (tokenizer.ConsumeIdentifier, 'identifiER_4'),435 ':',436 (tokenizer.ConsumeFloat, 1.1e+2),437 (tokenizer.ConsumeIdentifier, 'ID5'),438 ':',439 (tokenizer.ConsumeFloat, -0.23),440 (tokenizer.ConsumeIdentifier, 'ID6'),441 ':',442 (tokenizer.ConsumeString, 'aaaa\'bbbb'),443 (tokenizer.ConsumeIdentifier, 'ID7'),444 ':',445 (tokenizer.ConsumeString, 'aa\"bb'),446 (tokenizer.ConsumeIdentifier, 'ID8'),447 ':',448 '{',449 (tokenizer.ConsumeIdentifier, 'A'),450 ':',451 (tokenizer.ConsumeFloat, float('inf')),452 (tokenizer.ConsumeIdentifier, 'B'),453 ':',454 (tokenizer.ConsumeFloat, -float('inf')),455 (tokenizer.ConsumeIdentifier, 'C'),456 ':',457 (tokenizer.ConsumeBool, True),458 (tokenizer.ConsumeIdentifier, 'D'),459 ':',460 (tokenizer.ConsumeBool, False),461 '}',462 (tokenizer.ConsumeIdentifier, 'ID9'),463 ':',464 (tokenizer.ConsumeUint32, 22),465 (tokenizer.ConsumeIdentifier, 'ID10'),466 ':',467 
(tokenizer.ConsumeInt64, -111111111111111111),468 (tokenizer.ConsumeIdentifier, 'ID11'),469 ':',470 (tokenizer.ConsumeInt32, -22),471 (tokenizer.ConsumeIdentifier, 'ID12'),472 ':',473 (tokenizer.ConsumeUint64, 2222222222222222222),474 (tokenizer.ConsumeIdentifier, 'ID13'),475 ':',476 (tokenizer.ConsumeFloat, 1.23456),477 (tokenizer.ConsumeIdentifier, 'ID14'),478 ':',479 (tokenizer.ConsumeFloat, 1.2e+2),480 (tokenizer.ConsumeIdentifier, 'false_bool'),481 ':',482 (tokenizer.ConsumeBool, False),483 (tokenizer.ConsumeIdentifier, 'true_BOOL'),484 ':',485 (tokenizer.ConsumeBool, True),486 (tokenizer.ConsumeIdentifier, 'true_bool1'),487 ':',488 (tokenizer.ConsumeBool, True),489 (tokenizer.ConsumeIdentifier, 'false_BOOL1'),490 ':',491 (tokenizer.ConsumeBool, False)]492 i = 0493 while not tokenizer.AtEnd():494 m = methods[i]495 if type(m) == str:496 token = tokenizer.token497 self.assertEqual(token, m)498 tokenizer.NextToken()499 else:500 self.assertEqual(m[1], m[0]())501 i += 1502 def testConsumeIntegers(self):503 # This test only tests the failures in the integer parsing methods as well504 # as the '0' special cases.505 int64_max = (1 << 63) - 1506 uint32_max = (1 << 32) - 1507 text = '-1 %d %d' % (uint32_max + 1, int64_max + 1)508 tokenizer = text_format._Tokenizer(text)509 self.assertRaises(text_format.ParseError, tokenizer.ConsumeUint32)510 self.assertRaises(text_format.ParseError, tokenizer.ConsumeUint64)511 self.assertEqual(-1, tokenizer.ConsumeInt32())512 self.assertRaises(text_format.ParseError, tokenizer.ConsumeUint32)513 self.assertRaises(text_format.ParseError, tokenizer.ConsumeInt32)514 self.assertEqual(uint32_max + 1, tokenizer.ConsumeInt64())515 self.assertRaises(text_format.ParseError, tokenizer.ConsumeInt64)516 self.assertEqual(int64_max + 1, tokenizer.ConsumeUint64())517 self.assertTrue(tokenizer.AtEnd())518 text = '-0 -0 0 0'519 tokenizer = text_format._Tokenizer(text)520 self.assertEqual(0, tokenizer.ConsumeUint32())521 self.assertEqual(0, 
tokenizer.ConsumeUint64())522 self.assertEqual(0, tokenizer.ConsumeUint32())523 self.assertEqual(0, tokenizer.ConsumeUint64())524 self.assertTrue(tokenizer.AtEnd())525 def testConsumeByteString(self):526 text = '"string1\''527 tokenizer = text_format._Tokenizer(text)528 self.assertRaises(text_format.ParseError, tokenizer.ConsumeByteString)529 text = 'string1"'530 tokenizer = text_format._Tokenizer(text)531 self.assertRaises(text_format.ParseError, tokenizer.ConsumeByteString)532 text = '\n"\\xt"'533 tokenizer = text_format._Tokenizer(text)534 self.assertRaises(text_format.ParseError, tokenizer.ConsumeByteString)535 text = '\n"\\"'536 tokenizer = text_format._Tokenizer(text)537 self.assertRaises(text_format.ParseError, tokenizer.ConsumeByteString)538 text = '\n"\\x"'539 tokenizer = text_format._Tokenizer(text)540 self.assertRaises(text_format.ParseError, tokenizer.ConsumeByteString)541 def testConsumeBool(self):542 text = 'not-a-bool'543 tokenizer = text_format._Tokenizer(text)544 self.assertRaises(text_format.ParseError, tokenizer.ConsumeBool)545if __name__ == '__main__':...
message_test.py
Source:message_test.py
#! /usr/bin/python
#
# Protocol Buffers - Google's data interchange format
# Copyright 2008 Google Inc. All rights reserved.
# http://code.google.com/p/protobuf/
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Tests python protocol buffers against the golden message.

Note that the golden messages exercise every known field type, thus this
test ends up exercising and verifying nearly all of the parsing and
serialization code in the whole library.

TODO(kenton): Merge with wire_format_test? It doesn't make a whole lot of
sense to call this a test of the "message" module, which only declares an
abstract interface.
"""

__author__ = 'gps@google.com (Gregory P. Smith)'

import copy
import math
import operator
import pickle
import unittest

from google.protobuf import unittest_import_pb2
from google.protobuf import unittest_pb2
from google.protobuf.internal import api_implementation
from google.protobuf.internal import test_util
from google.protobuf import message


# Python pre-2.6 does not have isinf() or isnan() functions, so we have
# to provide our own.
def isnan(val):
    """Return True if val is NaN."""
    # NaN is never equal to itself.
    return val != val


def isinf(val):
    """Return True if val is positive or negative infinity."""
    # Infinity times zero equals NaN.
    return not isnan(val) and isnan(val * 0)


def IsPosInf(val):
    """Return True if val is positive infinity."""
    return isinf(val) and (val > 0)


def IsNegInf(val):
    """Return True if val is negative infinity."""
    return isinf(val) and (val < 0)


class MessageTest(unittest.TestCase):

    def testGoldenMessage(self):
        # Parse the golden file, then check it re-serializes byte-for-byte,
        # both directly and after a deep copy.
        golden_data = test_util.GoldenFile('golden_message').read()
        golden_message = unittest_pb2.TestAllTypes()
        golden_message.ParseFromString(golden_data)
        test_util.ExpectAllFieldsSet(self, golden_message)
        self.assertEqual(golden_data, golden_message.SerializeToString())
        golden_copy = copy.deepcopy(golden_message)
        self.assertEqual(golden_data, golden_copy.SerializeToString())

    def testGoldenExtensions(self):
        golden_data = test_util.GoldenFile('golden_message').read()
        golden_message = unittest_pb2.TestAllExtensions()
        golden_message.ParseFromString(golden_data)
        all_set = unittest_pb2.TestAllExtensions()
        test_util.SetAllExtensions(all_set)
        self.assertEqual(all_set, golden_message)
        self.assertEqual(golden_data, golden_message.SerializeToString())
        golden_copy = copy.deepcopy(golden_message)
        self.assertEqual(golden_data, golden_copy.SerializeToString())

    def testGoldenPackedMessage(self):
        golden_data = test_util.GoldenFile('golden_packed_fields_message').read()
        golden_message = unittest_pb2.TestPackedTypes()
        golden_message.ParseFromString(golden_data)
        all_set = unittest_pb2.TestPackedTypes()
        test_util.SetAllPackedFields(all_set)
        self.assertEqual(all_set, golden_message)
        self.assertEqual(golden_data, all_set.SerializeToString())
        golden_copy = copy.deepcopy(golden_message)
        self.assertEqual(golden_data, golden_copy.SerializeToString())

    def testGoldenPackedExtensions(self):
        golden_data = test_util.GoldenFile('golden_packed_fields_message').read()
        golden_message = unittest_pb2.TestPackedExtensions()
        golden_message.ParseFromString(golden_data)
        all_set = unittest_pb2.TestPackedExtensions()
        test_util.SetAllPackedExtensions(all_set)
        self.assertEqual(all_set, golden_message)
        self.assertEqual(golden_data, all_set.SerializeToString())
        golden_copy = copy.deepcopy(golden_message)
        self.assertEqual(golden_data, golden_copy.SerializeToString())

    def testPickleSupport(self):
        golden_data = test_util.GoldenFile('golden_message').read()
        golden_message = unittest_pb2.TestAllTypes()
        golden_message.ParseFromString(golden_data)
        pickled_message = pickle.dumps(golden_message)
        unpickled_message = pickle.loads(pickled_message)
        self.assertEqual(unpickled_message, golden_message)

    def testPickleIncompleteProto(self):
        golden_message = unittest_pb2.TestRequired(a=1)
        pickled_message = pickle.dumps(golden_message)
        unpickled_message = pickle.loads(pickled_message)
        self.assertEqual(unpickled_message, golden_message)
        self.assertEqual(unpickled_message.a, 1)
        # This is still an incomplete proto - so serializing should fail
        self.assertRaises(message.EncodeError,
                          unpickled_message.SerializeToString)

    def testPositiveInfinity(self):
        golden_data = ('\x5D\x00\x00\x80\x7F'
                       '\x61\x00\x00\x00\x00\x00\x00\xF0\x7F'
                       '\xCD\x02\x00\x00\x80\x7F'
                       '\xD1\x02\x00\x00\x00\x00\x00\x00\xF0\x7F')
        golden_message = unittest_pb2.TestAllTypes()
        golden_message.ParseFromString(golden_data)
        self.assertTrue(IsPosInf(golden_message.optional_float))
        self.assertTrue(IsPosInf(golden_message.optional_double))
        self.assertTrue(IsPosInf(golden_message.repeated_float[0]))
        self.assertTrue(IsPosInf(golden_message.repeated_double[0]))
        self.assertEqual(golden_data, golden_message.SerializeToString())

    def testNegativeInfinity(self):
        golden_data = ('\x5D\x00\x00\x80\xFF'
                       '\x61\x00\x00\x00\x00\x00\x00\xF0\xFF'
                       '\xCD\x02\x00\x00\x80\xFF'
                       '\xD1\x02\x00\x00\x00\x00\x00\x00\xF0\xFF')
        golden_message = unittest_pb2.TestAllTypes()
        golden_message.ParseFromString(golden_data)
        self.assertTrue(IsNegInf(golden_message.optional_float))
        self.assertTrue(IsNegInf(golden_message.optional_double))
        self.assertTrue(IsNegInf(golden_message.repeated_float[0]))
        self.assertTrue(IsNegInf(golden_message.repeated_double[0]))
        self.assertEqual(golden_data, golden_message.SerializeToString())

    def testNotANumber(self):
        golden_data = ('\x5D\x00\x00\xC0\x7F'
                       '\x61\x00\x00\x00\x00\x00\x00\xF8\x7F'
                       '\xCD\x02\x00\x00\xC0\x7F'
                       '\xD1\x02\x00\x00\x00\x00\x00\x00\xF8\x7F')
        golden_message = unittest_pb2.TestAllTypes()
        golden_message.ParseFromString(golden_data)
        self.assertTrue(isnan(golden_message.optional_float))
        self.assertTrue(isnan(golden_message.optional_double))
        self.assertTrue(isnan(golden_message.repeated_float[0]))
        self.assertTrue(isnan(golden_message.repeated_double[0]))

        # The protocol buffer may serialize to any one of multiple different
        # representations of a NaN. Rather than verify a specific
        # representation, verify the serialized string can be converted into
        # a correctly behaving protocol buffer.
        serialized = golden_message.SerializeToString()
        message = unittest_pb2.TestAllTypes()
        message.ParseFromString(serialized)
        self.assertTrue(isnan(message.optional_float))
        self.assertTrue(isnan(message.optional_double))
        self.assertTrue(isnan(message.repeated_float[0]))
        self.assertTrue(isnan(message.repeated_double[0]))

    def testPositiveInfinityPacked(self):
        golden_data = ('\xA2\x06\x04\x00\x00\x80\x7F'
                       '\xAA\x06\x08\x00\x00\x00\x00\x00\x00\xF0\x7F')
        golden_message = unittest_pb2.TestPackedTypes()
        golden_message.ParseFromString(golden_data)
        self.assertTrue(IsPosInf(golden_message.packed_float[0]))
        self.assertTrue(IsPosInf(golden_message.packed_double[0]))
        self.assertEqual(golden_data, golden_message.SerializeToString())

    def testNegativeInfinityPacked(self):
        golden_data = ('\xA2\x06\x04\x00\x00\x80\xFF'
                       '\xAA\x06\x08\x00\x00\x00\x00\x00\x00\xF0\xFF')
        golden_message = unittest_pb2.TestPackedTypes()
        golden_message.ParseFromString(golden_data)
        self.assertTrue(IsNegInf(golden_message.packed_float[0]))
        self.assertTrue(IsNegInf(golden_message.packed_double[0]))
        self.assertEqual(golden_data, golden_message.SerializeToString())

    def testNotANumberPacked(self):
        golden_data = ('\xA2\x06\x04\x00\x00\xC0\x7F'
                       '\xAA\x06\x08\x00\x00\x00\x00\x00\x00\xF8\x7F')
        golden_message = unittest_pb2.TestPackedTypes()
        golden_message.ParseFromString(golden_data)
        self.assertTrue(isnan(golden_message.packed_float[0]))
        self.assertTrue(isnan(golden_message.packed_double[0]))

        serialized = golden_message.SerializeToString()
        message = unittest_pb2.TestPackedTypes()
        message.ParseFromString(serialized)
        self.assertTrue(isnan(message.packed_float[0]))
        self.assertTrue(isnan(message.packed_double[0]))

    def _AssertExtremeValuesRoundTrip(self, field_name, max_exponent):
        """Round-trip extreme values of one field through serialization.

        For the exponents +max_exponent and then -max_exponent, this sets the
        field to 2**exp (no significand bits set), 1.5 * 2**exp (one
        significand bit set) and their negations, in that order, and checks
        that each value survives SerializeToString/ParseFromString.

        Args:
          field_name: name of the TestAllTypes field to exercise.
          max_exponent: magnitude of the binary exponent to test.
        """
        message = unittest_pb2.TestAllTypes()
        for exponent in (max_exponent, -max_exponent):
            no_sig_bits = math.pow(2, exponent)
            one_sig_bit = 1.5 * math.pow(2, exponent)
            for value in (no_sig_bits, one_sig_bit,
                          -no_sig_bits, -one_sig_bit):
                setattr(message, field_name, value)
                message.ParseFromString(message.SerializeToString())
                self.assertEqual(getattr(message, field_name), value)

    def testExtremeFloatValues(self):
        # Most positive / most negative exponent for optional_float.
        self._AssertExtremeValuesRoundTrip('optional_float', 127)

    def testExtremeDoubleValues(self):
        # Most positive / most negative exponent for optional_double.
        self._AssertExtremeValuesRoundTrip('optional_double', 1023)

    def testSortingRepeatedScalarFieldsDefaultComparator(self):
        """Check some different types with the default comparator."""
        message = unittest_pb2.TestAllTypes()

        # TODO(mattp): would testing more scalar types strengthen test?
        message.repeated_int32.append(1)
        message.repeated_int32.append(3)
        message.repeated_int32.append(2)
        message.repeated_int32.sort()
        self.assertEqual(message.repeated_int32[0], 1)
        self.assertEqual(message.repeated_int32[1], 2)
        self.assertEqual(message.repeated_int32[2], 3)

        message.repeated_float.append(1.1)
        message.repeated_float.append(1.3)
        message.repeated_float.append(1.2)
        message.repeated_float.sort()
        self.assertAlmostEqual(message.repeated_float[0], 1.1)
        self.assertAlmostEqual(message.repeated_float[1], 1.2)
        self.assertAlmostEqual(message.repeated_float[2], 1.3)

        message.repeated_string.append('a')
        message.repeated_string.append('c')
        message.repeated_string.append('b')
        message.repeated_string.sort()
        self.assertEqual(message.repeated_string[0], 'a')
        self.assertEqual(message.repeated_string[1], 'b')
        self.assertEqual(message.repeated_string[2], 'c')

        message.repeated_bytes.append('a')
        message.repeated_bytes.append('c')
        message.repeated_bytes.append('b')
        message.repeated_bytes.sort()
        self.assertEqual(message.repeated_bytes[0], 'a')
        self.assertEqual(message.repeated_bytes[1], 'b')
        self.assertEqual(message.repeated_bytes[2], 'c')

    def testSortingRepeatedScalarFieldsCustomComparator(self):
        """Check some different types with custom comparator."""
        message = unittest_pb2.TestAllTypes()

        message.repeated_int32.append(-3)
        message.repeated_int32.append(-2)
        message.repeated_int32.append(-1)
        message.repeated_int32.sort(lambda x,y: cmp(abs(x), abs(y)))
        self.assertEqual(message.repeated_int32[0], -1)
        self.assertEqual(message.repeated_int32[1], -2)
        self.assertEqual(message.repeated_int32[2], -3)

        message.repeated_string.append('aaa')
        message.repeated_string.append('bb')
        message.repeated_string.append('c')
        message.repeated_string.sort(lambda x,y: cmp(len(x), len(y)))
        self.assertEqual(message.repeated_string[0], 'c')
        self.assertEqual(message.repeated_string[1], 'bb')
        self.assertEqual(message.repeated_string[2], 'aaa')

    def testSortingRepeatedCompositeFieldsCustomComparator(self):
        """Check passing a custom comparator to sort a repeated composite field."""
        message = unittest_pb2.TestAllTypes()

        message.repeated_nested_message.add().bb = 1
        message.repeated_nested_message.add().bb = 3
        message.repeated_nested_message.add().bb = 2
        message.repeated_nested_message.add().bb = 6
        message.repeated_nested_message.add().bb = 5
        message.repeated_nested_message.add().bb = 4
        message.repeated_nested_message.sort(lambda x,y: cmp(x.bb, y.bb))
        self.assertEqual(message.repeated_nested_message[0].bb, 1)
        self.assertEqual(message.repeated_nested_message[1].bb, 2)
        self.assertEqual(message.repeated_nested_message[2].bb, 3)
        self.assertEqual(message.repeated_nested_message[3].bb, 4)
        self.assertEqual(message.repeated_nested_message[4].bb, 5)
        self.assertEqual(message.repeated_nested_message[5].bb, 6)

    def testRepeatedCompositeFieldSortArguments(self):
        """Check sorting a repeated composite field using list.sort() arguments."""
        message = unittest_pb2.TestAllTypes()

        get_bb = operator.attrgetter('bb')
        cmp_bb = lambda a, b: cmp(a.bb, b.bb)
        message.repeated_nested_message.add().bb = 1
        message.repeated_nested_message.add().bb = 3
        message.repeated_nested_message.add().bb = 2
        message.repeated_nested_message.add().bb = 6
        message.repeated_nested_message.add().bb = 5
        message.repeated_nested_message.add().bb = 4
        message.repeated_nested_message.sort(key=get_bb)
        self.assertEqual([k.bb for k in message.repeated_nested_message],
                         [1, 2, 3, 4, 5, 6])
        message.repeated_nested_message.sort(key=get_bb, reverse=True)
        self.assertEqual([k.bb for k in message.repeated_nested_message],
                         [6, 5, 4, 3, 2, 1])
        message.repeated_nested_message.sort(sort_function=cmp_bb)
        self.assertEqual([k.bb for k in message.repeated_nested_message],
                         [1, 2, 3, 4, 5, 6])
        message.repeated_nested_message.sort(cmp=cmp_bb, reverse=True)
        self.assertEqual([k.bb for k in message.repeated_nested_message],
                         [6, 5, 4, 3, 2, 1])

    def testRepeatedScalarFieldSortArguments(self):
        """Check sorting a scalar field using list.sort() arguments."""
        message = unittest_pb2.TestAllTypes()

        abs_cmp = lambda a, b: cmp(abs(a), abs(b))
        message.repeated_int32.append(-3)
        message.repeated_int32.append(-2)
        message.repeated_int32.append(-1)
        message.repeated_int32.sort(key=abs)
        self.assertEqual(list(message.repeated_int32), [-1, -2, -3])
        message.repeated_int32.sort(key=abs, reverse=True)
        self.assertEqual(list(message.repeated_int32), [-3, -2, -1])
        message.repeated_int32.sort(sort_function=abs_cmp)
        self.assertEqual(list(message.repeated_int32), [-1, -2, -3])
        message.repeated_int32.sort(cmp=abs_cmp, reverse=True)
        self.assertEqual(list(message.repeated_int32), [-3, -2, -1])

        len_cmp = lambda a, b: cmp(len(a), len(b))
        message.repeated_string.append('aaa')
        message.repeated_string.append('bb')
        message.repeated_string.append('c')
        message.repeated_string.sort(key=len)
        self.assertEqual(list(message.repeated_string), ['c', 'bb', 'aaa'])
        message.repeated_string.sort(key=len, reverse=True)
        self.assertEqual(list(message.repeated_string), ['aaa', 'bb', 'c'])
        message.repeated_string.sort(sort_function=len_cmp)
        self.assertEqual(list(message.repeated_string), ['c', 'bb', 'aaa'])
        message.repeated_string.sort(cmp=len_cmp, reverse=True)
        self.assertEqual(list(message.repeated_string), ['aaa', 'bb', 'c'])

    def testParsingMerge(self):
        """Check the merge behavior when a required or optional field appears
        multiple times in the input."""
        messages = [
            unittest_pb2.TestAllTypes(),
            unittest_pb2.TestAllTypes(),
            unittest_pb2.TestAllTypes() ]
        messages[0].optional_int32 = 1
        messages[1].optional_int64 = 2
        messages[2].optional_int32 = 3
        messages[2].optional_string = 'hello'

        # Expected result of merging the three messages in order.
        merged_message = unittest_pb2.TestAllTypes()
        merged_message.optional_int32 = 3
        merged_message.optional_int64 = 2
        merged_message.optional_string = 'hello'

        generator = unittest_pb2.TestParsingMerge.RepeatedFieldsGenerator()
        generator.field1.extend(messages)
        generator.field2.extend(messages)
        generator.field3.extend(messages)
        generator.ext1.extend(messages)
        generator.ext2.extend(messages)
        generator.group1.add().field1.MergeFrom(messages[0])
        generator.group1.add().field1.MergeFrom(messages[1])
        generator.group1.add().field1.MergeFrom(messages[2])
        generator.group2.add().field1.MergeFrom(messages[0])
        generator.group2.add().field1.MergeFrom(messages[1])
        generator.group2.add().field1.MergeFrom(messages[2])

        data = generator.SerializeToString()
        parsing_merge = unittest_pb2.TestParsingMerge()
        parsing_merge.ParseFromString(data)

        # Required and optional fields should be merged.
        self.assertEqual(parsing_merge.required_all_types, merged_message)
        self.assertEqual(parsing_merge.optional_all_types, merged_message)
        self.assertEqual(parsing_merge.optionalgroup.optional_group_all_types,
                         merged_message)
        self.assertEqual(parsing_merge.Extensions[
                             unittest_pb2.TestParsingMerge.optional_ext],
                         merged_message)

        # Repeated fields should not be merged.
        self.assertEqual(len(parsing_merge.repeated_all_types), 3)
        self.assertEqual(len(parsing_merge.repeatedgroup), 3)
        self.assertEqual(len(parsing_merge.Extensions[
            unittest_pb2.TestParsingMerge.repeated_ext]), 3)

    def testSortEmptyRepeatedCompositeContainer(self):
        """Exercise a scenario that has led to segfaults in the past.
        """
        m = unittest_pb2.TestAllTypes()
        m.repeated_nested_message.sort()


# NOTE(review): the guard body is elided ('...') in the source listing;
# presumably unittest.main() -- confirm against the original file.
if __name__ == '__main__':...
layer_receive.py
Source:layer_receive.py
from .layer_base import AxolotlBaseLayer
from yowsup.layers.protocol_receipts.protocolentities import OutgoingReceiptProtocolEntity
from yowsup.layers.protocol_messages.proto.wa_pb2 import *
from yowsup.layers.axolotl.protocolentities import *
from yowsup.structs import ProtocolTreeNode
from yowsup.layers.axolotl.props import PROP_IDENTITY_AUTOTRUST
from axolotl.protocol.prekeywhispermessage import PreKeyWhisperMessage
from axolotl.protocol.whispermessage import WhisperMessage
from axolotl.sessioncipher import SessionCipher
from axolotl.groups.groupcipher import GroupCipher
from axolotl.invalidmessageexception import InvalidMessageException
from axolotl.duplicatemessagexception import DuplicateMessageException
from axolotl.invalidkeyidexception import InvalidKeyIdException
from axolotl.nosessionexception import NoSessionException
from axolotl.untrustedidentityexception import UntrustedIdentityException
from axolotl.axolotladdress import AxolotlAddress
from axolotl.groups.senderkeyname import SenderKeyName
from axolotl.groups.groupsessionbuilder import GroupSessionBuilder
from axolotl.protocol.senderkeydistributionmessage import SenderKeyDistributionMessage
import logging
import copy

logger = logging.getLogger(__name__)


class AxolotlReceivelayer(AxolotlBaseLayer):
    """Receiving half of the axolotl layer.

    Incoming "message" nodes that carry an ``enc`` child are decrypted and the
    resulting plaintext node is passed upwards; everything else (except
    receipts, which the send layer handles) is forwarded unchanged.
    """

    def __init__(self):
        super(AxolotlReceivelayer, self).__init__()
        self.v2Jids = []  # people we're going to send v2 enc messages
        self.sessionCiphers = {}
        self.groupCiphers = {}
        self.pendingIncomingMessages = {}  # (jid, participantJid?) => message

    def receive(self, protocolTreeNode):
        """
        :type protocolTreeNode: ProtocolTreeNode
        """
        if not self.processIqRegistry(protocolTreeNode):
            if protocolTreeNode.tag == "message":
                self.onMessage(protocolTreeNode)
            elif not protocolTreeNode.tag == "receipt":
                # receipts will be handled by send layer
                self.toUpper(protocolTreeNode)

            # elif protocolTreeNode.tag == "iq":
            #     if protocolTreeNode.getChild("encr_media"):
            #         protocolTreeNode.addChild("media", {
            #             "url": protocolTreeNode["url"],
            #             "ip": protocolTreeNode["ip"],
            #         })
            #         self.toUpper(protocolTreeNode)
            #         return

    ######

    def onEncrMediaResult(self, resultNode):
        pass

    def processPendingIncomingMessages(self, jid, participantJid=None):
        """Re-run every message queued for the given conversation.

        The queue is detached from the dict *before* replaying: ``onMessage``
        may queue the same node again (e.g. the session is still missing), and
        appending to a list while iterating over it would both loop on the
        re-queued items and then lose them when the entry is deleted.
        """
        conversationIdentifier = (jid, participantJid)
        for messageNode in self.pendingIncomingMessages.pop(conversationIdentifier, []):
            self.onMessage(messageNode)

    ##### handling received data #####

    def onMessage(self, protocolTreeNode):
        """Decrypt nodes that have an ``enc`` child; pass others upwards."""
        encNode = protocolTreeNode.getChild("enc")
        if encNode:
            self.handleEncMessage(protocolTreeNode)
        else:
            self.toUpper(protocolTreeNode)

    def handleEncMessage(self, node):
        """Dispatch an encrypted node to the handler for its enc type(s).

        Decryption failures are mapped to protocol reactions: a retry receipt
        for invalid messages/key ids, a key fetch plus deferred replay when no
        session exists, a delivery receipt for duplicates, and optional
        auto-trust for changed identities.
        """
        encMessageProtocolEntity = EncryptedMessageProtocolEntity.fromProtocolTreeNode(node)
        isGroup = node["participant"] is not None
        senderJid = node["participant"] if isGroup else node["from"]
        if node.getChild("enc")["v"] == "2" and node["from"] not in self.v2Jids:
            self.v2Jids.append(node["from"])
        try:
            if encMessageProtocolEntity.getEnc(EncProtocolEntity.TYPE_PKMSG):
                self.handlePreKeyWhisperMessage(node)
            elif encMessageProtocolEntity.getEnc(EncProtocolEntity.TYPE_MSG):
                self.handleWhisperMessage(node)
            # Deliberately a separate `if`: a sender-key payload is handled in
            # addition to a pkmsg/msg payload, not instead of it.
            if encMessageProtocolEntity.getEnc(EncProtocolEntity.TYPE_SKMSG):
                self.handleSenderKeyMessage(node)
        except (InvalidMessageException, InvalidKeyIdException) as e:
            logger.warning("InvalidMessage or KeyId for %s, going to send a retry", encMessageProtocolEntity.getAuthor(False))
            retry = RetryOutgoingReceiptProtocolEntity.fromMessageNode(node, self.store.getLocalRegistrationId())
            self.toLower(retry.toProtocolTreeNode())
        except NoSessionException as e:
            logger.warning("No session for %s, getting their keys now", encMessageProtocolEntity.getAuthor(False))

            conversationIdentifier = (node["from"], node["participant"])

            if conversationIdentifier not in self.pendingIncomingMessages:
                self.pendingIncomingMessages[conversationIdentifier] = []
            self.pendingIncomingMessages[conversationIdentifier].append(node)

            # Replay the queued nodes only if keys were fetched for someone.
            successFn = lambda successJids, b: self.processPendingIncomingMessages(*conversationIdentifier) if len(successJids) else None

            self.getKeysFor([senderJid], successFn)
        except DuplicateMessageException as e:
            logger.warning("Received a message that we've previously decrypted, goint to send the delivery receipt myself")
            self.toLower(OutgoingReceiptProtocolEntity(node["id"], node["from"], participant=node["participant"]).toProtocolTreeNode())
        except UntrustedIdentityException as e:
            if self.getProp(PROP_IDENTITY_AUTOTRUST, False):
                logger.warning("Autotrusting identity for %s", e.getName())
                self.store.saveIdentity(e.getName(), e.getIdentityKey())
                return self.handleEncMessage(node)
            else:
                logger.error("Ignoring message with untrusted identity")

    @staticmethod
    def _stripPadding(plaintext):
        """Drop the trailing padding whose length is given by the last byte.

        Indexing yields an int for ``bytes`` on Python 3 and a 1-char string
        otherwise, so both cases are accepted.
        """
        lastByte = plaintext[-1]
        padding = (lastByte if isinstance(lastByte, int) else ord(lastByte)) & 0xFF
        return plaintext[:-padding]

    def handlePreKeyWhisperMessage(self, node):
        """Decrypt a TYPE_PKMSG payload and hand the plaintext on."""
        pkMessageProtocolEntity = EncryptedMessageProtocolEntity.fromProtocolTreeNode(node)
        enc = pkMessageProtocolEntity.getEnc(EncProtocolEntity.TYPE_PKMSG)
        preKeyWhisperMessage = PreKeyWhisperMessage(serialized=enc.getData())
        sessionCipher = self.getSessionCipher(pkMessageProtocolEntity.getAuthor(False))
        plaintext = sessionCipher.decryptPkmsg(preKeyWhisperMessage)

        if enc.getVersion() == 2:
            # v2 payloads are padded, serialized protobuf messages.
            self.parseAndHandleMessageProto(pkMessageProtocolEntity, self._stripPadding(plaintext))
        else:
            self.handleConversationMessage(node, plaintext)

    def handleWhisperMessage(self, node):
        """Decrypt a TYPE_MSG payload and hand the plaintext on."""
        encMessageProtocolEntity = EncryptedMessageProtocolEntity.fromProtocolTreeNode(node)

        enc = encMessageProtocolEntity.getEnc(EncProtocolEntity.TYPE_MSG)
        whisperMessage = WhisperMessage(serialized=enc.getData())
        sessionCipher = self.getSessionCipher(encMessageProtocolEntity.getAuthor(False))
        plaintext = sessionCipher.decryptMsg(whisperMessage)

        if enc.getVersion() == 2:
            self.parseAndHandleMessageProto(encMessageProtocolEntity, self._stripPadding(plaintext))
        else:
            self.handleConversationMessage(encMessageProtocolEntity.toProtocolTreeNode(), plaintext)

    def handleSenderKeyMessage(self, node):
        """Decrypt a TYPE_SKMSG (group) payload; request a retry if no session."""
        encMessageProtocolEntity = EncryptedMessageProtocolEntity.fromProtocolTreeNode(node)
        enc = encMessageProtocolEntity.getEnc(EncProtocolEntity.TYPE_SKMSG)

        senderKeyName = SenderKeyName(encMessageProtocolEntity.getFrom(True), AxolotlAddress(encMessageProtocolEntity.getParticipant(False), 0))
        groupCipher = GroupCipher(self.store, senderKeyName)
        try:
            plaintext = self._stripPadding(groupCipher.decrypt(enc.getData()))
            # The protobuf parser needs bytes. Checking the value's type
            # (rather than the interpreter version) avoids relying on a `sys`
            # name this module never imports explicitly, and also copes with
            # a cipher that already returns bytes.
            if not isinstance(plaintext, bytes):
                plaintext = plaintext.encode()
            self.parseAndHandleMessageProto(encMessageProtocolEntity, plaintext)
        except NoSessionException as e:
            logger.warning("No session for %s, going to send a retry", encMessageProtocolEntity.getAuthor(False))
            retry = RetryOutgoingReceiptProtocolEntity.fromMessageNode(node, self.store.getLocalRegistrationId())
            self.toLower(retry.toProtocolTreeNode())

    def parseAndHandleMessageProto(self, encMessageProtocolEntity, serializedData):
        """Parse a decrypted protobuf ``Message`` and dispatch on its content.

        :raises ValueError: if the data is empty or contains no supported field
        """
        node = encMessageProtocolEntity.toProtocolTreeNode()
        m = Message()
        handled = False
        try:
            m.ParseFromString(serializedData)
        except Exception:
            # %r works for str and bytes alike, so the dump itself can never
            # raise and hide the real parse error.
            logger.error("Failed to parse message proto, raw data: %r", serializedData)
            raise

        if not m or not serializedData:
            raise ValueError("Empty message")

        # A key distribution message may accompany a regular payload, so it is
        # checked independently of the content fields below.
        if m.HasField("sender_key_distribution_message"):
            handled = True
            axolotlAddress = AxolotlAddress(encMessageProtocolEntity.getParticipant(False), 0)
            self.handleSenderKeyDistributionMessage(m.sender_key_distribution_message, axolotlAddress)

        # Only the first content field that is present gets handled.
        contentHandlers = (
            ("conversation", self.handleConversationMessage),
            ("contact_message", self.handleContactMessage),
            ("url_message", self.handleUrlMessage),
            ("location_message", self.handleLocationMessage),
            ("image_message", self.handleImageMessage),
        )
        for fieldName, handler in contentHandlers:
            if m.HasField(fieldName):
                handled = True
                handler(node, getattr(m, fieldName))
                break

        if not handled:
            logger.error("Unhandled message proto: %s", m)
            raise ValueError("Unhandled")

    def handleSenderKeyDistributionMessage(self, senderKeyDistributionMessage, axolotlAddress):
        """Feed a received sender key into a group session builder."""
        groupId = senderKeyDistributionMessage.groupId
        axolotlSenderKeyDistributionMessage = SenderKeyDistributionMessage(serialized=senderKeyDistributionMessage.axolotl_sender_key_distribution_message)
        groupSessionBuilder = GroupSessionBuilder(self.store)
        senderKeyName = SenderKeyName(groupId, axolotlAddress)
        groupSessionBuilder.process(senderKeyName, axolotlSenderKeyDistributionMessage)

    def handleConversationMessage(self, originalEncNode, text):
        """Replace the node's children with a plain ``body`` and pass it up."""
        messageNode = copy.deepcopy(originalEncNode)
        messageNode.children = []
        messageNode.addChild(ProtocolTreeNode("body", data=text))
        self.toUpper(messageNode)

    def handleImageMessage(self, originalEncNode, imageMessage):
        """Convert an image proto into a ``media`` child and pass it up."""
        messageNode = copy.deepcopy(originalEncNode)
        messageNode["type"] = "media"
        mediaNode = ProtocolTreeNode("media", {
            "type": "image",
            "filehash": imageMessage.file_sha256,
            "size": str(imageMessage.file_length),
            "url": imageMessage.url,
            "mimetype": imageMessage.mime_type,
            "width": imageMessage.width,
            "height": imageMessage.height,
            "caption": imageMessage.caption,
            "encoding": "raw",
            "file": "enc",
            "ip": "0"
        }, data=imageMessage.jpeg_thumbnail)
        messageNode.addChild(mediaNode)

        self.toUpper(messageNode)

    def handleUrlMessage(self, originalEncNode, urlMessage):
        # convert to ??
        pass

    def handleDocumentMessage(self, originalEncNode, documentMessage):
        # convert to ??
        pass

    def handleLocationMessage(self, originalEncNode, locationMessage):
        """Convert a location proto into a ``media`` child and pass it up."""
        messageNode = copy.deepcopy(originalEncNode)
        messageNode["type"] = "media"
        mediaNode = ProtocolTreeNode("media", {
            "latitude": locationMessage.degrees_latitude,
            # NOTE(review): was `degress_longitude`; assumed to be a typo for
            # the sibling of `degrees_latitude` - confirm against wa.proto.
            "longitude": locationMessage.degrees_longitude,
            "name": "%s %s" % (locationMessage.name, locationMessage.address),
            "url": locationMessage.url,
            "encoding": "raw",
            "type": "location"
        }, data=locationMessage.jpeg_thumbnail)
        messageNode.addChild(mediaNode)
        self.toUpper(messageNode)

    def handleContactMessage(self, originalEncNode, contactMessage):
        """Convert a contact proto into a vcard ``media`` child and pass it up."""
        messageNode = copy.deepcopy(originalEncNode)
        messageNode["type"] = "media"
        mediaNode = ProtocolTreeNode("media", {
            "type": "vcard"
        }, [
            ProtocolTreeNode("vcard", {"name": contactMessage.display_name}, data=contactMessage.vcard)
        ])
        messageNode.addChild(mediaNode)
        self.toUpper(messageNode)

    def getSessionCipher(self, recipientId):
        """Return the cached SessionCipher for recipientId, creating it once."""
        if recipientId in self.sessionCiphers:
            sessionCipher = self.sessionCiphers[recipientId]
        else:
            sessionCipher = SessionCipher(self.store, self.store, self.store, self.store, recipientId, 1)
            self.sessionCiphers[recipientId] = sessionCipher

        return sessionCipher

    def getGroupCipher(self, groupId, senderId):
        """Return the cached GroupCipher for (groupId, senderId), creating it once."""
        senderKeyName = SenderKeyName(groupId, AxolotlAddress(senderId, 1))
        if senderKeyName in self.groupCiphers:
            groupCipher = self.groupCiphers[senderKeyName]
        else:
            groupCipher = GroupCipher(self.store, senderKeyName)
            self.groupCiphers[senderKeyName] = groupCipher
        # NOTE(review): the listing is cut off at this point; the return is
        # reconstructed by analogy with getSessionCipher - confirm upstream.
        return groupCipher
messaging.py
Source:messaging.py
# Copyright 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Utility classes to handle sending and receiving messages."""

import struct
import sys
import weakref

import mojo_bindings.serialization as serialization

# pylint: disable=E0611,F0401
import mojo_system as system

# The flag values for a message header.
NO_FLAG = 0
MESSAGE_EXPECTS_RESPONSE_FLAG = 1 << 0
MESSAGE_IS_RESPONSE_FLAG = 1 << 1


class MessagingException(Exception):
  """Exception that records the traceback active when it was created."""

  def __init__(self, *args, **kwargs):
    Exception.__init__(self, *args, **kwargs)
    self.__traceback__ = sys.exc_info()[2]


class MessageHeader(object):
  """The header of a mojo message."""

  _SIMPLE_MESSAGE_VERSION = 0
  # Four little-endian uint32: size, version, message type, flags.
  _SIMPLE_MESSAGE_STRUCT = struct.Struct("<IIII")

  # The optional little-endian uint64 request id follows the simple header.
  _REQUEST_ID_STRUCT = struct.Struct("<Q")
  _REQUEST_ID_OFFSET = _SIMPLE_MESSAGE_STRUCT.size

  _MESSAGE_WITH_REQUEST_ID_VERSION = 1
  _MESSAGE_WITH_REQUEST_ID_SIZE = (
      _SIMPLE_MESSAGE_STRUCT.size + _REQUEST_ID_STRUCT.size)

  def __init__(self, message_type, flags, request_id=0, data=None):
    self._message_type = message_type
    self._flags = flags
    self._request_id = request_id
    # Serialized form of the header (possibly the whole message buffer it was
    # deserialized from); None until Serialize() is first called.
    self._data = data

  @classmethod
  def Deserialize(cls, data):
    """Parse a header from the start of |data|.

    Raises:
      serialization.DeserializationException: if |data| is too short or the
          version does not match the flags.
    """
    buf = buffer(data)
    if len(data) < cls._SIMPLE_MESSAGE_STRUCT.size:
      raise serialization.DeserializationException('Header is too short.')
    (size, version, message_type, flags) = (
        cls._SIMPLE_MESSAGE_STRUCT.unpack_from(buf))
    if (version < cls._SIMPLE_MESSAGE_VERSION):
      raise serialization.DeserializationException('Incorrect version.')
    request_id = 0
    if _HasRequestId(flags):
      if version < cls._MESSAGE_WITH_REQUEST_ID_VERSION:
        raise serialization.DeserializationException('Incorrect version.')
      if (size < cls._MESSAGE_WITH_REQUEST_ID_SIZE or
          len(data) < cls._MESSAGE_WITH_REQUEST_ID_SIZE):
        raise serialization.DeserializationException('Header is too short.')
      (request_id, ) = cls._REQUEST_ID_STRUCT.unpack_from(
          buf, cls._REQUEST_ID_OFFSET)
    # Use cls so that subclasses deserialize to their own type.
    return cls(message_type, flags, request_id, data)

  @property
  def message_type(self):
    return self._message_type

  # pylint: disable=E0202
  @property
  def request_id(self):
    assert self.has_request_id
    return self._request_id

  # pylint: disable=E0202
  @request_id.setter
  def request_id(self, request_id):
    assert self.has_request_id
    self._request_id = request_id
    # Keep an existing serialized header in sync. When nothing has been
    # serialized yet there is no buffer to patch; Serialize() writes the id.
    if self._data:
      self._REQUEST_ID_STRUCT.pack_into(self._data, self._REQUEST_ID_OFFSET,
                                        request_id)

  @property
  def has_request_id(self):
    return _HasRequestId(self._flags)

  @property
  def expects_response(self):
    return self._HasFlag(MESSAGE_EXPECTS_RESPONSE_FLAG)

  @property
  def is_response(self):
    return self._HasFlag(MESSAGE_IS_RESPONSE_FLAG)

  @property
  def size(self):
    """Size in bytes of the serialized header."""
    if self.has_request_id:
      return self._MESSAGE_WITH_REQUEST_ID_SIZE
    return self._SIMPLE_MESSAGE_STRUCT.size

  def Serialize(self):
    """Write the header into its buffer (allocating one if needed).

    Returns:
      The bytearray holding the serialized header.
    """
    if not self._data:
      self._data = bytearray(self.size)
    version = self._SIMPLE_MESSAGE_VERSION
    size = self._SIMPLE_MESSAGE_STRUCT.size
    if self.has_request_id:
      version = self._MESSAGE_WITH_REQUEST_ID_VERSION
      size = self._MESSAGE_WITH_REQUEST_ID_SIZE
    self._SIMPLE_MESSAGE_STRUCT.pack_into(self._data, 0, size, version,
                                          self._message_type, self._flags)
    if self.has_request_id:
      self._REQUEST_ID_STRUCT.pack_into(self._data, self._REQUEST_ID_OFFSET,
                                        self._request_id)
    return self._data

  def _HasFlag(self, flag):
    return self._flags & flag != 0


class Message(object):
  """A message for a message pipe. This contains data and handles."""

  def __init__(self, data=None, handles=None, header=None):
    self.data = data
    self.handles = handles
    self._header = header
    self._payload = None

  @property
  def header(self):
    """The MessageHeader, lazily deserialized from |data|."""
    if self._header is None:
      self._header = MessageHeader.Deserialize(self.data)
    return self._header

  @property
  def payload(self):
    """A Message wrapping the bytes that follow the header."""
    if self._payload is None:
      self._payload = Message(self.data[self.header.size:], self.handles)
    return self._payload

  def SetRequestId(self, request_id):
    """Set the request id on the header and mirror it into |data|."""
    header = self.header
    header.request_id = request_id
    # Serialize() returns one bytearray (not a tuple) and the header length
    # is exposed as the lower-case `size` property.
    data = header.Serialize()
    self.data[:header.size] = data[:header.size]


class MessageReceiver(object):
  """A class which implements this interface can receive Message objects."""

  def Accept(self, message):
    """
    Receive a Message. The MessageReceiver is allowed to mutate the message.

    Args:
      message: the received message.

    Returns:
      True if the message has been handled, False otherwise.
    """
    raise NotImplementedError()


class MessageReceiverWithResponder(MessageReceiver):
  """
  A MessageReceiver that can also handle the response message generated from the
  given message.
  """

  def AcceptWithResponder(self, message, responder):
    """
    A variant on Accept that registers a MessageReceiver (known as the
    responder) to handle the response message generated from the given message.
    The responder's Accept method may be called as part of the call to
    AcceptWithResponder, or some time after its return.

    Args:
      message: the received message.
      responder: the responder that will receive the response.

    Returns:
      True if the message has been handled, False otherwise.
    """
    raise NotImplementedError()


class ConnectionErrorHandler(object):
  """
  A ConnectionErrorHandler is notified of an error happening while using the
  bindings over message pipes.
  """

  def OnError(self, result):
    raise NotImplementedError()


class Connector(MessageReceiver):
  """
  A Connector owns a message pipe and will send any received messages to the
  registered MessageReceiver. It also acts as a MessageReceiver and will send
  any message through the handle.

  The method Start must be called before the Connector will start listening to
  incoming messages.
  """

  def __init__(self, handle):
    MessageReceiver.__init__(self)
    self._handle = handle
    # Callable cancelling the pending async wait, or None when not waiting.
    self._cancellable = None
    self._incoming_message_receiver = None
    self._error_handler = None

  def __del__(self):
    if self._cancellable:
      self._cancellable()

  def SetIncomingMessageReceiver(self, message_receiver):
    """
    Set the MessageReceiver that will receive message from the owned message
    pipe.
    """
    self._incoming_message_receiver = message_receiver

  def SetErrorHandler(self, error_handler):
    """
    Set the ConnectionErrorHandler that will be notified of errors on the owned
    message pipe.
    """
    self._error_handler = error_handler

  def Start(self):
    assert not self._cancellable
    self._RegisterAsyncWaiterForRead()

  def Accept(self, message):
    """Write |message| to the pipe; True when the write succeeded."""
    result = self._handle.WriteMessage(message.data, message.handles)
    return result == system.RESULT_OK

  def Close(self):
    """Cancel any pending wait and close the handle."""
    if self._cancellable:
      self._cancellable()
      self._cancellable = None
    self._handle.Close()

  def PassMessagePipe(self):
    """Stop listening and give up the handle, returning it to the caller."""
    if self._cancellable:
      self._cancellable()
      self._cancellable = None
    result = self._handle
    self._handle = system.Handle()
    return result

  def _OnAsyncWaiterResult(self, result):
    self._cancellable = None
    if result == system.RESULT_OK:
      self._ReadOutstandingMessages()
    else:
      self._OnError(result)

  def _OnError(self, result):
    assert not self._cancellable
    if self._error_handler:
      self._error_handler.OnError(result)
    self._handle.Close()

  def _RegisterAsyncWaiterForRead(self):
    assert not self._cancellable
    self._cancellable = self._handle.AsyncWait(
        system.HANDLE_SIGNAL_READABLE,
        system.DEADLINE_INDEFINITE,
        _WeakCallback(self._OnAsyncWaiterResult))

  def _ReadOutstandingMessages(self):
    """Dispatch messages until one is not dispatched, then re-arm or fail."""
    result = None
    dispatched = True
    while dispatched:
      result, dispatched = _ReadAndDispatchMessage(
          self._handle, self._incoming_message_receiver)
    if result == system.RESULT_SHOULD_WAIT:
      self._RegisterAsyncWaiterForRead()
      return
    self._OnError(result)


class Router(MessageReceiverWithResponder):
  """
  A Router will handle mojo message and forward those to a Connector. It deals
  with parsing of headers and adding of request ids in order to be able to match
  a response to a request.
  """

  def __init__(self, handle):
    MessageReceiverWithResponder.__init__(self)
    self._incoming_message_receiver = None
    self._next_request_id = 1
    # request id -> responder waiting for the matching response.
    self._responders = {}
    self._connector = Connector(handle)
    self._connector.SetIncomingMessageReceiver(
        ForwardingMessageReceiver(_WeakCallback(self._HandleIncomingMessage)))

  def Start(self):
    self._connector.Start()

  def SetIncomingMessageReceiver(self, message_receiver):
    """
    Set the MessageReceiver that will receive message from the owned message
    pipe.
    """
    self._incoming_message_receiver = message_receiver

  def SetErrorHandler(self, error_handler):
    """
    Set the ConnectionErrorHandler that will be notified of errors on the owned
    message pipe.
    """
    self._connector.SetErrorHandler(error_handler)

  def Accept(self, message):
    # A message without responder is directly forwarded to the connector.
    return self._connector.Accept(message)

  def AcceptWithResponder(self, message, responder):
    # The message must have a header.
    header = message.header
    assert header.expects_response
    request_id = self._NextRequestId()
    header.request_id = request_id
    if not self._connector.Accept(message):
      return False
    self._responders[request_id] = responder
    return True

  def Close(self):
    self._connector.Close()

  def PassMessagePipe(self):
    return self._connector.PassMessagePipe()

  def _HandleIncomingMessage(self, message):
    header = message.header
    if header.expects_response:
      if self._incoming_message_receiver:
        return self._incoming_message_receiver.AcceptWithResponder(
            message, self)
      # If we receive a request expecting a response when the client is not
      # listening, then we have no choice but to tear down the pipe.
      self.Close()
      return False
    if header.is_response:
      request_id = header.request_id
      responder = self._responders.pop(request_id, None)
      if responder is None:
        return False
      return responder.Accept(message)
    if self._incoming_message_receiver:
      return self._incoming_message_receiver.Accept(message)
    # Ok to drop the message
    return False

  def _NextRequestId(self):
    """Return a non-zero 64-bit id with no responder currently registered."""
    request_id = self._next_request_id
    while request_id == 0 or request_id in self._responders:
      request_id = (request_id + 1) % (1 << 64)
    self._next_request_id = (request_id + 1) % (1 << 64)
    return request_id


class ForwardingMessageReceiver(MessageReceiver):
  """A MessageReceiver that forward calls to |Accept| to a callable."""

  def __init__(self, callback):
    MessageReceiver.__init__(self)
    self._callback = callback

  def Accept(self, message):
    return self._callback(message)


def _WeakCallback(callback):
  """Wrap a bound method so that it only weakly references its instance.

  Once the instance is gone, calling the wrapper does nothing and returns
  None.
  """
  func = callback.im_func
  self = callback.im_self
  if not self:
    return callback
  weak_self = weakref.ref(self)
  def Callback(*args, **kwargs):
    self = weak_self()
    if self:
      return func(self, *args, **kwargs)
  return Callback


def _ReadAndDispatchMessage(handle, message_receiver):
  """Read one message from |handle| and pass it to |message_receiver|.

  Returns:
    A (result, dispatched) tuple.
  """
  dispatched = False
  # First read without buffers: succeeds only for an empty message, otherwise
  # reports the sizes needed for the real read below.
  (result, _, sizes) = handle.ReadMessage()
  if result == system.RESULT_OK and message_receiver:
    dispatched = message_receiver.Accept(Message(bytearray(), []))
  if result != system.RESULT_RESOURCE_EXHAUSTED:
    return (result, dispatched)
  (result, data, _) = handle.ReadMessage(bytearray(sizes[0]), sizes[1])
  if result == system.RESULT_OK and message_receiver:
    dispatched = message_receiver.Accept(Message(data[0], data[1]))
  return (result, dispatched)


def _HasRequestId(flags):
  # NOTE(review): the listing cuts this body off. Reconstructed from usage in
  # this module: a request id is written for messages that expect a response
  # and read back from responses - confirm against upstream.
  return flags & (MESSAGE_EXPECTS_RESPONSE_FLAG | MESSAGE_IS_RESPONSE_FLAG) != 0
message_set.py
Source:message_set.py
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""This module contains the MessageSet class, which is a special kind of
protocol message which can contain other protocol messages without knowing
their types. See the class's doc string for more information."""

from google.net.proto import ProtocolBuffer
import logging

try:
  from google3.net.proto import _net_proto___parse__python
except ImportError:
  _net_proto___parse__python = None

# Wire tags used inside a serialized MessageSet.
TAG_BEGIN_ITEM_GROUP = 11
TAG_END_ITEM_GROUP = 12
TAG_TYPE_ID = 16
TAG_MESSAGE = 26


class Item:
  """One entry of a MessageSet.

  While `message_class` is None, `message` holds the still-serialized bytes;
  once parsed, `message` is an instance of `message_class`.
  """

  def __init__(self, message, message_class=None):
    self.message = message
    self.message_class = message_class

  def SetToDefaultInstance(self, message_class):
    """Replace the content with a fresh instance of message_class."""
    self.message = message_class()
    self.message_class = message_class

  def Parse(self, message_class):
    """Parse the raw bytes as message_class, unless already parsed.

    Returns 1 on success (or if a class is already set), 0 on a decode error.
    """
    if self.message_class is not None:
      return 1

    try:
      message_obj = message_class()
      message_obj.MergePartialFromString(self.message)
      self.message = message_obj
      self.message_class = message_class
      return 1
    except ProtocolBuffer.ProtocolBufferDecodeError:
      # logging.warn is a deprecated alias; passing the name as an argument
      # lets logging do the formatting.
      logging.warning("Parse error in message inside MessageSet. Tried "
                      "to parse as: %s", message_class.__name__)
      return 0

  def MergeFrom(self, other):
    """Merge other into this item, parsing whichever side is still raw."""
    if self.message_class is not None:
      if other.Parse(self.message_class):
        self.message.MergeFrom(other.message)

    elif other.message_class is not None:
      if not self.Parse(other.message_class):
        # Our raw bytes are unparseable as other's type: start from empty.
        self.message = other.message_class()
        self.message_class = other.message_class
      self.message.MergeFrom(other.message)

    else:
      # Both sides are raw bytes: concatenate them.
      self.message += other.message

  def Copy(self):
    """Return a copy of this item (parsed messages are copied via CopyFrom)."""
    if self.message_class is None:
      return Item(self.message)
    else:
      new_message = self.message_class()
      new_message.CopyFrom(self.message)
      return Item(new_message, self.message_class)

  def Equals(self, other):
    """Compare with other, parsing the raw side if only one is parsed."""
    if self.message_class is not None:
      if not other.Parse(self.message_class): return 0
      return self.message.Equals(other.message)

    elif other.message_class is not None:
      if not self.Parse(other.message_class): return 0
      return self.message.Equals(other.message)

    else:
      return self.message == other.message

  def IsInitialized(self, debug_strs=None):
    """Unparsed items count as initialized; otherwise ask the message."""
    if self.message_class is None:
      return 1
    else:
      return self.message.IsInitialized(debug_strs)

  def ByteSize(self, pb, type_id):
    message_length = 0
    if self.message_class is None:
      message_length = len(self.message)
    else:
      message_length = self.message.ByteSize()
    return pb.lengthString(message_length) + pb.lengthVarInt64(type_id) + 2

  def ByteSizePartial(self, pb, type_id):
    message_length = 0
    if self.message_class is None:
      message_length = len(self.message)
    else:
      message_length = self.message.ByteSizePartial()
    return pb.lengthString(message_length) + pb.lengthVarInt64(type_id) + 2

  def OutputUnchecked(self, out, type_id):
    """Write the type id and the (raw or parsed) message to out."""
    out.putVarInt32(TAG_TYPE_ID)
    out.putVarUint64(type_id)
    out.putVarInt32(TAG_MESSAGE)
    if self.message_class is None:
      out.putPrefixedString(self.message)
    else:
      out.putVarInt32(self.message.ByteSize())
      self.message.OutputUnchecked(out)

  def OutputPartial(self, out, type_id):
    """Like OutputUnchecked, using the partial size/output methods."""
    out.putVarInt32(TAG_TYPE_ID)
    out.putVarUint64(type_id)
    out.putVarInt32(TAG_MESSAGE)
    if self.message_class is None:
      out.putPrefixedString(self.message)
    else:
      out.putVarInt32(self.message.ByteSizePartial())
      self.message.OutputPartial(out)

  @staticmethod
  def Decode(decoder):
    """Read one item group from decoder.

    Returns a (type_id, raw_message_bytes) tuple; raises
    ProtocolBufferDecodeError if the group lacks a type id or a message.
    """
    type_id = 0
    message = None
    while 1:
      tag = decoder.getVarInt32()
      if tag == TAG_END_ITEM_GROUP:
        break
      if tag == TAG_TYPE_ID:
        type_id = decoder.getVarUint64()
        continue
      if tag == TAG_MESSAGE:
        message = decoder.getPrefixedString()
        continue
      if tag == 0: raise ProtocolBuffer.ProtocolBufferDecodeError
      decoder.skipData(tag)

    if type_id == 0 or message is None:
      raise ProtocolBuffer.ProtocolBufferDecodeError
    return (type_id, message)


class MessageSet(ProtocolBuffer.ProtocolMessage):
  """A protocol message holding other messages keyed by MESSAGE_TYPE_ID.

  Contained messages stay serialized until they are accessed with the class
  to parse them as.
  """

  def __init__(self, contents=None):
    # type id -> Item
    self.items = dict()
    if contents is not None: self.MergeFromString(contents)

  def get(self, message_class):
    """Return the stored message, or a default instance if absent/unparseable."""
    if message_class.MESSAGE_TYPE_ID not in self.items:
      return message_class()
    item = self.items[message_class.MESSAGE_TYPE_ID]
    if item.Parse(message_class):
      return item.message
    else:
      return message_class()

  def mutable(self, message_class):
    """Return the stored message, creating (or resetting) it if needed."""
    if message_class.MESSAGE_TYPE_ID not in self.items:
      message = message_class()
      self.items[message_class.MESSAGE_TYPE_ID] = Item(message, message_class)
      return message
    item = self.items[message_class.MESSAGE_TYPE_ID]
    if not item.Parse(message_class):
      item.SetToDefaultInstance(message_class)
    return item.message

  def has(self, message_class):
    """True if an item of this type is present and parses."""
    if message_class.MESSAGE_TYPE_ID not in self.items:
      return 0
    item = self.items[message_class.MESSAGE_TYPE_ID]
    return item.Parse(message_class)

  def has_unparsed(self, message_class):
    """True if an item with this type id exists; does not try to parse it."""
    return message_class.MESSAGE_TYPE_ID in self.items

  def GetTypeIds(self):
    return self.items.keys()

  def NumMessages(self):
    return len(self.items)

  def remove(self, message_class):
    if message_class.MESSAGE_TYPE_ID in self.items:
      del self.items[message_class.MESSAGE_TYPE_ID]

  def __getitem__(self, message_class):
    if message_class.MESSAGE_TYPE_ID not in self.items:
      raise KeyError(message_class)
    item = self.items[message_class.MESSAGE_TYPE_ID]
    if item.Parse(message_class):
      return item.message
    else:
      raise KeyError(message_class)

  def __setitem__(self, message_class, message):
    self.items[message_class.MESSAGE_TYPE_ID] = Item(message, message_class)

  def __contains__(self, message_class):
    return self.has(message_class)

  def __delitem__(self, message_class):
    self.remove(message_class)

  def __len__(self):
    return len(self.items)

  def MergeFrom(self, other):
    """Merge other's items into this set, copying ones not present here."""
    assert other is not self

    for (type_id, item) in other.items.items():
      if type_id in self.items:
        self.items[type_id].MergeFrom(item)
      else:
        self.items[type_id] = item.Copy()

  def Equals(self, other):
    if other is self: return 1
    if len(self.items) != len(other.items): return 0

    for (type_id, item) in other.items.items():
      if type_id not in self.items: return 0
      if not self.items[type_id].Equals(item): return 0

    return 1

  def __eq__(self, other):
    return ((other is not None)
        and (other.__class__ == self.__class__)
        and self.Equals(other))

  def __ne__(self, other):
    return not (self == other)

  def IsInitialized(self, debug_strs=None):
    # Every item is visited (no early exit) so each one gets the chance to
    # report into debug_strs.
    initialized = 1
    for item in self.items.values():
      if not item.IsInitialized(debug_strs):
        initialized = 0
    return initialized

  def ByteSize(self):
    # Two bytes per item for the begin/end group tags.
    n = 2 * len(self.items)
    for (type_id, item) in self.items.items():
      n += item.ByteSize(self, type_id)
    return n

  def ByteSizePartial(self):
    n = 2 * len(self.items)
    for (type_id, item) in self.items.items():
      n += item.ByteSizePartial(self, type_id)
    return n

  def Clear(self):
    self.items = dict()

  def OutputUnchecked(self, out):
    for (type_id, item) in self.items.items():
      out.putVarInt32(TAG_BEGIN_ITEM_GROUP)
      item.OutputUnchecked(out, type_id)
      out.putVarInt32(TAG_END_ITEM_GROUP)

  def OutputPartial(self, out):
    for (type_id, item) in self.items.items():
      out.putVarInt32(TAG_BEGIN_ITEM_GROUP)
      item.OutputPartial(out, type_id)
      out.putVarInt32(TAG_END_ITEM_GROUP)

  def TryMerge(self, decoder):
    """Decode item groups from decoder, merging with items already present."""
    while decoder.avail() > 0:
      tag = decoder.getVarInt32()
      if tag == TAG_BEGIN_ITEM_GROUP:
        (type_id, message) = Item.Decode(decoder)
        if type_id in self.items:
          self.items[type_id].MergeFrom(Item(message))
        else:
          self.items[type_id] = Item(message)
        continue
      if (tag == 0): raise ProtocolBuffer.ProtocolBufferDecodeError
      decoder.skipData(tag)

  def _CToASCII(self, output_format):
    # Use the native parser module when it could be imported.
    if _net_proto___parse__python is None:
      return ProtocolBuffer.ProtocolMessage._CToASCII(self, output_format)
    else:
      return _net_proto___parse__python.ToASCII(
          self, "MessageSetInternal", output_format)

  def ParseASCII(self, s):
    if _net_proto___parse__python is None:
      ProtocolBuffer.ProtocolMessage.ParseASCII(self, s)
    else:
      _net_proto___parse__python.ParseASCII(self, "MessageSetInternal", s)

  def ParseASCIIIgnoreUnknown(self, s):
    if _net_proto___parse__python is None:
      ProtocolBuffer.ProtocolMessage.ParseASCIIIgnoreUnknown(self, s)
    else:
      _net_proto___parse__python.ParseASCIIIgnoreUnknown(
          self, "MessageSetInternal", s)

  def __str__(self, prefix="", printElemNumber=0):
    """Text form: byte counts for raw items, nested text for parsed ones."""
    parts = []
    for (type_id, item) in self.items.items():
      if item.message_class is None:
        parts.append("%s[%d] <\n" % (prefix, type_id))
        parts.append("%s (%d bytes)\n" % (prefix, len(item.message)))
        parts.append("%s>\n" % prefix)
      else:
        parts.append("%s[%s] <\n" % (prefix, item.message_class.__name__))
        parts.append(item.message.__str__(prefix + " ", printElemNumber))
        parts.append("%s>\n" % prefix)
    return "".join(parts)

  _PROTO_DESCRIPTOR_NAME = 'MessageSet'
  # NOTE(review): the listing is truncated after this attribute; anything that
  # followed it upstream is not reproduced here.
messages.py
Source:messages.py
1"""2Provide the class Message and its subclasses.3"""4class Message(object):5 message = ''6 message_args = ()7 def __init__(self, filename, loc):8 self.filename = filename9 self.lineno = loc.lineno10 self.col = getattr(loc, 'col_offset', 0)11 def __str__(self):12 return '%s:%s: %s' % (self.filename, self.lineno,13 self.message % self.message_args)14class UnusedImport(Message):15 message = '%r imported but unused'16 def __init__(self, filename, loc, name):17 Message.__init__(self, filename, loc)18 self.message_args = (name,)19class RedefinedWhileUnused(Message):20 message = 'redefinition of unused %r from line %r'21 def __init__(self, filename, loc, name, orig_loc):22 Message.__init__(self, filename, loc)23 self.message_args = (name, orig_loc.lineno)24class RedefinedInListComp(Message):25 message = 'list comprehension redefines %r from line %r'26 def __init__(self, filename, loc, name, orig_loc):27 Message.__init__(self, filename, loc)28 self.message_args = (name, orig_loc.lineno)29class ImportShadowedByLoopVar(Message):30 message = 'import %r from line %r shadowed by loop variable'31 def __init__(self, filename, loc, name, orig_loc):32 Message.__init__(self, filename, loc)33 self.message_args = (name, orig_loc.lineno)34class ImportStarUsed(Message):35 message = "'from %s import *' used; unable to detect undefined names"36 def __init__(self, filename, loc, modname):37 Message.__init__(self, filename, loc)38 self.message_args = (modname,)39class UndefinedName(Message):40 message = 'undefined name %r'41 def __init__(self, filename, loc, name):42 Message.__init__(self, filename, loc)43 self.message_args = (name,)44class DoctestSyntaxError(Message):45 message = 'syntax error in doctest'46 def __init__(self, filename, loc, position=None):47 Message.__init__(self, filename, loc)48 if position:49 (self.lineno, self.col) = position50 self.message_args = ()51class UndefinedExport(Message):52 message = 'undefined name %r in __all__'53 def __init__(self, filename, loc, 
name):54 Message.__init__(self, filename, loc)55 self.message_args = (name,)56class UndefinedLocal(Message):57 message = ('local variable %r (defined in enclosing scope on line %r) '58 'referenced before assignment')59 def __init__(self, filename, loc, name, orig_loc):60 Message.__init__(self, filename, loc)61 self.message_args = (name, orig_loc.lineno)62class DuplicateArgument(Message):63 message = 'duplicate argument %r in function definition'64 def __init__(self, filename, loc, name):65 Message.__init__(self, filename, loc)66 self.message_args = (name,)67class Redefined(Message):68 message = 'redefinition of %r from line %r'69 def __init__(self, filename, loc, name, orig_loc):70 Message.__init__(self, filename, loc)71 self.message_args = (name, orig_loc.lineno)72class LateFutureImport(Message):73 message = 'future import(s) %r after other statements'74 def __init__(self, filename, loc, names):75 Message.__init__(self, filename, loc)76 self.message_args = (names,)77class UnusedVariable(Message):78 """79 Indicates that a variable has been explicity assigned to but not actually80 used.81 """82 message = 'local variable %r is assigned to but never used'83 def __init__(self, filename, loc, names):84 Message.__init__(self, filename, loc)85 self.message_args = (names,)86class ReturnWithArgsInsideGenerator(Message):87 """88 Indicates a return statement with arguments inside a generator.89 """...
Using AI Code Generation
1const mb = require('mountebank');2 {3 {4 {5 equals: {6 }7 }8 {9 is: {10 }11 }12 }13 }14];15mb.create({ imposters }).then(() => {16 console.log('mountebank started');17 mb.post('/imposters', {18 {19 {20 equals: {21 }22 }23 {24 is: {25 }26 }27 }28 }).then(() => {29 console.log('stub created');30 });31});32const mb = require('mountebank');33 {34 {35 {36 equals: {37 }38 }39 {40 is: {41 }42 }43 }44 }45];46mb.create({ imposters }).then(() => {47 console.log('mountebank started
Using AI Code Generation
1var mb = require('mountebank'),2 assert = require('assert');3mb.create({ port: 2525, pidfile: 'mb.pid', logfile: 'mb.log', ipWhitelist: ['*'] }, function (error, imposter) {4 assert.ifError(error);5 console.log('Imposter created on port %s', imposter.port);6 imposter.post('/imposters', {7 {8 {9 is: {10 }11 }12 }13 }, function (error, response) {14 assert.ifError(error);15 console.log('Stub created with response %s', response.body);16 imposter.del('/imposters/2525', function (error, response) {17 assert.ifError(error);18 console.log('Imposter deleted');19 });20 });21});22var io = require('socket.io-client');23socket.emit('message', 'Hello World!');
Using AI Code Generation
1const mb = require('mountebank');2const port = 2525;3const imposter = {4 {5 {6 is: {7 headers: {8 },9 body: {10 }11 }12 }13 }14};15mb.create(url, imposter).then(function () {16 console.log('Imposter started');17});18{"message":"Hello World"}19const mb = require('mountebank');20const port = 2525;21const imposter = {22 {23 {24 is: {25 headers: {26 },27 body: {28 }29 }30 }31 }32};33mb.create(url, imposter).then(function () {34 console.log('Imposter started');35});36{"message":"Hello World"}
Using AI Code Generation
1const imposter = require('mountebank').create({2 {3 {4 equals: {5 }6 }7 {8 is: {9 body: '{"status": "ok"}'10 }11 }12 }13});14const imposter = require('mountebank').create({15 {16 {17 equals: {18 }19 }20 {21 is: {22 body: '{"status": "ok"}'23 }24 }25 }26});27const imposter = require('mountebank').create({28 {29 {30 equals: {31 }32 }33 {34 is: {35 body: '{"status": "ok"}'36 }37 }38 }39});40const imposter = require('mountebank').create({41 {42 {43 equals: {44 }45 }46 {47 is: {48 body: '{"status": "ok"}'49 }50 }51 }52});53const imposter = require('mountebank').create({
Using AI Code Generation
1var imposter = require('mountebank').create({2 stubs: [{3 predicates: [{4 equals: {5 }6 }],7 responses: [{8 is: {9 }10 }]11 }]12});13imposter.start(function (error, imposter) {14 if (error) {15 console.log(error);16 } else {17 console.log('Imposter started');18 }19});20var client = require('mountebank').createClient({ port: 2525 });21client.get('/', function (error, response) {22 if (error) {23 console.log(error);24 } else {25 console.log(response.body);26 }27});28print(max_number)
Using AI Code Generation
1var mb = require('mountebank');2var mbHelper = mb.create({ port: 2525, pidfile: 'mb.pid', logfile: 'mb.log' });3mbHelper.start().then(function () {4 var message = {5 body: {6 {7 {8 is: {9 }10 }11 }12 }13 };14 return mbHelper.message(message);15}).then(function () {16 return mbHelper.get('/imposters');17}).then(function (response) {18 console.log(response.body);19}).finally(function () {20 mbHelper.stop();21});
Using AI Code Generation
1var message = require('message');2var logger = require('logger');3var http = require('http');4var https = require('https');5var util = require('util');6var file = require('file');7var response = require('response');8var jsonpath = require('jsonpath');9var xml2js = require('xml2js');10var xpath = require('xpath');11var crypto = require('crypto');12var promise = require('promise');13var fs = require('fs');14var path = require('path');15var events = require('events');16var querystring = require('querystring');17var url = require('url');18var mime = require('mime');19var process = require('process');20var console = require('console');21var assert = require('assert');22var stream = require('stream');23var timers = require('timers');24var buffer = require('buffer');25var string_decoder = require('string_decoder');26var os = require('os');27var constants = require('constants');
Using AI Code Generation
1const mbHelper = require('./mountebank-helper');2mb.post('/imposters', {3 stubs: [{4 {5 is: {6 headers: {7 },8 body: JSON.stringify({ status: 'success' })9 }10 }11 }]12}).then((response) => {13 console.log('response', response);14 return mb.post('/imposters/4545/messages', {15 });16}).then((response) => {17 console.log('response', response);18});19const request = require('request');20class MountebankHelper {21 constructor(url) {22 this.url = url;23 }24 post(path, body) {25 return new Promise((resolve, reject) => {26 request.post({27 url: `${this.url}${path}`,28 }, (err, response, body) => {29 if (err) {30 reject(err);31 }32 else {33 resolve(body);34 }35 });36 });37 }38}39module.exports = MountebankHelper;
Using AI Code Generation
1var message = require('mountebank').message;2var mbhelper = require('mbhelper');3var message = mbhelper.message;4var message = mbhelper.message;5var message = mbhelper.message;6var message = mbhelper.message;7var message = mbhelper.message;8var message = mbhelper.message;9var message = mbhelper.message;10var message = mbhelper.message;11var message = mbhelper.message;12var message = mbhelper.message;13var message = mbhelper.message;14var message = mbhelper.message;15var message = mbhelper.message;16var message = mbhelper.message;17var mbhelper = require('mbhelper')
Using AI Code Generation
1const mb = require('mountebank');2const port = 2525;3const protocol = 'http';4const imposter = { port, protocol };5mb.create(imposter).then(function () {6 console.log('Imposter created');7 return mb.get('/imposters', 2525);8}).then(function (response) {9 console.log('Imposter retrieved');10 console.log(response.body);11 return mb.del('/imposters', 2525);12}).then(function () {13 console.log('Imposter deleted');14 return mb.stop(2525);15}).then(function () {16 console.log('Imposter stopped');17});
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!