Best Python code snippet using autotest_python
lockfile.py
Source:lockfile.py
1"""2lockfile.py - Platform-independent advisory file locks.3Requires Python 2.5 unless you apply 2.4.diff4Locking is done on a per-thread basis instead of a per-process basis.5Usage:6>>> lock = FileLock('somefile')7>>> try:8... lock.acquire()9... except AlreadyLocked:10... print 'somefile', 'is locked already.'11... except LockFailed:12... print 'somefile', 'can\\'t be locked.'13... else:14... print 'got lock'15got lock16>>> print lock.is_locked()17True18>>> lock.release()19>>> lock = FileLock('somefile')20>>> print lock.is_locked()21False22>>> with lock:23... print lock.is_locked()24True25>>> print lock.is_locked()26False27>>> # It is okay to lock twice from the same thread...28>>> with lock:29... lock.acquire()30...31>>> # Though no counter is kept, so you can't unlock multiple times...32>>> print lock.is_locked()33False34Exceptions:35 Error - base class for other exceptions36 LockError - base class for all locking exceptions37 AlreadyLocked - Another thread or process already holds the lock38 LockFailed - Lock failed for some other reason39 UnlockError - base class for all unlocking exceptions40 AlreadyUnlocked - File was not locked.41 NotMyLock - File was locked but not by the current thread/process42"""43from __future__ import division44import sys45import socket46import os47import threading48import time49import errno50import urllib51# Work with PEP8 and non-PEP8 versions of threading module.52if not hasattr(threading, "current_thread"):53 threading.current_thread = threading.currentThread54if not hasattr(threading.Thread, "get_name"):55 threading.Thread.get_name = threading.Thread.getName56__all__ = ['Error', 'LockError', 'LockTimeout', 'AlreadyLocked',57 'LockFailed', 'UnlockError', 'NotLocked', 'NotMyLock',58 'LinkFileLock', 'MkdirFileLock', 'SQLiteFileLock']59class Error(Exception):60 """61 Base class for other exceptions.62 >>> try:63 ... raise Error64 ... except Exception:65 ... pass66 """67 pass68class LockError(Error):69 """70 Base class for error arising from attempts to acquire the lock.71 >>> try:72 ... raise LockError73 ... except Error:74 ... pass75 """76 pass77class LockTimeout(LockError):78 """Raised when lock creation fails within a user-defined period of time.79 >>> try:80 ... raise LockTimeout81 ... except LockError:82 ... pass83 """84 pass85class AlreadyLocked(LockError):86 """Some other thread/process is locking the file.87 >>> try:88 ... raise AlreadyLocked89 ... except LockError:90 ... pass91 """92 pass93class LockFailed(LockError):94 """Lock file creation failed for some other reason.95 >>> try:96 ... raise LockFailed97 ... except LockError:98 ... pass99 """100 pass101class UnlockError(Error):102 """103 Base class for errors arising from attempts to release the lock.104 >>> try:105 ... raise UnlockError106 ... except Error:107 ... pass108 """109 pass110class NotLocked(UnlockError):111 """Raised when an attempt is made to unlock an unlocked file.112 >>> try:113 ... raise NotLocked114 ... except UnlockError:115 ... pass116 """117 pass118class NotMyLock(UnlockError):119 """Raised when an attempt is made to unlock a file someone else locked.120 >>> try:121 ... raise NotMyLock122 ... except UnlockError:123 ... pass124 """125 pass126class LockBase:127 """Base class for platform-specific lock classes."""128 def __init__(self, path, threaded=True):129 """130 >>> lock = LockBase('somefile')131 >>> lock = LockBase('somefile', threaded=False)132 """133 self.path = path134 self.lock_file = os.path.abspath(path) + ".lock"135 self.hostname = socket.gethostname()136 self.pid = os.getpid()137 if threaded:138 self.tname = "%x-" % (threading.current_thread().ident &139 0xffffffff)140 else:141 self.tname = ""142 dirname = os.path.dirname(self.lock_file)143 self.unique_name = os.path.join(dirname,144 "%s.%s%s" % (self.hostname,145 self.tname,146 self.pid))147 def acquire(self, timeout=None):148 """149 Acquire the lock.150 * If timeout is omitted (or None), wait forever trying to lock the151 file.152 * If timeout > 0, try to acquire the lock for that many seconds. If153 the lock period expires and the file is still locked, raise154 LockTimeout.155 * If timeout <= 0, raise AlreadyLocked immediately if the file is156 already locked.157 """158 raise NotImplemented("implement in subclass")159 def release(self):160 """161 Release the lock.162 If the file is not locked, raise NotLocked.163 """164 raise NotImplemented("implement in subclass")165 def is_locked(self):166 """167 Tell whether or not the file is locked.168 """169 raise NotImplemented("implement in subclass")170 def i_am_locking(self):171 """172 Return True if this object is locking the file.173 """174 raise NotImplemented("implement in subclass")175 def break_lock(self):176 """177 Remove a lock. Useful if a locking thread failed to unlock.178 """179 raise NotImplemented("implement in subclass")180 def __enter__(self):181 """182 Context manager support.183 """184 self.acquire()185 return self186 def __exit__(self, *_exc):187 """188 Context manager support.189 """190 self.release()191class LinkFileLock(LockBase):192 """Lock access to a file using atomic property of link(2)."""193 def acquire(self, timeout=None):194 try:195 open(self.unique_name, "wb").close()196 except IOError:197 raise LockFailed("failed to create %s" % self.unique_name)198 end_time = time.time()199 if timeout is not None and timeout > 0:200 end_time += timeout201 while True:202 # Try and create a hard link to it.203 try:204 os.link(self.unique_name, self.lock_file)205 except OSError:206 # Link creation failed. Maybe we've double-locked?207 nlinks = os.stat(self.unique_name).st_nlink208 if nlinks == 2:209 # The original link plus the one I created == 2. We're210 # good to go.211 return212 else:213 # Otherwise the lock creation failed.214 if timeout is not None and time.time() > end_time:215 os.unlink(self.unique_name)216 if timeout > 0:217 raise LockTimeout218 else:219 raise AlreadyLocked220 time.sleep(timeout is not None and timeout/10 or 0.1)221 else:222 # Link creation succeeded. We're good to go.223 return224 def release(self):225 if not self.is_locked():226 raise NotLocked227 elif not os.path.exists(self.unique_name):228 raise NotMyLock229 os.unlink(self.unique_name)230 os.unlink(self.lock_file)231 def is_locked(self):232 return os.path.exists(self.lock_file)233 def i_am_locking(self):234 return (self.is_locked() and235 os.path.exists(self.unique_name) and236 os.stat(self.unique_name).st_nlink == 2)237 def break_lock(self):238 if os.path.exists(self.lock_file):239 os.unlink(self.lock_file)240class MkdirFileLock(LockBase):241 """Lock file by creating a directory."""242 def __init__(self, path, threaded=True):243 """244 >>> lock = MkdirFileLock('somefile')245 >>> lock = MkdirFileLock('somefile', threaded=False)246 """247 LockBase.__init__(self, path, threaded)248 # Lock file itself is a directory. Place the unique file name into249 # it.250 self.unique_name = os.path.join(self.lock_file,251 "%s.%s%s" % (self.hostname,252 self.tname,253 self.pid))254 def acquire(self, timeout=None):255 end_time = time.time()256 if timeout is not None and timeout > 0:257 end_time += timeout258 if timeout is None:259 wait = 0.1260 else:261 wait = max(0, timeout / 10)262 while True:263 try:264 os.mkdir(self.lock_file)265 except OSError:266 err = sys.exc_info()[1]267 if err.errno == errno.EEXIST:268 # Already locked.269 if os.path.exists(self.unique_name):270 # Already locked by me.271 return272 if timeout is not None and time.time() > end_time:273 if timeout > 0:274 raise LockTimeout275 else:276 # Someone else has the lock.277 raise AlreadyLocked278 time.sleep(wait)279 else:280 # Couldn't create the lock for some other reason281 raise LockFailed("failed to create %s" % self.lock_file)282 else:283 open(self.unique_name, "wb").close()284 return285 def release(self):286 if not self.is_locked():287 raise NotLocked288 elif not os.path.exists(self.unique_name):289 raise NotMyLock290 os.unlink(self.unique_name)291 os.rmdir(self.lock_file)292 def is_locked(self):293 return os.path.exists(self.lock_file)294 def i_am_locking(self):295 return (self.is_locked() and296 os.path.exists(self.unique_name))297 def break_lock(self):298 if os.path.exists(self.lock_file):299 for name in os.listdir(self.lock_file):300 os.unlink(os.path.join(self.lock_file, name))301 os.rmdir(self.lock_file)302class SQLiteFileLock(LockBase):303 "Demonstrate SQL-based locking."304 import tempfile305 _fd, testdb = tempfile.mkstemp()306 os.close(_fd)307 os.unlink(testdb)308 del _fd, tempfile309 def __init__(self, path, threaded=True):310 """311 >>> lock = SQLiteFileLock('somefile')312 >>> lock = SQLiteFileLock('somefile', threaded=False)313 """314 LockBase.__init__(self, path, threaded)315 self.lock_file = unicode(self.lock_file)316 self.unique_name = unicode(self.unique_name)317 import sqlite3318 self.connection = sqlite3.connect(SQLiteFileLock.testdb)319 320 c = self.connection.cursor()321 try:322 c.execute("create table locks"323 "("324 " lock_file varchar(32),"325 " unique_name varchar(32)"326 ")")327 except sqlite3.OperationalError:328 pass329 else:330 self.connection.commit()331 import atexit332 atexit.register(os.unlink, SQLiteFileLock.testdb)333 def acquire(self, timeout=None):334 end_time = time.time()335 if timeout is not None and timeout > 0:336 end_time += timeout337 if timeout is None:338 wait = 0.1339 elif timeout <= 0:340 wait = 0341 else:342 wait = timeout / 10343 cursor = self.connection.cursor()344 while True:345 if not self.is_locked():346 # Not locked. Try to lock it.347 cursor.execute("insert into locks"348 " (lock_file, unique_name)"349 " values"350 " (?, ?)",351 (self.lock_file, self.unique_name))352 self.connection.commit()353 # Check to see if we are the only lock holder.354 cursor.execute("select * from locks"355 " where unique_name = ?",356 (self.unique_name,))357 rows = cursor.fetchall()358 if len(rows) > 1:359 # Nope. Someone else got there. Remove our lock.360 cursor.execute("delete from locks"361 " where unique_name = ?",362 (self.unique_name,))363 self.connection.commit()364 else:365 # Yup. We're done, so go home.366 return367 else:368 # Check to see if we are the only lock holder.369 cursor.execute("select * from locks"370 " where unique_name = ?",371 (self.unique_name,))372 rows = cursor.fetchall()373 if len(rows) == 1:374 # We're the locker, so go home.375 return376 377 # Maybe we should wait a bit longer.378 if timeout is not None and time.time() > end_time:379 if timeout > 0:380 # No more waiting.381 raise LockTimeout382 else:383 # Someone else has the lock and we are impatient..384 raise AlreadyLocked385 # Well, okay. We'll give it a bit longer.386 time.sleep(wait)387 def release(self):388 if not self.is_locked():389 raise NotLocked390 if not self.i_am_locking():391 raise NotMyLock((self._who_is_locking(), self.unique_name))392 cursor = self.connection.cursor()393 cursor.execute("delete from locks"394 " where unique_name = ?",395 (self.unique_name,))396 self.connection.commit()397 def _who_is_locking(self):398 cursor = self.connection.cursor()399 cursor.execute("select unique_name from locks"400 " where lock_file = ?",401 (self.lock_file,))402 return cursor.fetchone()[0]403 404 def is_locked(self):405 cursor = self.connection.cursor()406 cursor.execute("select * from locks"407 " where lock_file = ?",408 (self.lock_file,))409 rows = cursor.fetchall()410 return not not rows411 def i_am_locking(self):412 cursor = self.connection.cursor()413 cursor.execute("select * from locks"414 " where lock_file = ?"415 " and unique_name = ?",416 (self.lock_file, self.unique_name))417 return not not cursor.fetchall()418 def break_lock(self):419 cursor = self.connection.cursor()420 cursor.execute("delete from locks"421 " where lock_file = ?",422 (self.lock_file,))423 self.connection.commit()424if hasattr(os, "link"):425 FileLock = LinkFileLock426else:...
process_status_lock.py
Source:process_status_lock.py
1#! /usr/bin/env python32# -*- coding: utf-8 -*-3import os4from datetime import datetime5from time import sleep6from sppm.settings import hlog, SPPM_CONFIG7class ProcessStatusLock:8 MAX_IDLE_SECONDS = 29 @staticmethod10 def get_pid_from_file(pid_file):11 """12 ä»æ件读åPID13 :return:14 """15 with open(pid_file, 'r') as f:16 return int(f.readline())17 @staticmethod18 def wait_unlock_by_child(lock_file):19 n = 020 while True:21 hlog.debug('ä»èµæºæ件è·ååè¿ç¨ç¶æï¼æ£æµåè¿ç¨æ¯å¦ç©ºé²æéåº......ï¼%s' % lock_file)22 if SPPM_CONFIG.enable_timeout and n >= SPPM_CONFIG.timeout:23 hlog.debug('çå¾
åè¿ç¨éåºè¶
æ¶')24 break25 if not ProcessStatusLock.is_working(lock_file) and ProcessStatusLock.is_idle(lock_file):26 hlog.debug('åè¿ç¨ç©ºé²æå·²ç»éåº')27 break28 n += 129 sleep(1)30 @staticmethod31 def wait_unlock_by_file(lock_file):32 n = 033 while True:34 hlog.debug('çå¾
éæ¾æ件éï¼%s' % lock_file)35 if SPPM_CONFIG.enable_timeout and n >= SPPM_CONFIG.timeout:36 hlog.debug('çå¾
éæ¾æ件éè¶
æ¶')37 break38 if not os.path.exists(lock_file):39 hlog.debug('æ件éï¼%s å·²ç»éæ¾' % lock_file)40 break41 n += 142 sleep(1)43 @staticmethod44 def wait_unlock(lock_file):45 """46 çå¾
åè¿ç¨éæ¾æ件é47 :return:48 """49 # noinspection PyBroadException50 try:51 ProcessStatusLock.wait_unlock_by_child(lock_file)52 ProcessStatusLock.wait_unlock_by_file(lock_file)53 except Exception:54 hlog.info('åè¿ç¨å·²ç»éåº')55 @staticmethod56 def lock(process_pid, lock_file, is_working=False):57 """58 åè¿ç¨è¿è¡æ¶ï¼å建æ件é59 :param process_pid: è¿ç¨ç¼å·60 :param lock_file: èµæºæ件路å¾61 :param is_working: å·¥ä½ç¶æ62 :return:63 """64 block_timestamp = datetime.now().timestamp()65 with open(lock_file, 'w') as f:66 f.write(str(process_pid) + '\n')67 f.write(str(block_timestamp) + '\n')68 f.write(str(int(is_working)) + '\n')69 @staticmethod70 def get_block_timestamp(lock_file):71 """72 ä»æ件éè·åæ¶é´æ³73 :param lock_file: èµæºæ件路å¾74 :return:75 """76 block_timestamp = 0.077 block_timestamp_line_no = 278 line_no = 179 with open(lock_file, 'r') as f:80 if line_no == block_timestamp_line_no:81 block_timestamp = float(f.readline())82 line_no += 183 return block_timestamp84 @staticmethod85 def is_idle(lock_file):86 """87 è¿ç¨æ¯å¦ç©ºé²88 :param lock_file: èµæºæ件路å¾89 :return:90 """91 now_timestamp = datetime.now().timestamp()92 block_timestamp = ProcessStatusLock.get_block_timestamp(lock_file)93 result = now_timestamp - block_timestamp >= ProcessStatusLock.MAX_IDLE_SECONDS94 return result95 @staticmethod96 def is_working(lock_file):97 """98 è¿ç¨æ´»è·99 :param lock_file: èµæºæ件路å¾100 :return:101 """102 result = False103 total_line_num = 3104 with open(lock_file, 'r') as f:105 lines = f.readlines()106 if len(lines) != total_line_num:107 raise ValueError('æ æçèµæºæ件ï¼%s' % lock_file)108 result = int(lines[2])109 return result110def working_lock(is_active: bool):...
test_lock.py
Source:test_lock.py
1from pathlib import Path2import pytest3import unittest.mock as mock4from acondbs.misc.lock import lock5from acondbs.misc.lock import TimeOutAcquiringLock6##__________________________________________________________________||7def test_acquire_release(tmpdir_factory):8 folder = Path(tmpdir_factory.mktemp('lock'))9 lock_file = folder.joinpath('.lock')10 l = lock(lock_file)11 assert not l.locked12 l.acquire()13 assert l.locked14 assert lock_file.exists()15 l.release()16 assert not l.locked17 assert not lock_file.exists()18##__________________________________________________________________||19def test_release_when_not_lockded(tmpdir_factory):20 folder = Path(tmpdir_factory.mktemp('lock'))21 lock_file = folder.joinpath('.lock')22 l = lock(lock_file)23 assert not l.locked24 l.release()25 assert not l.locked26 assert not lock_file.exists()27##__________________________________________________________________||28def test_release_not_delete_when_not_lockded(tmpdir_factory):29 folder = Path(tmpdir_factory.mktemp('lock'))30 lock_file = folder.joinpath('.lock')31 lock_file.touch()32 l = lock(lock_file)33 assert not l.locked34 l.release()35 assert not l.locked36 assert lock_file.exists()37##__________________________________________________________________||38def test_acquire_timeout(tmpdir_factory):39 timeout = 0.1 # sec40 folder = Path(tmpdir_factory.mktemp('lock'))41 lock_file = folder.joinpath('.lock')42 lock_file.touch()43 l = lock(lock_file, timeout=timeout)44 with pytest.raises(TimeOutAcquiringLock):45 l.acquire()46##__________________________________________________________________||47def test_with_success(tmpdir_factory):48 folder = Path(tmpdir_factory.mktemp('lock'))49 lock_file = folder.joinpath('.lock')50 with lock(lock_file) as l:51 assert l.locked52 assert lock_file.exists()53 assert not l.locked54 assert not lock_file.exists()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!