Best Python code snippet using tox_python
watcher.py
Source:watcher.py
1# Copyright (c) 2014 Yubico AB2# All rights reserved.3#4# This program is free software: you can redistribute it and/or modify5# it under the terms of the GNU General Public License as published by6# the Free Software Foundation, either version 3 of the License, or7# (at your option) any later version.8#9# This program is distributed in the hope that it will be useful,10# but WITHOUT ANY WARRANTY; without even the implied warranty of11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the12# GNU General Public License for more details.13#14# You should have received a copy of the GNU General Public License15# along with this program. If not, see <http://www.gnu.org/licenses/>.16#17# Additional permission under GNU GPL version 3 section 718#19# If you modify this program, or any covered work, by linking or20# combining it with the OpenSSL project's OpenSSL library (or a21# modified version of that library), containing parts covered by the22# terms of the OpenSSL or SSLeay licenses, We grant you additional23# permission to convey the resulting work. Corresponding Source for a24# non-source form of such a combination shall include the source code25# for the parts of OpenSSL used as well as that of the covered work.26from __future__ import print_function27from PySide import QtGui, QtCore28from pivman.controller import Controller29from pivman.piv import YkPiv, PivError, DeviceGoneError30from pivman.storage import settings, SETTINGS31from functools import partial32try:33 from Queue import Queue34except ImportError:35 from queue import Queue36class Release(object):37 def __init__(self, fn):38 self._fn = fn39 def __del__(self):40 self._fn()41 def __call__(self):42 self._fn()43 self._fn = lambda: None44class ControllerWatcher(QtCore.QObject):45 _device_found = QtCore.Signal()46 _device_lost = QtCore.Signal()47 def __init__(self):48 super(ControllerWatcher, self).__init__()49 self._waiters = Queue()50 self._controller = None51 self._lock = QtCore.QMutex()52 self._lock.lock()53 self._worker = QtCore.QCoreApplication.instance().worker54 self._worker.post_bg(self._poll, self._release, True)55 self.startTimer(2000)56 def timerEvent(self, event):57 if QtGui.QApplication.activeWindow() and self._lock.tryLock():58 self._worker.post_bg(self._poll, self._release, True)59 event.accept()60 def _release(self, result=None):61 if self._controller and not self._waiters.empty():62 waiter = self._waiters.get_nowait()63 waiter(self._controller, Release(self._release))64 else:65 self._lock.unlock()66 def _poll(self):67 reader = settings[SETTINGS.CARD_READER]68 if self._controller:69 if self._controller.poll():70 return71 self._controller = None72 self._device_lost.emit()73 try:74 self._controller = Controller(YkPiv(reader=reader))75 self._device_found.emit()76 except (PivError, DeviceGoneError) as e:77 print(e)78 def on_found(self, fn, hold_lock=False):79 self._device_found.connect(self.wrap(fn, hold_lock))80 def on_lost(self, fn):81 self._device_lost.connect(fn)82 def use(self, fn, hold_lock=False):83 if not hold_lock:84 def waiter(controller, release):85 fn(controller)86 else:87 waiter = fn88 if self._controller and self._lock.tryLock():89 waiter(self._controller, Release(self._release))90 else:91 self._waiters.put(waiter)92 def wrap(self, fn, hold_lock=False):...
test_locks.py
Source:test_locks.py
...46 assert time == 2047 @via_usim48 async def test_available(self):49 lock = Lock()50 async def hold_lock():51 async with lock:52 await (time + 10)53 assert lock.available54 async with lock:55 assert lock.available56 async with Scope() as scope:57 scope.do(hold_lock())58 await (time + 5)59 assert not lock.available60 assert lock.available61 @via_usim62 async def test_release_exception(self):63 lock = Lock()64 with pytest.raises(KeyError):65 async with lock:66 raise KeyError67 with pytest.raises(KeyError):68 async with lock:69 raise KeyError70 async with lock: # lock must remain acquirable after exception71 assert True72 @via_usim73 async def test_contested_cancel(self):74 lock = Lock()75 async def hold_lock(duration=10):76 async with lock:77 await (time + duration)78 async with Scope() as scope:79 scope.do(hold_lock())80 middle = scope.do(hold_lock())81 scope.do(hold_lock())82 await (time + 5)83 middle.cancel()84 assert time == 2085 @via_usim86 async def test_designated_cancel(self):87 lock = Lock()88 markers = []89 async def hold_lock(mark, duration=10):90 async with lock:91 await (time + duration)92 markers.append(mark)93 async with Scope() as scope:94 # acquire the lock so children have to queue95 async with lock:96 # target is scheduled to get the lock once we release it...97 target = scope.do(hold_lock(0))98 # ..and then release it for its kin...99 scope.do(hold_lock(1))100 await instant101 # ..but we schedule target to cancel first102 target.cancel()103 # target (mark 0) did not insert itself104 # peer (mark 1) did insert itself after acquiring the lock...
blinky2.py
Source:blinky2.py
1import threading2from time import sleep3import RPi.GPIO as GPIO4LED_RED = 115LED_BLUE = 156BTN_FREQ = 317BTN_SEL = 338SLOW = 1029FAST = 10110btnSelector = FAST11btnFrequencyPressed = False12fast_hold = False13slow_hold = False14time_elapsed = 015lock = threading.Lock()16hold_lock = threading.Lock()17def blink_timer():18 global time_elapsed19 global lock20 21 while True:22 #print time_elapsed23 sleep(1)24 lock.acquire()25 time_elapsed += 126 lock.release()27def btnSelector_callback(pin):28 global btnSelector29 print ("Selector button pressed.")30 if btnSelector == FAST:31 btnSelector = SLOW32 elif btnSelector == SLOW:33 btnSelector = FAST34def btnFrequency_callback(pin):35 global btnSelector36 37 print ("Frequency button pressed.")38 global fast_hold39 global slow_hold40 global hold_lock41 hold_lock.acquire()42 # hold red LED43 if btnSelector == FAST:44 if fast_hold == False:45 fast_hold = True46 else:47 fast_hold = False48 # hold blue LED 49 elif btnSelector == SLOW:50 if slow_hold == False:51 slow_hold = True52 else:53 slow_hold = False54 hold_lock.release()55 56def blink_led (pin):57 global lock58 global time_elapsed59 global fast_hold60 global slow_hold61 global btnSelector62 63 if pin == LED_RED:64 blink_freq = 1.065 elif pin == LED_BLUE:66 blink_freq = 0.567 68 while True:69 if time_elapsed % 3 == 0:70 hold_lock.acquire()71 # reduce time between blinks for red LED72 if pin == LED_RED and fast_hold == False:73 blink_freq -= 0.274 if blink_freq < 0.0:75 blink_freq = 1.076 # increase time between blinks for blue LED77 if pin == LED_BLUE and slow_hold == False:78 blink_freq += 0.279 if blink_freq > 1.3:80 blink_freq = 0.581 hold_lock.release()82 83 GPIO.output(pin, GPIO.HIGH)84 sleep(blink_freq)85 GPIO.output(pin, GPIO.LOW)86 sleep(blink_freq)87try: 88 GPIO.setmode(GPIO.BOARD)89 GPIO.setup(LED_RED, GPIO.OUT, initial=GPIO.LOW)90 GPIO.setup(LED_BLUE, GPIO.OUT, initial=GPIO.LOW)91 GPIO.setup(BTN_SEL, GPIO.IN, pull_up_down=GPIO.PUD_UP)92 GPIO.setup(BTN_FREQ, GPIO.IN, pull_up_down=GPIO.PUD_UP)93 GPIO.add_event_detect(BTN_SEL, GPIO.RISING, callback=btnSelector_callback)94 GPIO.add_event_detect(BTN_FREQ, GPIO.RISING, callback=btnFrequency_callback)95 thread1 = threading.Thread(target=blink_led, args=(LED_RED,))96 thread2 = threading.Thread(target=blink_led, args=(LED_BLUE,))97 thread_timer = threading.Thread(target=blink_timer)98 thread1.start()99 thread2.start()100 thread_timer.start()101 thread1.join()102 thread2.join()103 thread_timer.join()104 105except KeyboardInterrupt:106 GPIO.cleanup()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!