Best Python code snippet using hypothesis
socket_cache_test.py
Source:socket_cache_test.py
1# Copyright (C) 2011 Internet Systems Consortium, Inc. ("ISC")2#3# Permission to use, copy, modify, and distribute this software for any4# purpose with or without fee is hereby granted, provided that the above5# copyright notice and this permission notice appear in all copies.6#7# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM8# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL9# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL10# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,11# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING12# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,13# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION14# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.15import unittest16import isc.log17import isc.bind10.socket_cache18import isc.bind10.sockcreator19from isc.net.addr import IPAddr20import os21class Test(unittest.TestCase):22 """23 Base for the tests here. It replaces the os.close method.24 """25 def setUp(self):26 self._closes = []27 isc.bind10.socket_cache.os.close = self.__close28 def tearDown(self):29 # This is not very clean solution. But when the test stops30 # to exist, the method must not be used to destroy the31 # object any more. And we can't restore the os.close here32 # as we never work with real sockets here.33 isc.bind10.socket_cache.os.close = lambda fd: None34 def __close(self, fd):35 """36 Just log a close was called.37 """38 self._closes.append(fd)39class SocketTest(Test):40 """41 Test for the Socket class.42 """43 def setUp(self):44 """45 Creates the socket to be tested.46 It also creates other useful test variables.47 """48 Test.setUp(self)49 self.__address = IPAddr("192.0.2.1")50 self.__socket = isc.bind10.socket_cache.Socket('UDP', self.__address,51 1024, 42)52 def test_init(self):53 """54 Checks the intrnals of the cache just after the creation.55 """56 self.assertEqual('UDP', self.__socket.protocol)57 self.assertEqual(self.__address, self.__socket.address)58 self.assertEqual(1024, self.__socket.port)59 self.assertEqual(42, self.__socket.fileno)60 self.assertEqual({}, self.__socket.active_tokens)61 self.assertEqual({}, self.__socket.shares)62 self.assertEqual(set(), self.__socket.waiting_tokens)63 def test_del(self):64 """65 Check it closes the socket when removed.66 """67 # This should make the refcount 0 and call the descructor68 # right away69 self.__socket = None70 self.assertEqual([42], self._closes)71 def test_share_modes(self):72 """73 Test the share mode compatibility check function.74 """75 modes = ['NO', 'SAMEAPP', 'ANY']76 # If there are no shares, it is compatible with everything.77 for mode in modes:78 self.assertTrue(self.__socket.share_compatible(mode, 'anything'))79 # There's an NO already, so it is incompatible with everything.80 self.__socket.shares = {'token': ('NO', 'anything')}81 for mode in modes:82 self.assertFalse(self.__socket.share_compatible(mode, 'anything'))83 # If there's SAMEAPP, it is compatible with ANY and SAMEAPP with the84 # same name.85 self.__socket.shares = {'token': ('SAMEAPP', 'app')}86 self.assertFalse(self.__socket.share_compatible('NO', 'app'))87 self.assertFalse(self.__socket.share_compatible('SAMEAPP',88 'something'))89 self.assertTrue(self.__socket.share_compatible('SAMEAPP', 'app'))90 self.assertTrue(self.__socket.share_compatible('ANY', 'app'))91 self.assertFalse(self.__socket.share_compatible('ANY', 'something'))92 # If there's ANY, then ANY and SAMEAPP with the same name is compatible93 self.__socket.shares = {'token': ('ANY', 'app')}94 self.assertFalse(self.__socket.share_compatible('NO', 'app'))95 self.assertFalse(self.__socket.share_compatible('SAMEAPP',96 'something'))97 self.assertTrue(self.__socket.share_compatible('SAMEAPP', 'app'))98 self.assertTrue(self.__socket.share_compatible('ANY', 'something'))99 # In case there are multiple already inside100 self.__socket.shares = {101 'token': ('ANY', 'app'),102 'another': ('SAMEAPP', 'app')103 }104 self.assertFalse(self.__socket.share_compatible('NO', 'app'))105 self.assertFalse(self.__socket.share_compatible('SAMEAPP',106 'something'))107 self.assertTrue(self.__socket.share_compatible('SAMEAPP', 'app'))108 self.assertFalse(self.__socket.share_compatible('ANY', 'something'))109 self.assertTrue(self.__socket.share_compatible('ANY', 'app'))110 # Invalid inputs are rejected111 self.assertRaises(ValueError, self.__socket.share_compatible, 'bad',112 'bad')113class SocketCacheTest(Test):114 """115 Some tests for the isc.bind10.socket_cache.Cache.116 This class, as well as being the testcase, pretends to be the117 socket creator so it can hijack all the requests for sockets.118 """119 def setUp(self):120 """121 Creates the cache for tests with us being the socket creator.122 Also creates some more variables for testing.123 """124 Test.setUp(self)125 self.__cache = isc.bind10.socket_cache.Cache(self)126 self.__address = IPAddr("192.0.2.1")127 self.__socket = isc.bind10.socket_cache.Socket('UDP', self.__address,128 1024, 42)129 self.__get_socket_called = False130 def test_init(self):131 """132 Checks the internals of the cache just after the creation.133 """134 self.assertEqual(self, self.__cache._creator)135 self.assertEqual({}, self.__cache._waiting_tokens)136 self.assertEqual({}, self.__cache._active_tokens)137 self.assertEqual({}, self.__cache._active_apps)138 self.assertEqual({}, self.__cache._sockets)139 self.assertEqual(set(), self.__cache._live_tokens)140 def get_socket(self, address, port, socktype):141 """142 Pretend to be a socket creator.143 This expects to be called with the _address, port 1024 and 'UDP'.144 Returns 42 and notes down it was called.145 """146 self.assertEqual(self.__address, address)147 self.assertEqual(1024, port)148 self.assertEqual('UDP', socktype)149 self.__get_socket_called = True150 return 42151 def test_get_token_cached(self):152 """153 Check the behaviour of get_token when the requested socket is already154 cached inside.155 """156 self.__cache._sockets = {157 'UDP': {'192.0.2.1': {1024: self.__socket}}158 }159 token = self.__cache.get_token('UDP', self.__address, 1024, 'ANY',160 'test')161 # It didn't call get_socket162 self.assertFalse(self.__get_socket_called)163 # It returned something164 self.assertIsNotNone(token)165 # The token is both in the waiting sockets and the live tokens166 self.assertEqual({token: self.__socket}, self.__cache._waiting_tokens)167 self.assertEqual(set([token]), self.__cache._live_tokens)168 # The token got the new share to block any relevant queries169 self.assertEqual({token: ('ANY', 'test')}, self.__socket.shares)170 # The socket knows the token is waiting in it171 self.assertEqual(set([token]), self.__socket.waiting_tokens)172 # If we request one more, with incompatible share, it is rejected173 self.assertRaises(isc.bind10.socket_cache.ShareError,174 self.__cache.get_token, 'UDP', self.__address, 1024,175 'NO', 'test')176 # The internals are not changed, so the same checks177 self.assertEqual({token: self.__socket}, self.__cache._waiting_tokens)178 self.assertEqual(set([token]), self.__cache._live_tokens)179 self.assertEqual({token: ('ANY', 'test')}, self.__socket.shares)180 self.assertEqual(set([token]), self.__socket.waiting_tokens)181 def test_get_token_uncached(self):182 """183 Check a new socket is created when a corresponding one is missing.184 """185 token = self.__cache.get_token('UDP', self.__address, 1024, 'ANY',186 'test')187 # The get_socket was called188 self.assertTrue(self.__get_socket_called)189 # It returned something190 self.assertIsNotNone(token)191 # Get the socket and check it looks OK192 socket = self.__cache._waiting_tokens[token]193 self.assertEqual(self.__address, socket.address)194 self.assertEqual(1024, socket.port)195 self.assertEqual(42, socket.fileno)196 self.assertEqual('UDP', socket.protocol)197 # The socket is properly cached198 self.assertEqual({199 'UDP': {'192.0.2.1': {1024: socket}}200 }, self.__cache._sockets)201 # The token is both in the waiting sockets and the live tokens202 self.assertEqual({token: socket}, self.__cache._waiting_tokens)203 self.assertEqual(set([token]), self.__cache._live_tokens)204 # The token got the new share to block any relevant queries205 self.assertEqual({token: ('ANY', 'test')}, socket.shares)206 # The socket knows the token is waiting in it207 self.assertEqual(set([token]), socket.waiting_tokens)208 def test_get_token_excs(self):209 """210 Test that it is handled properly if the socket creator raises211 some exceptions.212 """213 def raiseCreatorError(fatal):214 raise isc.bind10.sockcreator.CreatorError('test error', fatal)215 # First, fatal socket creator errors are passed through216 self.get_socket = lambda addr, port, proto: raiseCreatorError(True)217 self.assertRaises(isc.bind10.sockcreator.CreatorError,218 self.__cache.get_token, 'UDP', self.__address, 1024,219 'NO', 'test')220 # And nonfatal are converted to SocketError221 self.get_socket = lambda addr, port, proto: raiseCreatorError(False)222 self.assertRaises(isc.bind10.socket_cache.SocketError,223 self.__cache.get_token, 'UDP', self.__address, 1024,224 'NO', 'test')225 def test_get_socket(self):226 """227 Test that we can pickup a socket if we know a token.228 """229 token = "token"230 app = 13231 # No socket prepared there232 self.assertRaises(ValueError, self.__cache.get_socket, token, app)233 # Not changed234 self.assertEqual({}, self.__cache._active_tokens)235 self.assertEqual({}, self.__cache._active_apps)236 self.assertEqual({}, self.__cache._sockets)237 self.assertEqual(set(), self.__cache._live_tokens)238 # Prepare a token there239 self.__socket.waiting_tokens = set([token])240 self.__socket.shares = {token: ('ANY', 'app')}241 self.__cache._waiting_tokens = {token: self.__socket}242 self.__cache._sockets = {'UDP': {'192.0.2.1': {1024: self.__socket}}}243 self.__cache._live_tokens = set([token])244 socket = self.__cache.get_socket(token, app)245 # Received the fileno246 self.assertEqual(42, socket)247 # It moved from waiting to active ones248 self.assertEqual({}, self.__cache._waiting_tokens)249 self.assertEqual({token: self.__socket}, self.__cache._active_tokens)250 self.assertEqual({13: set([token])}, self.__cache._active_apps)251 self.assertEqual(set([token]), self.__cache._live_tokens)252 self.assertEqual(set(), self.__socket.waiting_tokens)253 self.assertEqual({token: 13}, self.__socket.active_tokens)254 # Trying to get it again fails255 self.assertRaises(ValueError, self.__cache.get_socket, token, app)256 def test_drop_application(self):257 """258 Test that a drop_application calls drop_socket on all the sockets259 held by the application.260 """261 sockets = set()262 def drop_socket(token):263 sockets.add(token)264 # Mock the drop_socket so we know it is called265 self.__cache.drop_socket = drop_socket266 self.assertRaises(ValueError, self.__cache.drop_application,267 13)268 self.assertEqual(set(), sockets)269 # Put the tokens into active_apps. Nothing else should be touched270 # by this call, so leave it alone.271 self.__cache._active_apps = {272 1: set(['t1', 't2']),273 2: set(['t3'])274 }275 self.__cache.drop_application(1)276 # We don't check the _active_apps, as it would be cleaned by277 # drop_socket and we removed it.278 self.assertEqual(set(['t1', 't2']), sockets)279 def test_drop_socket(self):280 """281 Test the drop_socket call. It tests:282 * That a socket that still has something to keep it alive is left alive283 (both waiting and active).284 * If not, it is deleted.285 * All bookkeeping data around are properly removed.286 * Of course the exception.287 """288 self.assertRaises(ValueError, self.__cache.drop_socket, "bad token")289 self.__socket.active_tokens = {'t1': 1}290 self.__socket.waiting_tokens = set(['t2'])291 self.__socket.shares = {'t1': ('ANY', 'app1'), 't2': ('ANY', 'app2')}292 self.__cache._waiting_tokens = {'t2': self.__socket}293 self.__cache._active_tokens = {'t1': self.__socket}294 self.__cache._sockets = {'UDP': {'192.0.2.1': {1024: self.__socket}}}295 self.__cache._live_tokens = set(['t1', 't2'])296 self.__cache._active_apps = {1: set(['t1'])}297 # We can't drop what wasn't picket up yet298 self.assertRaises(ValueError, self.__cache.drop_socket, 't2')299 self.assertEqual({'t1': 1}, self.__socket.active_tokens)300 self.assertEqual(set(['t2']), self.__socket.waiting_tokens)301 self.assertEqual({'t1': ('ANY', 'app1'), 't2': ('ANY', 'app2')},302 self.__socket.shares)303 self.assertEqual({'t2': self.__socket}, self.__cache._waiting_tokens)304 self.assertEqual({'t1': self.__socket}, self.__cache._active_tokens)305 self.assertEqual({'UDP': {'192.0.2.1': {1024: self.__socket}}},306 self.__cache._sockets)307 self.assertEqual(set(['t1', 't2']), self.__cache._live_tokens)308 self.assertEqual({1: set(['t1'])}, self.__cache._active_apps)309 self.assertEqual([], self._closes)310 # If we drop this, it survives because it waits for being picked up311 self.__cache.drop_socket('t1')312 self.assertEqual({}, self.__socket.active_tokens)313 self.assertEqual(set(['t2']), self.__socket.waiting_tokens)314 self.assertEqual({'t2': ('ANY', 'app2')}, self.__socket.shares)315 self.assertEqual({}, self.__cache._active_tokens)316 self.assertEqual({'UDP': {'192.0.2.1': {1024: self.__socket}}},317 self.__cache._sockets)318 self.assertEqual(set(['t2']), self.__cache._live_tokens)319 self.assertEqual({}, self.__cache._active_apps)320 self.assertEqual([], self._closes)321 # Fill it again, now two applications having the same socket322 self.__socket.active_tokens = {'t1': 1, 't2': 2}323 self.__socket.waiting_tokens = set()324 self.__socket.shares = {'t1': ('ANY', 'app1'), 't2': ('ANY', 'app2')}325 self.__cache._waiting_tokens = {}326 self.__cache._active_tokens = {327 't1': self.__socket,328 't2': self.__socket329 }330 self.__cache._live_tokens = set(['t1', 't2', 't3'])331 self.assertEqual([], self._closes)332 # We cheat here little bit, the t3 doesn't exist enywhere else, but333 # we need to check the app isn't removed too soon and it shouldn't334 # matter anywhere else, so we just avoid the tiresome filling in335 self.__cache._active_apps = {1: set(['t1', 't3']), 2: set(['t2'])}336 # Drop it as t1. It should still live.337 self.__cache.drop_socket('t1')338 self.assertEqual({'t2': 2}, self.__socket.active_tokens)339 self.assertEqual(set(), self.__socket.waiting_tokens)340 self.assertEqual({'t2': ('ANY', 'app2')}, self.__socket.shares)341 self.assertEqual({}, self.__cache._waiting_tokens)342 self.assertEqual({'t2': self.__socket}, self.__cache._active_tokens)343 self.assertEqual({'UDP': {'192.0.2.1': {1024: self.__socket}}},344 self.__cache._sockets)345 self.assertEqual(set(['t3', 't2']), self.__cache._live_tokens)346 self.assertEqual({1: set(['t3']), 2: set(['t2'])},347 self.__cache._active_apps)348 self.assertEqual([], self._closes)349 # Drop it again, from the other application. It should get removed350 # and closed.351 self.__cache.drop_socket('t2')352 self.assertEqual({}, self.__socket.active_tokens)353 self.assertEqual(set(), self.__socket.waiting_tokens)354 self.assertEqual({}, self.__socket.shares)355 self.assertEqual({}, self.__cache._waiting_tokens)356 self.assertEqual({}, self.__cache._active_tokens)357 self.assertEqual({}, self.__cache._sockets)358 self.assertEqual(set(['t3']), self.__cache._live_tokens)359 self.assertEqual({1: set(['t3'])}, self.__cache._active_apps)360 # The cache doesn't hold the socket. So when we remove it ourself,361 # it should get closed.362 self.__socket = None363 self.assertEqual([42], self._closes)364if __name__ == '__main__':365 isc.log.init("bind10")366 isc.log.resetUnitTestRootLogger()...
DossierRequester.py
Source:DossierRequester.py
1# Python bytecode 2.7 (decompiled from Python 2.7)2# Embedded file name: scripts/client/gui/shared/utils/requesters/DossierRequester.py3import time4from functools import partial5import BigWorld6import constants7import dossiers28import AccountCommands9from adisp import async10from debug_utils import LOG_ERROR11from gui.shared.utils import code2str12from gui.shared.utils.requesters.abstract import AbstractSyncDataRequester13from gui.shared.utils.requesters.common import RequestProcessor14from skeletons.gui.shared.utils.requesters import IDossierRequester15class UserDossier(object):16 __queue = []17 __lastResponseTime = 018 __request = None19 def __init__(self, databaseID):20 self.__cache = {'databaseID': int(databaseID),21 'account': None,22 'vehicles': {},23 'clan': None,24 'hidden': False,25 'available': True,26 'rating': None,27 'rated7x7Seasons': {},28 'ranked': None}29 return30 def __setLastResponseTime(self):31 self.__lastResponseTime = time.time()32 def __nextRequestTime(self):33 t = constants.REQUEST_COOLDOWN.PLAYER_DOSSIER - (time.time() - self.__lastResponseTime)34 return t if t > 0 else 035 def __processQueue(self):36 if self.__request is not None:37 return38 elif self.__queue:39 self.__request = RequestProcessor(self.__nextRequestTime(), self.__queue.pop())40 return41 else:42 return43 def __requestPlayerInfo(self, callback):44 def proxyCallback(value):45 if value is not None and len(value) > 1:46 self.__cache['databaseID'] = value[0]47 self.__cache['account'] = dossiers2.getAccountDossierDescr(value[1])48 self.__cache['clan'] = value[2]49 self.__cache['rating'] = value[3]50 self.__cache['rated7x7Seasons'] = seasons = {}51 self.__cache['ranked'] = value[5]52 self.__cache['dogTag'] = value[6]53 self.__cache['battleRoyaleStats'] = value[7]54 for sID, d in (value[4] or {}).iteritems():55 seasons[sID] = dossiers2.getRated7x7DossierDescr(d)56 callback(self.__cache['account'])57 return58 self.__queue.append(lambda : BigWorld.player().requestPlayerInfo(self.__cache['databaseID'], partial(lambda c, code, databaseID, dossier, clanID, clanInfo, gRating, eSportSeasons, ranked, dogTag, br: self.__processValueResponse(c, code, (databaseID,59 dossier,60 (clanID, clanInfo),61 gRating,62 eSportSeasons,63 ranked,64 dogTag,65 br)), proxyCallback)))66 self.__processQueue()67 def __requestAccountDossier(self, callback):68 def proxyCallback(dossier):69 self.__cache['account'] = dossiers2.getAccountDossierDescr(dossier)70 callback(self.__cache['account'])71 self.__queue.append(lambda : BigWorld.player().requestAccountDossier(self.__cache['databaseID'], partial(self.__processValueResponse, proxyCallback)))72 self.__processQueue()73 def __requestVehicleDossier(self, vehCompDescr, callback):74 def proxyCallback(dossier):75 self.__cache['vehicles'][vehCompDescr] = dossiers2.getVehicleDossierDescr(dossier)76 callback(self.__cache['vehicles'][vehCompDescr])77 self.__queue.append(lambda : BigWorld.player().requestVehicleDossier(self.__cache['databaseID'], vehCompDescr, partial(self.__processValueResponse, proxyCallback)))78 self.__processQueue()79 def __requestClanInfo(self, callback):80 self.__queue.append(lambda : BigWorld.player().requestPlayerClanInfo(self.__cache['databaseID'], partial(lambda c, code, str, clanDBID, clanInfo: self.__processValueResponse(c, code, (clanDBID, clanInfo)), callback)))81 self.__processQueue()82 def __processValueResponse(self, callback, code, value):83 self.__setLastResponseTime()84 self.__request = None85 if code < 0:86 LOG_ERROR('Error while server request (code=%s): %s' % (code, code2str(code)))87 if code == AccountCommands.RES_HIDDEN_DOSSIER:88 self.__cache['hidden'] = True89 elif code == AccountCommands.RES_CENTER_DISCONNECTED:90 self.__cache['available'] = False91 callback('')92 else:93 callback(value)94 self.__processQueue()95 return96 @async97 def getAccountDossier(self, callback):98 if not self.isValid:99 callback(None)100 if self.__cache.get('account') is None:101 self.__requestPlayerInfo(callback)102 return103 else:104 callback(self.__cache['account'])105 return106 @async107 def getClanInfo(self, callback):108 if not self.isValid:109 callback(None)110 if self.__cache.get('clan') is None:111 self.__requestClanInfo(callback)112 return113 else:114 callback(self.__cache['clan'])115 return116 @async117 def getRated7x7Seasons(self, callback):118 if not self.isValid:119 callback({})120 if self.__cache.get('rated7x7Seasons') is None:121 self.__requestPlayerInfo(lambda accDossier: callback(self.__cache['rated7x7Seasons']))122 return123 else:124 callback(self.__cache['rated7x7Seasons'])125 return126 @async127 def getRankedInfo(self, callback):128 if not self.isValid:129 callback({})130 if self.__cache.get('ranked') is None:131 self.__requestPlayerInfo(lambda accDossier: callback(self.__cache['ranked']))132 return133 else:134 callback(self.__cache['ranked'])135 return136 @async137 def getGlobalRating(self, callback):138 if not self.isValid:139 callback(None)140 if self.__cache.get('rating') is None:141 self.__requestPlayerInfo(lambda accDossier: callback(self.__cache['rating']))142 return143 else:144 callback(self.__cache['rating'])145 return146 @async147 def getVehicleDossier(self, vehCompDescr, callback):148 if not self.isValid:149 callback(None)150 if self.__cache.get('vehicles', {}).get(vehCompDescr, None) is None:151 self.__requestVehicleDossier(vehCompDescr, callback)152 return153 else:154 callback(self.__cache['vehicles'][vehCompDescr])155 return156 @async157 def getDogTag(self, callback):158 if not self.isValid:159 callback(None)160 if self.__cache.get('dogTag') is None:161 self.__requestPlayerInfo(callback)162 return163 else:164 callback(self.__cache['dogTag'])165 return166 @async167 def getBattleRoyaleStats(self, callback):168 if not self.isValid:169 callback({})170 if self.__cache.get('battleRoyaleStats') is None:171 self.__requestPlayerInfo(lambda accDossier: callback(self.__cache['battleRoyaleStats']))172 return173 else:174 callback(self.__cache['battleRoyaleStats'])175 return176 @property177 def isHidden(self):178 return self.__cache.get('hidden', False)179 @property180 def isAvailable(self):181 return self.__cache.get('available', False)182 @property183 def isValid(self):184 return not self.isHidden and self.isAvailable185class DossierRequester(AbstractSyncDataRequester, IDossierRequester):186 def __init__(self):187 super(DossierRequester, self).__init__()188 self.__users = {}189 @async190 def _requestCache(self, callback):191 BigWorld.player().dossierCache.getCache(lambda resID, value: self._response(resID, value, callback))192 def getVehicleDossier(self, vehTypeCompDescr):193 return self.getCacheValue((constants.DOSSIER_TYPE.VEHICLE, vehTypeCompDescr), (0, ''))[1]194 def getVehDossiersIterator(self):195 for (dossierType, vehIntCD), records in self._data.iteritems():196 if dossierType == constants.DOSSIER_TYPE.VEHICLE:197 yield (vehIntCD, records[1])198 def getUserDossierRequester(self, databaseID):199 databaseID = int(databaseID)200 return self.__users.setdefault(databaseID, UserDossier(databaseID))201 def closeUserDossier(self, databaseID):202 if databaseID in self.__users:203 del self.__users[databaseID]204 def onCenterIsLongDisconnected(self, isLongDisconnected):205 if isLongDisconnected:206 return...
problem_1_test.py
Source:problem_1_test.py
1import unittest2from Project_2.problem_1.problem_1 import LRUCache3class LRUCacheTest(unittest.TestCase):4 def setUp(self):5 self.__cache = LRUCache()6 self.__tmp = self.__cache.get_dll()7 def test_set_value_cache(self):8 self.assertTrue(self.__cache.set(1, "Yadira"))9 # Set on top. Sets "Edgar" on top10 self.assertTrue(self.__cache.set(9, "Edgar"))11 self.assertEqual(self.__tmp.to_list(), ["Edgar", "Yadira"])12 # Prints cache["Edgar", "Yadira"]13 print(self.__tmp.to_list())14 def test_set_value_existing_key(self):15 self.__cache.set(2, 2)16 self.__cache.set(1, 1)17 self.__cache.set(3, 3)18 self.__cache.set(1, 10)19 print(self.__cache.get(1))20 # should return 1021 print(self.__cache.get(2))22 # should return 223 self.__cache.set(1, "Carmen")24 print(self.__cache.get(1))25 def test_max_capacity_cache(self):26 self.__cache.set(1, 1)27 self.__cache.set(2, 2)28 self.__cache.set(3, 3)29 self.__cache.set(4, 4)30 print(self.__cache.get(1))31 # returns 132 print(self.__cache.get(2))33 # returns 234 print(self.__cache.get(9))35 # returns -1 because 9 is not present in the cache36 self.__cache.set(5, 5)37 # Max capacity. Remove LRU to add 638 self.__cache.set(6, 6)39 # returns 640 print(self.__cache.get(6))41 # prints capacity 542 print(self.__cache.number_elements)43 # should return -144 print(self.__cache.get(3))45 def test_set_none(self):46 # Values as None, asserts False47 self.assertFalse(self.__cache.set(None, None))48 # return False for None values49 print(self.__cache.set(None, None))50 self.assertFalse(self.__cache.set(0, None))51 # return False, if at least one value is None52 print(self.__cache.set(0, None))53 self.assertFalse(self.__cache.set(None, 0))54 # return False, if at least one value is None55 print(self.__cache.set(None, 0))56 def test_set_zero_capacity(self):57 new_cache = LRUCache(0)58 self.assertEqual(new_cache.set(1, 1), "Can't perform operations on 0 capacity cache")59 print(new_cache.set(1, 1))60 def test_get_existing_value(self):61 self.__cache.set(1, "Yadira")62 self.__cache.set(9, "Edgar")63 self.assertEqual(self.__cache.get(1), "Yadira")64 # prints Yadira65 print(self.__cache.get(1))66 # Setting existing value to top67 self.__cache.get(1)68 self.assertEqual(self.__tmp.to_list(), ["Yadira", "Edgar"])69 # Existing value "Yadira" on top of cache. Prints ["Yadira", "Edgar"]70 print(self.__cache.get_dll().to_list())71 def test_get_non_existing_value(self):72 self.__cache.set(1, 1)73 self.__cache.set(2, 2)74 self.assertEqual(self.__cache.get(10), -1)75 # Prints -1 for a non existing value76 print(self.__cache.get(10))77 print(self.__cache.get(20))78 def test_get_none(self):79 # Key is None80 self.assertFalse(self.__cache.get(None))81 # Print False when key is None82 print(self.__cache.get(None))83 def test_get_value_zero_capacity_cache(self):84 new_cache = LRUCache(0)85 new_cache.set(1, 1)86 # prints -1, no key will be found on 0 capacity cache87 self.assertEqual(new_cache.get(1), -1)88 print(new_cache.get(1))89if __name__ == '__main__':...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!