Best Python code snippet using SeleniumBase
test_memory_leaks.py
Source:test_memory_leaks.py
#!/usr/bin/env python
# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
Tests for detecting function memory leaks (typically the ones
implemented in C). It does so by calling a function many times and
checking whether process memory usage keeps increasing between
calls or over time.
Note that this may produce false positives (especially on Windows
for some reason).
"""
from __future__ import print_function
import errno
import functools
import gc
import os
import sys
import threading
import time
import psutil
import psutil._common
from psutil import LINUX
from psutil import OPENBSD
from psutil import OSX
from psutil import POSIX
from psutil import SUNOS
from psutil import WINDOWS
from psutil._compat import xrange
from psutil.tests import create_sockets
from psutil.tests import get_test_subprocess
from psutil.tests import HAS_CPU_AFFINITY
from psutil.tests import HAS_CPU_FREQ
from psutil.tests import HAS_ENVIRON
from psutil.tests import HAS_IONICE
from psutil.tests import HAS_MEMORY_MAPS
from psutil.tests import HAS_PROC_CPU_NUM
from psutil.tests import HAS_PROC_IO_COUNTERS
from psutil.tests import HAS_RLIMIT
from psutil.tests import HAS_SENSORS_BATTERY
from psutil.tests import HAS_SENSORS_FANS
from psutil.tests import HAS_SENSORS_TEMPERATURES
from psutil.tests import reap_children
from psutil.tests import run_test_module_by_name
from psutil.tests import safe_rmpath
from psutil.tests import skip_on_access_denied
from psutil.tests import TESTFN
from psutil.tests import TRAVIS
from psutil.tests import unittest

# Number of calls made for each of the two memory samples.
LOOPS = 1000
# Memory growth (in bytes) tolerated between the two samples.
MEMORY_TOLERANCE = 4096
# Seconds spent re-calling the function when the tolerance is exceeded.
RETRY_FOR = 3
# The original listing assigned this constant twice; once is enough.
SKIP_PYTHON_IMPL = bool(TRAVIS)
cext = psutil._psplatform.cext
thisproc = psutil.Process()


# ===================================================================
# utils
# ===================================================================


def skip_if_linux():
    """Return a decorator skipping a test when LINUX and
    SKIP_PYTHON_IMPL are both true.
    """
    return unittest.skipIf(LINUX and SKIP_PYTHON_IMPL,
                           "worthless on LINUX (pure python)")


def bytes2human(n):
    """Format a number of bytes as a human readable string, using
    1024-based units and two decimals.

    http://code.activestate.com/recipes/578019
    >>> bytes2human(10000)
    '9.77K'
    >>> bytes2human(100001221)
    '95.37M'
    """
    symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    prefix = {}
    for i, s in enumerate(symbols):
        prefix[s] = 1 << (i + 1) * 10
    # Try the largest unit first so the biggest fitting one wins.
    for s in reversed(symbols):
        if n >= prefix[s]:
            value = float(n) / prefix[s]
            return '%.2f%s' % (value, s)
    return "%sB" % n


class TestMemLeak(unittest.TestCase):
    """Base framework class which calls a function many times and
    produces a failure if process memory usage keeps increasing
    between calls or over time.
    """
    tolerance = MEMORY_TOLERANCE
    loops = LOOPS
    retry_for = RETRY_FOR

    def setUp(self):
        gc.collect()

    def execute(self, fun, *args, **kwargs):
        """Test a callable.

        The keyword arguments 'tolerance_', 'loops_' and 'retry_for_'
        are consumed here (overriding the class-level defaults when
        truthy) and are not passed on to fun.
        """
        tolerance = kwargs.pop('tolerance_', None) or self.tolerance
        loops = kwargs.pop('loops_', None) or self.loops
        retry_for = kwargs.pop('retry_for_', None) or self.retry_for

        def call_many_times():
            for x in xrange(loops):
                self._call(fun, *args, **kwargs)
            del x
            gc.collect()

        # warm up
        for x in range(10):
            self._call(fun, *args, **kwargs)
        self.assertEqual(gc.garbage, [])
        self.assertEqual(threading.active_count(), 1)
        self.assertEqual(thisproc.children(), [])

        # Get 2 distinct memory samples, before and after having
        # called fun repeatedly.
        # step 1
        call_many_times()
        mem1 = self._get_mem()
        # step 2
        call_many_times()
        mem2 = self._get_mem()

        diff1 = mem2 - mem1
        if diff1 > tolerance:
            # This doesn't necessarily mean we have a leak yet.
            # At this point we assume that after having called the
            # function so many times the memory usage is stabilized
            # and if there are no leaks it should not increase
            # anymore.
            # Let's keep calling fun for 3 more seconds and fail if
            # we notice any difference.
            ncalls = 0
            stop_at = time.time() + retry_for
            while time.time() <= stop_at:
                self._call(fun, *args, **kwargs)
                ncalls += 1

            del stop_at
            gc.collect()
            mem3 = self._get_mem()
            diff2 = mem3 - mem2

            if mem3 > mem2:
                # failure
                extra_proc_mem = bytes2human(diff1 + diff2)
                print("extra proc mem: %s" % extra_proc_mem, file=sys.stderr)
                msg = "+%s after %s calls, +%s after another %s calls, "
                msg += "+%s extra proc mem"
                msg = msg % (
                    bytes2human(diff1), loops, bytes2human(diff2), ncalls,
                    extra_proc_mem)
                self.fail(msg)

    def execute_w_exc(self, exc, fun, *args, **kwargs):
        """Convenience function which tests a callable raising
        an exception.
        """
        def call():
            self.assertRaises(exc, fun, *args, **kwargs)

        self.execute(call)

    @staticmethod
    def _get_mem():
        # By using USS memory it seems it's less likely to bump
        # into false positives.
        if LINUX or WINDOWS or OSX:
            return thisproc.memory_full_info().uss
        else:
            return thisproc.memory_info().rss

    @staticmethod
    def _call(fun, *args, **kwargs):
        fun(*args, **kwargs)


# ===================================================================
# Process class
# ===================================================================


class TestProcessObjectLeaks(TestMemLeak):
    """Test leaks of Process class methods."""

    proc = thisproc

    def test_coverage(self):
        # Every public psutil.Process attribute not listed here must
        # have a matching test_* method on this class.
        skip = set((
            "pid", "as_dict", "children", "cpu_affinity", "cpu_percent",
            "ionice", "is_running", "kill", "memory_info_ex", "memory_percent",
            "nice", "oneshot", "parent", "rlimit", "send_signal", "suspend",
            "terminate", "wait"))
        for name in dir(psutil.Process):
            if name.startswith('_'):
                continue
            if name in skip:
                continue
            self.assertTrue(hasattr(self, "test_" + name), msg=name)

    @skip_if_linux()
    def test_name(self):
        self.execute(self.proc.name)

    @skip_if_linux()
    def test_cmdline(self):
        self.execute(self.proc.cmdline)

    @skip_if_linux()
    def test_exe(self):
        self.execute(self.proc.exe)

    @skip_if_linux()
    def test_ppid(self):
        self.execute(self.proc.ppid)

    @unittest.skipIf(not POSIX, "POSIX only")
    @skip_if_linux()
    def test_uids(self):
        self.execute(self.proc.uids)

    @unittest.skipIf(not POSIX, "POSIX only")
    @skip_if_linux()
    def test_gids(self):
        self.execute(self.proc.gids)

    @skip_if_linux()
    def test_status(self):
        self.execute(self.proc.status)

    def test_nice_get(self):
        self.execute(self.proc.nice)

    def test_nice_set(self):
        # Re-apply the current niceness so the process is left unchanged.
        niceness = thisproc.nice()
        self.execute(self.proc.nice, niceness)

    @unittest.skipIf(not HAS_IONICE, "not supported")
    def test_ionice_get(self):
        self.execute(self.proc.ionice)

    @unittest.skipIf(not HAS_IONICE, "not supported")
    def test_ionice_set(self):
        if WINDOWS:
            value = thisproc.ionice()
            self.execute(self.proc.ionice, value)
        else:
            self.execute(self.proc.ionice, psutil.IOPRIO_CLASS_NONE)
            # Also exercise the C function's error path.
            fun = functools.partial(cext.proc_ioprio_set, os.getpid(), -1, 0)
            self.execute_w_exc(OSError, fun)

    @unittest.skipIf(not HAS_PROC_IO_COUNTERS, "not supported")
    @skip_if_linux()
    def test_io_counters(self):
        self.execute(self.proc.io_counters)

    @unittest.skipIf(POSIX, "worthless on POSIX")
    def test_username(self):
        self.execute(self.proc.username)

    @skip_if_linux()
    def test_create_time(self):
        self.execute(self.proc.create_time)

    @skip_if_linux()
    @skip_on_access_denied(only_if=OPENBSD)
    def test_num_threads(self):
        self.execute(self.proc.num_threads)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_num_handles(self):
        self.execute(self.proc.num_handles)

    @unittest.skipIf(not POSIX, "POSIX only")
    @skip_if_linux()
    def test_num_fds(self):
        self.execute(self.proc.num_fds)

    @skip_if_linux()
    def test_num_ctx_switches(self):
        self.execute(self.proc.num_ctx_switches)

    @skip_if_linux()
    @skip_on_access_denied(only_if=OPENBSD)
    def test_threads(self):
        self.execute(self.proc.threads)

    @skip_if_linux()
    def test_cpu_times(self):
        self.execute(self.proc.cpu_times)

    @skip_if_linux()
    @unittest.skipIf(not HAS_PROC_CPU_NUM, "not supported")
    def test_cpu_num(self):
        self.execute(self.proc.cpu_num)

    @skip_if_linux()
    def test_memory_info(self):
        self.execute(self.proc.memory_info)

    @skip_if_linux()
    def test_memory_full_info(self):
        self.execute(self.proc.memory_full_info)

    @unittest.skipIf(not POSIX, "POSIX only")
    @skip_if_linux()
    def test_terminal(self):
        self.execute(self.proc.terminal)

    @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
                     "worthless on POSIX (pure python)")
    def test_resume(self):
        self.execute(self.proc.resume)

    @skip_if_linux()
    def test_cwd(self):
        self.execute(self.proc.cwd)

    @unittest.skipIf(not HAS_CPU_AFFINITY, "not supported")
    def test_cpu_affinity_get(self):
        self.execute(self.proc.cpu_affinity)

    @unittest.skipIf(not HAS_CPU_AFFINITY, "not supported")
    def test_cpu_affinity_set(self):
        # Re-apply the current affinity so the process is left unchanged.
        affinity = thisproc.cpu_affinity()
        self.execute(self.proc.cpu_affinity, affinity)
        if not TRAVIS:
            self.execute_w_exc(ValueError, self.proc.cpu_affinity, [-1])

    @skip_if_linux()
    def test_open_files(self):
        safe_rmpath(TESTFN)  # needed after UNIX socket test has run
        with open(TESTFN, 'w'):
            self.execute(self.proc.open_files)

    # OSX implementation is unbelievably slow
    @unittest.skipIf(OSX, "too slow on OSX")
    @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
    @skip_if_linux()
    def test_memory_maps(self):
        self.execute(self.proc.memory_maps)

    @unittest.skipIf(not LINUX, "LINUX only")
    @unittest.skipIf(not HAS_RLIMIT, "not supported")
    def test_rlimit_get(self):
        self.execute(self.proc.rlimit, psutil.RLIMIT_NOFILE)

    @unittest.skipIf(not LINUX, "LINUX only")
    @unittest.skipIf(not HAS_RLIMIT, "not supported")
    def test_rlimit_set(self):
        # Re-apply the current limit so the process is left unchanged.
        limit = thisproc.rlimit(psutil.RLIMIT_NOFILE)
        self.execute(self.proc.rlimit, psutil.RLIMIT_NOFILE, limit)
        self.execute_w_exc(OSError, self.proc.rlimit, -1)

    @skip_if_linux()
    # Windows implementation is based on a single system-wide
    # function (tested later).
    @unittest.skipIf(WINDOWS, "worthless on WINDOWS")
    def test_connections(self):
        # TODO: UNIX sockets are temporarily implemented by parsing
        # 'pfiles' cmd output; we don't want that part of the code to
        # be executed.
        with create_sockets():
            kind = 'inet' if SUNOS else 'all'
            self.execute(self.proc.connections, kind)

    @unittest.skipIf(not HAS_ENVIRON, "not supported")
    def test_environ(self):
        self.execute(self.proc.environ)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_proc_info(self):
        self.execute(cext.proc_info, os.getpid())


class TestTerminatedProcessLeaks(TestProcessObjectLeaks):
    """Repeat the tests above looking for leaks occurring when dealing
    with terminated processes raising NoSuchProcess exception.
    The C functions are still invoked but will follow different code
    paths. We'll check those code paths.
    """

    @classmethod
    def setUpClass(cls):
        super(TestTerminatedProcessLeaks, cls).setUpClass()
        # Spawn a subprocess and kill it right away: every inherited
        # test then runs against an already terminated process.
        p = get_test_subprocess()
        cls.proc = psutil.Process(p.pid)
        cls.proc.kill()
        cls.proc.wait()

    @classmethod
    def tearDownClass(cls):
        super(TestTerminatedProcessLeaks, cls).tearDownClass()
        reap_children()

    def _call(self, fun, *args, **kwargs):
        # NoSuchProcess is the expected outcome here, not a failure.
        try:
            fun(*args, **kwargs)
        except psutil.NoSuchProcess:
            pass

    if WINDOWS:

        def test_kill(self):
            self.execute(self.proc.kill)

        def test_terminate(self):
            self.execute(self.proc.terminate)

        def test_suspend(self):
            self.execute(self.proc.suspend)

        def test_resume(self):
            self.execute(self.proc.resume)

        def test_wait(self):
            self.execute(self.proc.wait)

        def test_proc_info(self):
            # test dual implementation
            def call():
                try:
                    return cext.proc_info(self.proc.pid)
                except OSError as err:
                    if err.errno != errno.ESRCH:
                        raise

            self.execute(call)


# ===================================================================
# system APIs
# ===================================================================


class TestModuleFunctionsLeaks(TestMemLeak):
    """Test leaks of psutil module functions."""

    def test_coverage(self):
        # Every lowercase name in psutil.__all__ not listed here must
        # have a matching test_* method on this class.
        skip = set((
            "version_info", "__version__", "process_iter", "wait_procs",
            "cpu_percent", "cpu_times_percent", "cpu_count"))
        for name in psutil.__all__:
            if not name.islower():
                continue
            if name in skip:
                continue
            self.assertTrue(hasattr(self, "test_" + name), msg=name)

    # --- cpu

    @skip_if_linux()
    def test_cpu_count_logical(self):
        self.execute(psutil.cpu_count, logical=True)

    @skip_if_linux()
    def test_cpu_count_physical(self):
        self.execute(psutil.cpu_count, logical=False)

    @skip_if_linux()
    def test_cpu_times(self):
        self.execute(psutil.cpu_times)

    @skip_if_linux()
    def test_per_cpu_times(self):
        self.execute(psutil.cpu_times, percpu=True)

    def test_cpu_stats(self):
        self.execute(psutil.cpu_stats)

    @skip_if_linux()
    @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
    def test_cpu_freq(self):
        self.execute(psutil.cpu_freq)

    # --- mem

    def test_virtual_memory(self):
        self.execute(psutil.virtual_memory)

    # TODO: remove this skip when this gets fixed
    @unittest.skipIf(SUNOS,
                     "worthless on SUNOS (uses a subprocess)")
    def test_swap_memory(self):
        self.execute(psutil.swap_memory)

    @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
                     "worthless on POSIX (pure python)")
    def test_pid_exists(self):
        self.execute(psutil.pid_exists, os.getpid())

    # --- disk

    @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
                     "worthless on POSIX (pure python)")
    def test_disk_usage(self):
        self.execute(psutil.disk_usage, '.')

    def test_disk_partitions(self):
        self.execute(psutil.disk_partitions)

    @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
                     '/proc/diskstats not available on this Linux version')
    @skip_if_linux()
    def test_disk_io_counters(self):
        self.execute(psutil.disk_io_counters, nowrap=False)

    # --- proc

    @skip_if_linux()
    def test_pids(self):
        self.execute(psutil.pids)

    # --- net

    @skip_if_linux()
    def test_net_io_counters(self):
        self.execute(psutil.net_io_counters, nowrap=False)

    @unittest.skipIf(LINUX,
                     "worthless on Linux (pure python)")
    @unittest.skipIf(OSX and os.getuid() != 0, "need root access")
    def test_net_connections(self):
        with create_sockets():
            self.execute(psutil.net_connections)

    def test_net_if_addrs(self):
        # Note: verified that on Windows this was a false positive.
        self.execute(psutil.net_if_addrs,
                     tolerance_=80 * 1024 if WINDOWS else None)

    @unittest.skipIf(TRAVIS, "EPERM on travis")
    def test_net_if_stats(self):
        self.execute(psutil.net_if_stats)

    # --- sensors

    @skip_if_linux()
    @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
    def test_sensors_battery(self):
        self.execute(psutil.sensors_battery)

    @skip_if_linux()
    @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
    def test_sensors_temperatures(self):
        self.execute(psutil.sensors_temperatures)

    @skip_if_linux()
    @unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
    def test_sensors_fans(self):
        self.execute(psutil.sensors_fans)

    # --- others

    @skip_if_linux()
    def test_boot_time(self):
        self.execute(psutil.boot_time)

    # XXX - on Windows this produces a false positive
    @unittest.skipIf(WINDOWS, "XXX produces a false positive on Windows")
    def test_users(self):
        self.execute(psutil.users)

    if WINDOWS:

        # --- win services

        def test_win_service_iter(self):
            self.execute(cext.winservice_enumerate)

        def test_win_service_get(self):
            pass

        def test_win_service_get_config(self):
            name = next(psutil.win_service_iter()).name()
            self.execute(cext.winservice_query_config, name)

        def test_win_service_get_status(self):
            name = next(psutil.win_service_iter()).name()
            self.execute(cext.winservice_query_status, name)

        def test_win_service_get_description(self):
            name = next(psutil.win_service_iter()).name()
            self.execute(cext.winservice_query_descr, name)


if __name__ == '__main__':
    # NOTE(review): the body of this guard is elided ("...") in the source
    # listing; presumably it calls run_test_module_by_name(__file__) (the
    # name is imported above and otherwise unused) -- confirm against the
    # upstream file before relying on it.
    ...
testutils.py
Source:testutils.py
...44 else:45 return f(self)46 return skipIf__47 return skipIf_48 def skip(msg):49 return skipIf(True, msg)50 def skipTest(self, msg):51 with warnings.catch_warnings():52 warnings.simplefilter('always', UserWarning)53 warnings.warn(msg)54 return55 unittest.TestCase.skipTest = skipTest56# Silence warnings caused by the stubbornness of the Python unittest57# maintainers58# http://bugs.python.org/issue942459if (not hasattr(unittest.TestCase, 'assert_')60 or unittest.TestCase.assertTrue is not unittest.TestCase.assertTrue):61 # mavaff...62 unittest.TestCase.assertTrue = unittest.TestCase.assertTrue...
skip_list.py
Source:skip_list.py
1"""2Based on "Skip Lists: A Probabilistic Alternative to Balanced Trees" by William Pugh3https://epaperpress.com/sortsearch/download/skiplist.pdf4"""5from __future__ import annotations6from random import random7from typing import Generic, Optional, TypeVar8KT = TypeVar("KT")9VT = TypeVar("VT")10class Node(Generic[KT, VT]):11 def __init__(self, key: KT, value: VT):12 self.key = key13 self.value = value14 self.forward: list[Node[KT, VT]] = []15 def __repr__(self) -> str:16 """17 :return: Visual representation of Node18 >>> node = Node("Key", 2)19 >>> repr(node)20 'Node(Key: 2)'21 """22 return f"Node({self.key}: {self.value})"23 @property24 def level(self) -> int:25 """26 :return: Number of forward references27 >>> node = Node("Key", 2)28 >>> node.level29 030 >>> node.forward.append(Node("Key2", 4))31 >>> node.level32 133 >>> node.forward.append(Node("Key3", 6))34 >>> node.level35 236 """37 return len(self.forward)38class SkipList(Generic[KT, VT]):39 def __init__(self, p: float = 0.5, max_level: int = 16):40 self.head = Node("root", None)41 self.level = 042 self.p = p43 self.max_level = max_level44 def __str__(self) -> str:45 """46 :return: Visual representation of SkipList47 >>> skip_list = SkipList()48 >>> print(skip_list)49 SkipList(level=0)50 >>> skip_list.insert("Key1", "Value")51 >>> print(skip_list) # doctest: +ELLIPSIS52 SkipList(level=...53 [root]--...54 [Key1]--Key1...55 None *...56 >>> skip_list.insert("Key2", "OtherValue")57 >>> print(skip_list) # doctest: +ELLIPSIS58 SkipList(level=...59 [root]--...60 [Key1]--Key1...61 [Key2]--Key2...62 None *...63 """64 items = list(self)65 if len(items) == 0:66 return f"SkipList(level={self.level})"67 label_size = max((len(str(item)) for item in items), default=4)68 label_size = max(label_size, 4) + 469 node = self.head70 lines = []71 forwards = node.forward.copy()72 lines.append(f"[{node.key}]".ljust(label_size, "-") + "* " * len(forwards))73 lines.append(" " * label_size + "| " * len(forwards))74 while 
len(node.forward) != 0:75 node = node.forward[0]76 lines.append(77 f"[{node.key}]".ljust(label_size, "-")78 + " ".join(str(n.key) if n.key == node.key else "|" for n in forwards)79 )80 lines.append(" " * label_size + "| " * len(forwards))81 forwards[: node.level] = node.forward82 lines.append("None".ljust(label_size) + "* " * len(forwards))83 return f"SkipList(level={self.level})\n" + "\n".join(lines)84 def __iter__(self):85 node = self.head86 while len(node.forward) != 0:87 yield node.forward[0].key88 node = node.forward[0]89 def random_level(self) -> int:90 """91 :return: Random level from [1, self.max_level] interval.92 Higher values are less likely.93 """94 level = 195 while random() < self.p and level < self.max_level:96 level += 197 return level98 def _locate_node(self, key) -> tuple[Optional[Node[KT, VT]], list[Node[KT, VT]]]:99 """100 :param key: Searched key,101 :return: Tuple with searched node (or None if given key is not present)102 and list of nodes that refer (if key is present) of should refer to103 given node.104 """105 # Nodes with refer or should refer to output node106 update_vector = []107 node = self.head108 for i in reversed(range(self.level)):109 # i < node.level - When node level is lesser than `i` decrement `i`.110 # node.forward[i].key < key - Jumping to node with key value higher111 # or equal to searched key would result112 # in skipping searched key.113 while i < node.level and node.forward[i].key < key:114 node = node.forward[i]115 # Each leftmost node (relative to searched node) will potentially have to116 # be updated.117 update_vector.append(node)118 update_vector.reverse() # Note that we were inserting values in reverse order.119 # len(node.forward) != 0 - If current node doesn't contain any further120 # references then searched key is not present.121 # node.forward[0].key == key - Next node key should be equal to search key122 # if key is present.123 if len(node.forward) != 0 and node.forward[0].key == key:124 return 
node.forward[0], update_vector125 else:126 return None, update_vector127 def delete(self, key: KT):128 """129 :param key: Key to remove from list.130 >>> skip_list = SkipList()131 >>> skip_list.insert(2, "Two")132 >>> skip_list.insert(1, "One")133 >>> skip_list.insert(3, "Three")134 >>> list(skip_list)135 [1, 2, 3]136 >>> skip_list.delete(2)137 >>> list(skip_list)138 [1, 3]139 """140 node, update_vector = self._locate_node(key)141 if node is not None:142 for i, update_node in enumerate(update_vector):143 # Remove or replace all references to removed node.144 if update_node.level > i and update_node.forward[i].key == key:145 if node.level > i:146 update_node.forward[i] = node.forward[i]147 else:148 update_node.forward = update_node.forward[:i]149 def insert(self, key: KT, value: VT):150 """151 :param key: Key to insert.152 :param value: Value associated with given key.153 >>> skip_list = SkipList()154 >>> skip_list.insert(2, "Two")155 >>> skip_list.find(2)156 'Two'157 >>> list(skip_list)158 [2]159 """160 node, update_vector = self._locate_node(key)161 if node is not None:162 node.value = value163 else:164 level = self.random_level()165 if level > self.level:166 # After level increase we have to add additional nodes to head.167 for i in range(self.level - 1, level):168 update_vector.append(self.head)169 self.level = level170 new_node = Node(key, value)171 for i, update_node in enumerate(update_vector[:level]):172 # Change references to pass through new node.173 if update_node.level > i:174 new_node.forward.append(update_node.forward[i])175 if update_node.level < i + 1:176 update_node.forward.append(new_node)177 else:178 update_node.forward[i] = new_node179 def find(self, key: VT) -> Optional[VT]:180 """181 :param key: Search key.182 :return: Value associated with given key or None if given key is not present.183 >>> skip_list = SkipList()184 >>> skip_list.find(2)185 >>> skip_list.insert(2, "Two")186 >>> skip_list.find(2)187 'Two'188 >>> skip_list.insert(2, 
"Three")189 >>> skip_list.find(2)190 'Three'191 """192 node, _ = self._locate_node(key)193 if node is not None:194 return node.value195 return None196def test_insert():197 skip_list = SkipList()198 skip_list.insert("Key1", 3)199 skip_list.insert("Key2", 12)200 skip_list.insert("Key3", 41)201 skip_list.insert("Key4", -19)202 node = skip_list.head203 all_values = {}204 while node.level != 0:205 node = node.forward[0]206 all_values[node.key] = node.value207 assert len(all_values) == 4208 assert all_values["Key1"] == 3209 assert all_values["Key2"] == 12210 assert all_values["Key3"] == 41211 assert all_values["Key4"] == -19212def test_insert_overrides_existing_value():213 skip_list = SkipList()214 skip_list.insert("Key1", 10)215 skip_list.insert("Key1", 12)216 skip_list.insert("Key5", 7)217 skip_list.insert("Key7", 10)218 skip_list.insert("Key10", 5)219 skip_list.insert("Key7", 7)220 skip_list.insert("Key5", 5)221 skip_list.insert("Key10", 10)222 node = skip_list.head223 all_values = {}224 while node.level != 0:225 node = node.forward[0]226 all_values[node.key] = node.value227 if len(all_values) != 4:228 print()229 assert len(all_values) == 4230 assert all_values["Key1"] == 12231 assert all_values["Key7"] == 7232 assert all_values["Key5"] == 5233 assert all_values["Key10"] == 10234def test_searching_empty_list_returns_none():235 skip_list = SkipList()236 assert skip_list.find("Some key") is None237def test_search():238 skip_list = SkipList()239 skip_list.insert("Key2", 20)240 assert skip_list.find("Key2") == 20241 skip_list.insert("Some Key", 10)242 skip_list.insert("Key2", 8)243 skip_list.insert("V", 13)244 assert skip_list.find("Y") is None245 assert skip_list.find("Key2") == 8246 assert skip_list.find("Some Key") == 10247 assert skip_list.find("V") == 13248def test_deleting_item_from_empty_list_do_nothing():249 skip_list = SkipList()250 skip_list.delete("Some key")251 assert len(skip_list.head.forward) == 0252def 
test_deleted_items_are_not_founded_by_find_method():253 skip_list = SkipList()254 skip_list.insert("Key1", 12)255 skip_list.insert("V", 13)256 skip_list.insert("X", 14)257 skip_list.insert("Key2", 15)258 skip_list.delete("V")259 skip_list.delete("Key2")260 assert skip_list.find("V") is None261 assert skip_list.find("Key2") is None262def test_delete_removes_only_given_key():263 skip_list = SkipList()264 skip_list.insert("Key1", 12)265 skip_list.insert("V", 13)266 skip_list.insert("X", 14)267 skip_list.insert("Key2", 15)268 skip_list.delete("V")269 assert skip_list.find("V") is None270 assert skip_list.find("X") == 14271 assert skip_list.find("Key1") == 12272 assert skip_list.find("Key2") == 15273 skip_list.delete("X")274 assert skip_list.find("V") is None275 assert skip_list.find("X") is None276 assert skip_list.find("Key1") == 12277 assert skip_list.find("Key2") == 15278 skip_list.delete("Key1")279 assert skip_list.find("V") is None280 assert skip_list.find("X") is None281 assert skip_list.find("Key1") is None282 assert skip_list.find("Key2") == 15283 skip_list.delete("Key2")284 assert skip_list.find("V") is None285 assert skip_list.find("X") is None286 assert skip_list.find("Key1") is None287 assert skip_list.find("Key2") is None288def test_delete_doesnt_leave_dead_nodes():289 skip_list = SkipList()290 skip_list.insert("Key1", 12)291 skip_list.insert("V", 13)292 skip_list.insert("X", 142)293 skip_list.insert("Key2", 15)294 skip_list.delete("X")295 def traverse_keys(node):296 yield node.key297 for forward_node in node.forward:298 yield from traverse_keys(forward_node)299 assert len(set(traverse_keys(skip_list.head))) == 4300def test_iter_always_yields_sorted_values():301 def is_sorted(lst):302 for item, next_item in zip(lst, lst[1:]):303 if next_item < item:304 return False305 return True306 skip_list = SkipList()307 for i in range(10):308 skip_list.insert(i, i)309 assert is_sorted(list(skip_list))310 skip_list.delete(5)311 skip_list.delete(8)312 
skip_list.delete(2)313 assert is_sorted(list(skip_list))314 skip_list.insert(-12, -12)315 skip_list.insert(77, 77)316 assert is_sorted(list(skip_list))317def pytests():318 for i in range(100):319 # Repeat test 100 times due to the probabilistic nature of skip list320 # random values == random bugs321 test_insert()322 test_insert_overrides_existing_value()323 test_searching_empty_list_returns_none()324 test_search()325 test_deleting_item_from_empty_list_do_nothing()326 test_deleted_items_are_not_founded_by_find_method()327 test_delete_removes_only_given_key()328 test_delete_doesnt_leave_dead_nodes()329 test_iter_always_yields_sorted_values()330def main():331 """332 >>> pytests()333 """334 skip_list = SkipList()335 skip_list.insert(2, "2")336 skip_list.insert(4, "4")337 skip_list.insert(6, "4")338 skip_list.insert(4, "5")339 skip_list.insert(8, "4")340 skip_list.insert(9, "4")341 skip_list.delete(4)342 print(skip_list)343if __name__ == "__main__":...
test_skipping.py
Source:test_skipping.py
...28 (unittest.skipIf, True, False))29 for deco, do_skip, dont_skip in op_table:30 class Foo(unittest.TestCase):31 @deco(do_skip, "testing")32 def test_skip(self): pass33 @deco(dont_skip, "testing")34 def test_dont_skip(self): pass35 test_do_skip = Foo("test_skip")36 test_dont_skip = Foo("test_dont_skip")37 suite = unittest.TestSuite([test_do_skip, test_dont_skip])38 events = []39 result = LoggingResult(events)40 suite.run(result)41 self.assertEqual(len(result.skipped), 1)42 expected = ['startTest', 'addSkip', 'stopTest',43 'startTest', 'addSuccess', 'stopTest']44 self.assertEqual(events, expected)45 self.assertEqual(result.testsRun, 2)46 self.assertEqual(result.skipped, [(test_do_skip, "testing")])47 self.assertTrue(result.wasSuccessful())48 def test_skip_class(self):49 @unittest.skip("testing")50 class Foo(unittest.TestCase):51 def test_1(self):52 record.append(1)53 record = []54 result = unittest.TestResult()55 test = Foo("test_1")56 suite = unittest.TestSuite([test])57 suite.run(result)58 self.assertEqual(result.skipped, [(test, "testing")])59 self.assertEqual(record, [])60 def test_skip_non_unittest_class(self):61 @unittest.skip("testing")62 class Mixin:63 def test_1(self):64 record.append(1)65 class Foo(Mixin, unittest.TestCase):66 pass67 record = []68 result = unittest.TestResult()69 test = Foo("test_1")70 suite = unittest.TestSuite([test])71 suite.run(result)72 self.assertEqual(result.skipped, [(test, "testing")])73 self.assertEqual(record, [])74 def test_expected_failure(self):75 class Foo(unittest.TestCase):76 @unittest.expectedFailure77 def test_die(self):78 self.fail("help me!")79 events = []80 result = LoggingResult(events)81 test = Foo("test_die")82 test.run(result)83 self.assertEqual(events,84 ['startTest', 'addExpectedFailure', 'stopTest'])85 self.assertEqual(result.expectedFailures[0][0], test)86 self.assertTrue(result.wasSuccessful())87 def test_unexpected_success(self):88 class Foo(unittest.TestCase):89 @unittest.expectedFailure90 def 
test_die(self):91 pass92 events = []93 result = LoggingResult(events)94 test = Foo("test_die")95 test.run(result)96 self.assertEqual(events,97 ['startTest', 'addUnexpectedSuccess', 'stopTest'])98 self.assertFalse(result.failures)99 self.assertEqual(result.unexpectedSuccesses, [test])100 self.assertTrue(result.wasSuccessful())101 def test_skip_doesnt_run_setup(self):102 class Foo(unittest.TestCase):103 wasSetUp = False104 wasTornDown = False105 def setUp(self):106 Foo.wasSetUp = True107 def tornDown(self):108 Foo.wasTornDown = True109 @unittest.skip('testing')110 def test_1(self):111 pass112 result = unittest.TestResult()113 test = Foo("test_1")114 suite = unittest.TestSuite([test])115 suite.run(result)116 self.assertEqual(result.skipped, [(test, "testing")])117 self.assertFalse(Foo.wasSetUp)118 self.assertFalse(Foo.wasTornDown)119 def test_decorated_skip(self):120 def decorator(func):121 def inner(*a):122 return func(*a)123 return inner124 class Foo(unittest.TestCase):125 @decorator126 @unittest.skip('testing')127 def test_1(self):128 pass129 result = unittest.TestResult()130 test = Foo("test_1")131 suite = unittest.TestSuite([test])132 suite.run(result)...
Learn to run automation tests from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!