Best Python code snippet using yandex-tank
lock_tests.py
Source:lock_tests.py
...58 lock = self.locktype()59 del lock60 def test_acquire_destroy(self):61 lock = self.locktype()62 lock.acquire()63 del lock64 def test_acquire_release(self):65 lock = self.locktype()66 lock.acquire()67 lock.release()68 del lock69 def test_try_acquire(self):70 lock = self.locktype()71 self.assertTrue(lock.acquire(False))72 lock.release()73 def test_try_acquire_contended(self):74 lock = self.locktype()75 lock.acquire()76 result = []77 def f():78 result.append(lock.acquire(False))79 Bunch(f, 1).wait_for_finished()80 self.assertFalse(result[0])81 lock.release()82 def test_acquire_contended(self):83 lock = self.locktype()84 lock.acquire()85 N = 586 def f():87 lock.acquire()88 lock.release()89 b = Bunch(f, N)90 b.wait_for_started()91 _wait()92 self.assertEqual(len(b.finished), 0)93 lock.release()94 b.wait_for_finished()95 self.assertEqual(len(b.finished), N)96 def test_with(self):97 lock = self.locktype()98 def f():99 lock.acquire()100 lock.release()101 def _with(err=None):102 with lock:103 if err is not None:104 raise err105 _with()106 # Check the lock is unacquired107 Bunch(f, 1).wait_for_finished()108 self.assertRaises(TypeError, _with, TypeError)109 # Check the lock is unacquired110 Bunch(f, 1).wait_for_finished()111 def test_thread_leak(self):112 # The lock shouldn't leak a Thread instance when used from a foreign113 # (non-threading) thread.114 lock = self.locktype()115 def f():116 lock.acquire()117 lock.release()118 n = len(threading.enumerate())119 # We run many threads in the hope that existing threads ids won't120 # be recycled.121 Bunch(f, 15).wait_for_finished()122 self.assertEqual(n, len(threading.enumerate()))123class LockTests(BaseLockTests):124 """125 Tests for non-recursive, weak locks126 (which can be acquired and released from different threads).127 """128 def test_reacquire(self):129 # Lock needs to be released before re-acquiring.130 lock = self.locktype()131 phase = []132 def f():133 lock.acquire()134 phase.append(None)135 lock.acquire()136 
phase.append(None)137 start_new_thread(f, ())138 while len(phase) == 0:139 _wait()140 _wait()141 self.assertEqual(len(phase), 1)142 lock.release()143 while len(phase) == 1:144 _wait()145 self.assertEqual(len(phase), 2)146 def test_different_thread(self):147 # Lock can be released from a different thread.148 lock = self.locktype()149 lock.acquire()150 def f():151 lock.release()152 b = Bunch(f, 1)153 b.wait_for_finished()154 lock.acquire()155 lock.release()156class RLockTests(BaseLockTests):157 """158 Tests for recursive locks.159 """160 def test_reacquire(self):161 lock = self.locktype()162 lock.acquire()163 lock.acquire()164 lock.release()165 lock.acquire()166 lock.release()167 lock.release()168 def test_release_unacquired(self):169 # Cannot release an unacquired lock170 lock = self.locktype()171 self.assertRaises(RuntimeError, lock.release)172 lock.acquire()173 lock.acquire()174 lock.release()175 lock.acquire()176 lock.release()177 lock.release()178 self.assertRaises(RuntimeError, lock.release)179 def test_different_thread(self):180 # Cannot release from a different thread181 lock = self.locktype()182 def f():183 lock.acquire()184 b = Bunch(f, 1, True)185 try:186 self.assertRaises(RuntimeError, lock.release)187 finally:188 b.do_finish()189 def test__is_owned(self):190 lock = self.locktype()191 self.assertFalse(lock._is_owned())192 lock.acquire()193 self.assertTrue(lock._is_owned())194 lock.acquire()195 self.assertTrue(lock._is_owned())196 result = []197 def f():198 result.append(lock._is_owned())199 Bunch(f, 1).wait_for_finished()200 self.assertFalse(result[0])201 lock.release()202 self.assertTrue(lock._is_owned())203 lock.release()204 self.assertFalse(lock._is_owned())205class EventTests(BaseTestCase):206 """207 Tests for Event objects.208 """209 def test_is_set(self):210 evt = self.eventtype()211 self.assertFalse(evt.is_set())212 evt.set()213 self.assertTrue(evt.is_set())214 evt.set()215 self.assertTrue(evt.is_set())216 evt.clear()217 
self.assertFalse(evt.is_set())218 evt.clear()219 self.assertFalse(evt.is_set())220 def _check_notify(self, evt):221 # All threads get notified222 N = 5223 results1 = []224 results2 = []225 def f():226 results1.append(evt.wait())227 results2.append(evt.wait())228 b = Bunch(f, N)229 b.wait_for_started()230 _wait()231 self.assertEqual(len(results1), 0)232 evt.set()233 b.wait_for_finished()234 self.assertEqual(results1, [True] * N)235 self.assertEqual(results2, [True] * N)236 def test_notify(self):237 evt = self.eventtype()238 self._check_notify(evt)239 # Another time, after an explicit clear()240 evt.set()241 evt.clear()242 self._check_notify(evt)243 def test_timeout(self):244 evt = self.eventtype()245 results1 = []246 results2 = []247 N = 5248 def f():249 results1.append(evt.wait(0.0))250 t1 = time.time()251 r = evt.wait(0.2)252 t2 = time.time()253 results2.append((r, t2 - t1))254 Bunch(f, N).wait_for_finished()255 self.assertEqual(results1, [False] * N)256 for r, dt in results2:257 self.assertFalse(r)258 self.assertTrue(dt >= 0.2, dt)259 # The event is set260 results1 = []261 results2 = []262 evt.set()263 Bunch(f, N).wait_for_finished()264 self.assertEqual(results1, [True] * N)265 for r, dt in results2:266 self.assertTrue(r)267class ConditionTests(BaseTestCase):268 """269 Tests for condition variables.270 """271 def test_acquire(self):272 cond = self.condtype()273 # Be default we have an RLock: the condition can be acquired multiple274 # times.275 cond.acquire()276 cond.acquire()277 cond.release()278 cond.release()279 lock = threading.Lock()280 cond = self.condtype(lock)281 cond.acquire()282 self.assertFalse(lock.acquire(False))283 cond.release()284 self.assertTrue(lock.acquire(False))285 self.assertFalse(cond.acquire(False))286 lock.release()287 with cond:288 self.assertFalse(lock.acquire(False))289 def test_unacquired_wait(self):290 cond = self.condtype()291 self.assertRaises(RuntimeError, cond.wait)292 def test_unacquired_notify(self):293 cond = 
self.condtype()294 self.assertRaises(RuntimeError, cond.notify)295 def _check_notify(self, cond):296 N = 5297 results1 = []298 results2 = []299 phase_num = 0300 def f():301 cond.acquire()302 cond.wait()303 cond.release()304 results1.append(phase_num)305 cond.acquire()306 cond.wait()307 cond.release()308 results2.append(phase_num)309 b = Bunch(f, N)310 b.wait_for_started()311 _wait()312 self.assertEqual(results1, [])313 # Notify 3 threads at first314 cond.acquire()315 cond.notify(3)316 _wait()317 phase_num = 1318 cond.release()319 while len(results1) < 3:320 _wait()321 self.assertEqual(results1, [1] * 3)322 self.assertEqual(results2, [])323 # Notify 5 threads: they might be in their first or second wait324 cond.acquire()325 cond.notify(5)326 _wait()327 phase_num = 2328 cond.release()329 while len(results1) + len(results2) < 8:330 _wait()331 self.assertEqual(results1, [1] * 3 + [2] * 2)332 self.assertEqual(results2, [2] * 3)333 # Notify all threads: they are all in their second wait334 cond.acquire()335 cond.notify_all()336 _wait()337 phase_num = 3338 cond.release()339 while len(results2) < 5:340 _wait()341 self.assertEqual(results1, [1] * 3 + [2] * 2)342 self.assertEqual(results2, [2] * 3 + [3] * 2)343 b.wait_for_finished()344 def test_notify(self):345 cond = self.condtype()346 self._check_notify(cond)347 # A second time, to check internal state is still ok.348 self._check_notify(cond)349 def test_timeout(self):350 cond = self.condtype()351 results = []352 N = 5353 def f():354 cond.acquire()355 t1 = time.time()356 cond.wait(0.2)357 t2 = time.time()358 cond.release()359 results.append(t2 - t1)360 Bunch(f, N).wait_for_finished()361 self.assertEqual(len(results), 5)362 for dt in results:363 self.assertTrue(dt >= 0.2, dt)364class BaseSemaphoreTests(BaseTestCase):365 """366 Common tests for {bounded, unbounded} semaphore objects.367 """368 def test_constructor(self):369 self.assertRaises(ValueError, self.semtype, value = -1)370 self.assertRaises(ValueError, self.semtype, 
value = -sys.maxint)371 def test_acquire(self):372 sem = self.semtype(1)373 sem.acquire()374 sem.release()375 sem = self.semtype(2)376 sem.acquire()377 sem.acquire()378 sem.release()379 sem.release()380 def test_acquire_destroy(self):381 sem = self.semtype()382 sem.acquire()383 del sem384 def test_acquire_contended(self):385 sem = self.semtype(7)386 sem.acquire()387 N = 10388 results1 = []389 results2 = []390 phase_num = 0391 def f():392 sem.acquire()393 results1.append(phase_num)394 sem.acquire()395 results2.append(phase_num)396 b = Bunch(f, 10)397 b.wait_for_started()398 while len(results1) + len(results2) < 6:399 _wait()400 self.assertEqual(results1 + results2, [0] * 6)401 phase_num = 1402 for i in range(7):403 sem.release()404 while len(results1) + len(results2) < 13:405 _wait()406 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)407 phase_num = 2408 for i in range(6):409 sem.release()410 while len(results1) + len(results2) < 19:411 _wait()412 self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)413 # The semaphore is still locked414 self.assertFalse(sem.acquire(False))415 # Final release, to let the last thread finish416 sem.release()417 b.wait_for_finished()418 def test_try_acquire(self):419 sem = self.semtype(2)420 self.assertTrue(sem.acquire(False))421 self.assertTrue(sem.acquire(False))422 self.assertFalse(sem.acquire(False))423 sem.release()424 self.assertTrue(sem.acquire(False))425 def test_try_acquire_contended(self):426 sem = self.semtype(4)427 sem.acquire()428 results = []429 def f():430 results.append(sem.acquire(False))431 results.append(sem.acquire(False))432 Bunch(f, 5).wait_for_finished()433 # There can be a thread switch between acquiring the semaphore and434 # appending the result, therefore results will not necessarily be435 # ordered.436 self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )437 def test_default_value(self):438 # The default initial value is 1.439 sem = self.semtype()440 
sem.acquire()441 def f():442 sem.acquire()443 sem.release()444 b = Bunch(f, 1)445 b.wait_for_started()446 _wait()447 self.assertFalse(b.finished)448 sem.release()449 b.wait_for_finished()450 def test_with(self):451 sem = self.semtype(2)452 def _with(err=None):453 with sem:454 self.assertTrue(sem.acquire(False))455 sem.release()456 with sem:457 self.assertFalse(sem.acquire(False))458 if err:459 raise err460 _with()461 self.assertTrue(sem.acquire(False))462 sem.release()463 self.assertRaises(TypeError, _with, TypeError)464 self.assertTrue(sem.acquire(False))465 sem.release()466class SemaphoreTests(BaseSemaphoreTests):467 """468 Tests for unbounded semaphores.469 """470 def test_release_unacquired(self):471 # Unbounded releases are allowed and increment the semaphore's value472 sem = self.semtype(1)473 sem.release()474 sem.acquire()475 sem.acquire()476 sem.release()477class BoundedSemaphoreTests(BaseSemaphoreTests):478 """479 Tests for bounded semaphores.480 """481 def test_release_unacquired(self):482 # Cannot go past the initial value483 sem = self.semtype()484 self.assertRaises(ValueError, sem.release)485 sem.acquire()486 sem.release()...
lockable_directory.py
Source:lockable_directory.py
...20 self.temp_dir = tempfile.TemporaryDirectory(prefix="litani-test")21 self.temp_dir_p = pathlib.Path(self.temp_dir.name)22 def tearDown(self):23 self.temp_dir.cleanup()24 def test_cannot_initially_acquire(self):25 lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)26 self.assertFalse(lock_dir.acquire())27 def test_multiproc_cannot_initially_acquire(self):28 lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)29 self.assertFalse(lock_dir.acquire())30 lock_dir_2 = lib.litani.LockableDirectory(self.temp_dir_p)31 self.assertFalse(lock_dir.acquire())32 def test_can_acquire_after_release(self):33 lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)34 lock_dir.release()35 self.assertTrue(lock_dir.acquire())36 lock_dir.release()37 self.assertTrue(lock_dir.acquire())38 def test_multiproc_can_acquire_after_release(self):39 lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)40 lock_dir.release()41 lock_dir_2 = lib.litani.LockableDirectory(self.temp_dir_p)42 self.assertTrue(lock_dir_2.acquire())43 def test_no_double_acquire(self):44 lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)45 lock_dir.release()46 self.assertTrue(lock_dir.acquire())47 self.assertFalse(lock_dir.acquire())48 def test_multiproc_no_double_acquire(self):49 lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)50 lock_dir.release()51 lock_dir_2 = lib.litani.LockableDirectory(self.temp_dir_p)52 self.assertTrue(lock_dir.acquire())53 self.assertFalse(lock_dir_2.acquire())54 def test_repeat_no_double_acquire(self):55 lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)56 lock_dir.release()57 self.assertTrue(lock_dir.acquire())58 self.assertFalse(lock_dir.acquire())59 lock_dir.release()60 self.assertTrue(lock_dir.acquire())61 self.assertFalse(lock_dir.acquire())62 def test_try_acquire(self):63 lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)64 lock_dir.release()65 with lock_dir.try_acquire():66 pass # No exception67 def test_no_double_try_acquire(self):68 lock_dir = 
lib.litani.LockableDirectory(self.temp_dir_p)69 lock_dir.release()70 with lock_dir.try_acquire():71 with self.assertRaises(lib.litani.AcquisitionFailed):72 with lock_dir.try_acquire():73 pass74 def test_multiproc_no_double_try_acquire(self):75 lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)76 lock_dir.release()77 lock_dir_2 = lib.litani.LockableDirectory(self.temp_dir_p)78 with lock_dir.try_acquire():79 with self.assertRaises(lib.litani.AcquisitionFailed):80 with lock_dir_2.try_acquire():81 pass82 def test_no_try_acquire_acquire(self):83 lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)84 lock_dir.release()85 with lock_dir.try_acquire():86 self.assertFalse(lock_dir.acquire())87 def test_multiproc_no_try_acquire_acquire(self):88 lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)89 lock_dir.release()90 lock_dir_2 = lib.litani.LockableDirectory(self.temp_dir_p)91 with lock_dir.try_acquire():92 self.assertFalse(lock_dir_2.acquire())93 def test_no_acquire_try_acquire(self):94 lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)95 with self.assertRaises(lib.litani.AcquisitionFailed):96 with lock_dir.try_acquire():97 pass98 def test_multiproc_no_acquire_try_acquire(self):99 lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)100 lock_dir_2 = lib.litani.LockableDirectory(self.temp_dir_p)101 with self.assertRaises(lib.litani.AcquisitionFailed):102 with lock_dir_2.try_acquire():103 pass104 def test_context_manager_releases(self):105 lock_dir = lib.litani.LockableDirectory(self.temp_dir_p)106 lock_dir.release()107 with lock_dir.try_acquire():108 pass...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!