Best Python code snippet using playwright-python
Trimmer.py
Source:Trimmer.py
...20 for region in regions:21 view.erase(edit, region)22 has_matches = True23 if has_matches:24 sublime.set_timeout(lambda: sublime.status_message(25 'Trimmer: trailing whitespace removed.'), 0)26 else:27 sublime.set_timeout(lambda: sublime.status_message(28 'Trimmer: no trailing whitespace found.'), 0)29class DeleteEmptyLinesCommand(sublime_plugin.TextCommand):30 def run(self, edit):31 view = self.view32 has_matches = False33 reobj = re.compile('^[ \t]*$\r?\n', re.MULTILINE)34 for region in selections(view):35 str_buffer = view.substr(region)36 trimmed = reobj.sub('', str_buffer)37 if str_buffer != trimmed:38 view.replace(edit, region, trimmed)39 has_matches = True40 if has_matches is True:41 sublime.set_timeout(lambda: sublime.status_message(42 'Trimmer: empty lines deleted.'), 0)43 else:44 sublime.set_timeout(lambda: sublime.status_message(45 'Trimmer: no empty lines to delete.'), 0)46class RemoveBlankSpaces(sublime_plugin.TextCommand):47 def run(self, edit):48 view = self.view49 has_matches = False50 reobj = re.compile(r'[ \t\r\n\v\f]')51 for region in selections(view):52 str_buffer = view.substr(region)53 trimmed = reobj.sub('', str_buffer)54 if str_buffer != trimmed:55 view.replace(edit, region, trimmed)56 has_matches = True57 if has_matches is True:58 sublime.set_timeout(lambda: sublime.status_message(59 'Trimmer: blanks spaces removed.'), 0)60 else:61 sublime.set_timeout(lambda: sublime.status_message(62 'Trimmer: no blank spaces to remove.'), 0)63class TrimLeadingWhitespaceCommand(sublime_plugin.TextCommand):64 def run(self, edit):65 view = self.view66 has_matches = False67 reobj = re.compile('^[ \t]+', re.MULTILINE)68 for region in selections(view):69 str_buffer = view.substr(region)70 trimmed = reobj.sub('', str_buffer)71 if str_buffer != trimmed:72 view.replace(edit, region, trimmed)73 has_matches = True74 if has_matches is True:75 sublime.set_timeout(lambda: sublime.status_message(76 'Trimmer: leading whitespace removed.'), 0)77 else:78 sublime.set_timeout(lambda: sublime.status_message(79 'Trimmer: no leading whitespace to remove.'), 0)80class TrimLeadingTrailingWhitespace(sublime_plugin.TextCommand):81 def run(self, edit):82 view = self.view83 has_matches = False84 reobj = re.compile('^[ \t]+|[\t ]+$', re.MULTILINE)85 for region in selections(view):86 str_buffer = view.substr(region)87 trimmed = reobj.sub('', str_buffer)88 if str_buffer != trimmed:89 view.replace(edit, region, trimmed)90 has_matches = True91 if has_matches is True:92 sublime.set_timeout(lambda: sublime.status_message(93 'Trimmer: leading and trailing whitespace removed.'), 0)94 else:95 sublime.set_timeout(lambda: sublime.status_message(96 'Trimmer: no leading or trailing whitespace to remove.'), 0)97class CollapseLines(sublime_plugin.TextCommand):98 def run(self, edit):99 view = self.view100 has_matches = False101 reobj = re.compile(r'(?:\s*)(\r?\n)(?:\s*)(?:\r?\n+)')102 for region in selections(view):103 str_buffer = view.substr(region)104 trimmed = reobj.sub(r'\1\1', str_buffer)105 if str_buffer != trimmed:106 view.replace(edit, region, trimmed)107 has_matches = True108 if has_matches is True:109 sublime.set_timeout(lambda: sublime.status_message(110 'Trimmer: lines collapsed.'), 0)111 else:112 sublime.set_timeout(lambda: sublime.status_message(113 'Trimmer: no lines to collapse.'), 0)114class CollapseSpaces(sublime_plugin.TextCommand):115 def run(self, edit):116 view = self.view117 has_matches = False118 reobj = re.compile('([ ])[ ]+')119 for region in selections(view):120 str_buffer = view.substr(region)121 trimmed = reobj.sub(r'\1', str_buffer)122 if str_buffer != trimmed:123 view.replace(edit, region, trimmed)124 has_matches = True125 if has_matches is True:126 sublime.set_timeout(lambda: sublime.status_message(127 'Trimmer: spaces collapsed.'), 0)128 else:129 sublime.set_timeout(lambda: sublime.status_message(130 'Trimmer: no spaces to collapse.'), 0)131class NormalizeSpaces(sublime_plugin.TextCommand):132 def run(self, edit):133 view = self.view134 has_matches = False135 reobj = re.compile('(^\\s+)|\\s(?=\\s+)|(\\s+$)')136 for region in selections(view):137 str_buffer = view.substr(region)138 trimmed = reobj.sub('', str_buffer)139 if str_buffer != trimmed:140 view.replace(edit, region, trimmed)141 has_matches = True142 if has_matches is True:143 sublime.set_timeout(lambda: sublime.status_message(144 'Trimmer: spaces normalized.'), 0)145 else:146 sublime.set_timeout(lambda: sublime.status_message(147 'Trimmer: no spaces to normalize.'), 0)148class TokenizeString(sublime_plugin.TextCommand):149 def run(self, edit):150 view = self.view151 has_matches = False152 reobj = re.compile('^ |\t+|( ){2,}|\t| |\t+$', re.MULTILINE)153 for region in selections(view):154 str_buffer = view.substr(region)155 trimmed = reobj.sub('', str_buffer)156 if str_buffer != trimmed:157 view.replace(edit, region, trimmed)158 has_matches = True159 if has_matches is True:160 sublime.set_timeout(lambda: sublime.status_message(161 'Trimmer: string tokenized.'), 0)162 else:163 sublime.set_timeout(lambda: sublime.status_message(164 'Trimmer: nothing to tokenize.'), 0)165class TrimEdges(sublime_plugin.TextCommand):166 def run(self, edit):167 view = self.view168 has_matches = False169 reobj = re.compile(r'(\A\s+|\s+\Z)')170 if view.size() > 0:171 region = sublime.Region(0, view.size())172 str_buffer = view.substr(region)173 trimmed = reobj.sub('', str_buffer)174 if str_buffer != trimmed:175 view.replace(edit, region, trimmed)176 has_matches = True177 if has_matches is True:178 return sublime.set_timeout(lambda: sublime.status_message(179 'Trimmer: file edges trimmed.'), 0)180 else:181 return sublime.set_timeout(lambda: sublime.status_message(182 'Trimmer: no file edges to trim.'), 0)183class DeleteEmptyTags(sublime_plugin.TextCommand):184 def run(self, edit):185 view = self.view186 has_matches = False187 reobj = re.compile(r'<([A-Z][A-Z0-9]*)\b[^>]*>\s*</\1>', re.IGNORECASE)188 for region in selections(view):189 str_buffer = view.substr(region)190 trimmed = reobj.sub('', str_buffer)191 if str_buffer != trimmed:192 view.replace(edit, region, trimmed)193 has_matches = True194 if has_matches is True:195 sublime.set_timeout(lambda: sublime.status_message(196 'Trimmer: empty tags deleted.'), 0)197 else:198 sublime.set_timeout(lambda: sublime.status_message(199 'Trimmer: no empty tags to delete.'), 0)200class TrimSelections(sublime_plugin.TextCommand):201 def run(self, edit):202 """203 Trim leading and trailing whitespace from selections.204 Originally from the 'MultiâEditâUtils' Plug-in205 https://github.com/philippotto/Sublime-MultiEditUtils206 """207 view = self.view208 selection = view.sel()209 new_regions = []210 for current_region in selection:211 text = view.substr(current_region)212 l_stripped_text = text.lstrip()213 r_stripped_text = l_stripped_text.rstrip()214 l_stripped_count = len(text) - len(l_stripped_text)215 r_stripped_count = len(l_stripped_text) - len(r_stripped_text)216 a = current_region.begin() + l_stripped_count217 b = current_region.end() - r_stripped_count218 if a == b:219 # the region only contained whitespace220 # use the old selection end to avoid jumping of cursor221 a = b = current_region.b222 new_regions.append(sublime.Region(a, b))223 selection.clear()224 for region in new_regions:225 selection.add(region)226 sublime.set_timeout(lambda: sublime.status_message(227 'Trimmer: selections trimmed.'), 0)228class RemoveComments(sublime_plugin.TextCommand):229 def run(self, edit):230 view = self.view231 has_matches = False232 re_single_line_comment = re.compile(r"//.*$", re.MULTILINE)233 re_hash_comment = re.compile("#[^!].*$", re.MULTILINE)234 re_html_comment = re.compile("<!--.*?-->", re.DOTALL)235 re_block_comment = re.compile(r"/\*.*?\*/", re.DOTALL)236 re_ini_comment = re.compile(r"^(?:\s+)?;.*$", re.MULTILINE)237 for region in selections(view):238 str_buffer = view.substr(region)239 #240 # TODO: re-work this brute force approach and filter by syntax/scope241 trimmed = re_single_line_comment.sub('', str_buffer)242 trimmed = re_hash_comment.sub('', trimmed)243 trimmed = re_html_comment.sub('', trimmed)244 trimmed = re_block_comment.sub('', trimmed)245 trimmed = re_ini_comment.sub('', trimmed)246 if str_buffer != trimmed:247 view.replace(edit, region, trimmed)248 has_matches = True249 if has_matches is True:250 view.run_command('collapse_lines')251 sublime.set_timeout(lambda: sublime.status_message(252 'Trimmer: comments removed.'), 0)253 else:254 sublime.set_timeout(lambda: sublime.status_message(255 'Trimmer: no comments to remove.'), 0)256class ReplaceSmartCharactersCommand(sublime_plugin.TextCommand):257 def run(self, edit):258 view = self.view259 has_matches = False260 smart_replacements = [261 [u'[âââ]', u'\''],262 [u'[ââ]', u'"'],263 [u'[â]', u'"'],264 [u'[â¦]', u'...'],265 [u'[â]', u'---'],266 [u'[â]', u'--'],267 [u'[â¢]', u'*'],268 [u'[·]', u'-'],269 [u'[â]', u' '],270 [u'[â]', u' '],271 [u'[ ââ]', u' '],272 [u'[«]', u'<<'],273 [u'[»]', u'>>'],274 [u'[©]', u'(C)'],275 [u'[®]', u'(R)'],276 [u'[â¢]', u'(TM)']277 ]278 for replacement in smart_replacements:279 for region in selections(view):280 source_text = view.substr(region)281 if len(source_text) > 0:282 replaced_text = re.sub(283 replacement[0], replacement[1], source_text)284 if source_text != replaced_text:285 view.replace(edit, region, replaced_text)286 has_matches = True287 if has_matches is True:288 sublime.set_timeout(lambda: sublime.status_message(289 'Trimmer: smart characters replaced.'), 0)290 else:291 sublime.set_timeout(lambda: sublime.status_message(...
python_html.py
Source:python_html.py
...145 desear()146def num_giros_dados():147 numero = ran.randint(5, 9)148 if numero == 5:149 timer.set_timeout(girar_dado1, 200)150 timer.set_timeout(girar_dado2, 200)151 timer.set_timeout(girar_dado1, 400)152 timer.set_timeout(girar_dado2, 400)153 timer.set_timeout(girar_dado1, 600)154 timer.set_timeout(girar_dado2, 600)155 timer.set_timeout(girar_dado1, 800)156 timer.set_timeout(girar_dado2, 800)157 timer.set_timeout(girar_dado1, 1000)158 timer.set_timeout(girar_dado2, 1000)159 elif numero == 6:160 timer.set_timeout(girar_dado1, 200)161 timer.set_timeout(girar_dado2, 200)162 timer.set_timeout(girar_dado1, 400)163 timer.set_timeout(girar_dado2, 400)164 timer.set_timeout(girar_dado1, 600)165 timer.set_timeout(girar_dado2, 600)166 timer.set_timeout(girar_dado1, 800)167 timer.set_timeout(girar_dado2, 800)168 timer.set_timeout(girar_dado1, 1000)169 timer.set_timeout(girar_dado2, 1000)170 timer.set_timeout(girar_dado1, 1200)171 timer.set_timeout(girar_dado2, 1200)172 elif numero == 7:173 timer.set_timeout(girar_dado1, 200)174 timer.set_timeout(girar_dado2, 200)175 timer.set_timeout(girar_dado1, 400)176 timer.set_timeout(girar_dado2, 400)177 timer.set_timeout(girar_dado1, 600)178 timer.set_timeout(girar_dado2, 600)179 timer.set_timeout(girar_dado1, 800)180 timer.set_timeout(girar_dado2, 800)181 timer.set_timeout(girar_dado1, 1000)182 timer.set_timeout(girar_dado2, 1000)183 timer.set_timeout(girar_dado1, 1200)184 timer.set_timeout(girar_dado2, 1200)185 timer.set_timeout(girar_dado1, 1400)186 timer.set_timeout(girar_dado2, 1400)187 elif numero == 8:188 timer.set_timeout(girar_dado1, 200)189 timer.set_timeout(girar_dado2, 200)190 timer.set_timeout(girar_dado1, 400)191 timer.set_timeout(girar_dado2, 400)192 timer.set_timeout(girar_dado1, 600)193 timer.set_timeout(girar_dado2, 600)194 timer.set_timeout(girar_dado1, 800)195 timer.set_timeout(girar_dado2, 800)196 timer.set_timeout(girar_dado1, 1000)197 timer.set_timeout(girar_dado2, 1000)198 timer.set_timeout(girar_dado1, 1200)199 timer.set_timeout(girar_dado2, 1200)200 timer.set_timeout(girar_dado1, 1400)201 timer.set_timeout(girar_dado2, 1400)202 timer.set_timeout(girar_dado1, 1600)203 timer.set_timeout(girar_dado2, 1600)204 elif numero == 9:205 timer.set_timeout(girar_dado1, 200)206 timer.set_timeout(girar_dado2, 200)207 timer.set_timeout(girar_dado1, 400)208 timer.set_timeout(girar_dado1, 400)209 timer.set_timeout(girar_dado2, 600)210 timer.set_timeout(girar_dado1, 600)211 timer.set_timeout(girar_dado2, 800)212 timer.set_timeout(girar_dado1, 800)213 timer.set_timeout(girar_dado2, 1000)214 timer.set_timeout(girar_dado1, 1000)215 timer.set_timeout(girar_dado2, 1200)216 timer.set_timeout(girar_dado2, 1200)217 timer.set_timeout(girar_dado1, 1400)218 timer.set_timeout(girar_dado2, 1400)219 timer.set_timeout(girar_dado1, 1600)220 timer.set_timeout(girar_dado2, 1600)221 timer.set_timeout(girar_dado1, 1800)222 timer.set_timeout(girar_dado2, 1800)223 timer.set_timeout(ejecutar, 1900)224def sorteo(ev):225 num_giros_dados()226def sorteo_par(ev):227 document['sel'].text = "Par"228 sorteo(ev)229def sorteo_impar(ev):230 document['sel'].text = "Impar"231 sorteo(ev)232def suma_apuesta(ev):233 captura = document['apuesta'].value234 if (int(captura) > billetera):235 document['apuesta'].style.color = 'red'236 document['par'].unbind('click')237 document['impar'].unbind('click')...
test_server.py
Source:test_server.py
...28 signal.signal(signal.SIGINT, old_intr)29 signal.signal(signal.SIGTERM, old_term)30 signal.signal(signal.SIGUSR1, old_term)31@pytest.fixture32def set_timeout():33 def make_timeout(sec, callback):34 def _callback(signum, frame):35 signal.alarm(0)36 callback()37 signal.signal(signal.SIGALRM, _callback)38 signal.setitimer(signal.ITIMER_REAL, sec)39 yield make_timeout40@pytest.fixture41def exec_recorder():42 f = tempfile.NamedTemporaryFile(43 mode='w', encoding='utf8',44 prefix='aiotools.tests.server.',45 )46 f.close()47 def write(msg: str) -> None:48 path = f"{f.name}.{os.getpid()}"49 with open(path, 'a', encoding='utf8') as writer:50 writer.write(msg + '\n')51 def read() -> Sequence[str]:52 lines: List[str] = []53 for path in glob.glob(f"{f.name}.*"):54 with open(path, 'r', encoding='utf8') as reader:55 lines.extend(line.strip() for line in reader.readlines())56 return lines57 yield write, read58 for path in glob.glob(f"{f.name}.*"):59 os.unlink(path)60def interrupt():61 os.kill(0, signal.SIGINT)62def interrupt_usr1():63 os.kill(os.getpid(), signal.SIGUSR1)64@aiotools.server # type: ignore65async def myserver_simple(loop, proc_idx, args):66 write = args[0]67 await asyncio.sleep(0)68 write(f'started:{proc_idx}')69 yield70 await asyncio.sleep(0)71 write(f'terminated:{proc_idx}')72def test_server_singleproc(set_timeout, restore_signal, exec_recorder):73 write, read = exec_recorder74 set_timeout(0.2, interrupt)75 aiotools.start_server(76 myserver_simple,77 args=(write,),78 )79 lines = set(read())80 assert 'started:0' in lines81 assert 'terminated:0' in lines82def test_server_multiproc(set_timeout, restore_signal, exec_recorder):83 write, read = exec_recorder84 set_timeout(0.2, interrupt)85 aiotools.start_server(86 myserver_simple,87 num_workers=3,88 args=(write,),89 )90 lines = set(read())91 assert lines == {92 'started:0', 'started:1', 'started:2',93 'terminated:0', 'terminated:1', 'terminated:2',94 }95@aiotools.server # type: ignore96async def myserver_signal(loop, proc_idx, args):97 write = args[0]98 await asyncio.sleep(0)99 write(f'started:{proc_idx}')100 received_signum = yield101 await asyncio.sleep(0)102 write(f'terminated:{proc_idx}:{received_signum}')103def test_server_multiproc_custom_stop_signals(104 set_timeout,105 restore_signal,106 exec_recorder,107):108 write, read = exec_recorder109 set_timeout(0.2, interrupt_usr1)110 aiotools.start_server(111 myserver_signal,112 num_workers=2,113 stop_signals={signal.SIGUSR1},114 args=(write,),115 )116 lines = set(read())117 assert {'started:0', 'started:1'} < lines118 assert {119 f'terminated:0:{int(signal.SIGUSR1)}',120 f'terminated:1:{int(signal.SIGUSR1)}',121 } < lines122@aiotools.server # type: ignore123async def myserver_worker_init_error(loop, proc_idx, args):124 write = args[0]125 class _LogAdaptor:126 def __init__(self, writer):127 self.writer = writer128 def write(self, msg):129 msg = msg.strip().replace('\n', ' ')130 self.writer(f'log:{proc_idx}:{msg}')131 log_stream = _LogAdaptor(write)132 logging.config.dictConfig({133 'version': 1,134 'handlers': {135 'console': {136 'class': 'logging.StreamHandler',137 'stream': log_stream,138 'level': 'DEBUG',139 },140 },141 'loggers': {142 'aiotools': {143 'handlers': ['console'],144 'level': 'DEBUG',145 },146 },147 })148 log = logging.getLogger('aiotools')149 write(f'started:{proc_idx}')150 log.debug('hello')151 if proc_idx in (0, 2):152 # delay until other workers start normally.153 await asyncio.sleep(0.1 * proc_idx)154 raise ZeroDivisionError('oops')155 yield156 # should not be reached if errored.157 await asyncio.sleep(0)158 write(f'terminated:{proc_idx}')159def test_server_worker_init_error(restore_signal, exec_recorder):160 write, read = exec_recorder161 aiotools.start_server(162 myserver_worker_init_error,163 num_workers=4,164 args=(write,),165 )166 lines = set(read())167 assert sum(1 if line.startswith('started:') else 0 for line in lines) == 4168 # workers who did not raise errors have already started,169 # and they should have terminated normally170 # when the errorneous worker interrupted the main loop.171 assert sum(1 if line.startswith('terminated:') else 0 for line in lines) == 2172 assert sum(1 if 'hello' in line else 0 for line in lines) == 4173 assert sum(1 if 'ZeroDivisionError: oops' in line else 0 for line in lines) == 2174def test_server_user_main(set_timeout, restore_signal):175 main_enter = False176 main_exit = False177 @aiotools.main178 def mymain_user_main():179 nonlocal main_enter, main_exit180 main_enter = True181 yield 987182 main_exit = True183 @aiotools.server # type: ignore184 async def myworker_user_main(loop, proc_idx, args):185 assert args[0] == 987 # first arg from user main186 assert args[1] == 123 # second arg from start_server args187 yield188 set_timeout(0.2, interrupt)189 aiotools.start_server(190 myworker_user_main,191 mymain_user_main,192 num_workers=3,193 args=(123,),194 )195 assert main_enter196 assert main_exit197def test_server_user_main_custom_stop_signals(set_timeout, restore_signal):198 main_enter = False199 main_exit = False200 main_signal = None201 worker_signals = mp.Array('i', 3)202 @aiotools.main203 def mymain():204 nonlocal main_enter, main_exit, main_signal205 main_enter = True206 main_signal = yield207 main_exit = True208 @aiotools.server209 async def myworker(loop, proc_idx, args):210 worker_signals = args[0]211 worker_signals[proc_idx] = yield212 def noop(signum, frame):213 pass214 set_timeout(0.2, interrupt_usr1)215 aiotools.start_server(216 myworker,217 mymain,218 num_workers=3,219 stop_signals={signal.SIGUSR1},220 args=(worker_signals,),221 )222 assert main_enter223 assert main_exit224 assert main_signal == signal.SIGUSR1225 assert list(worker_signals) == [signal.SIGUSR1] * 3226def test_server_user_main_tuple(set_timeout, restore_signal):227 main_enter = False228 main_exit = False229 @aiotools.main230 def mymain():231 nonlocal main_enter, main_exit232 main_enter = True233 yield 987, 654234 main_exit = True235 @aiotools.server236 async def myworker(loop, proc_idx, args):237 assert args[0] == 987 # first arg from user main238 assert args[1] == 654 # second arg from user main239 assert args[2] == 123 # third arg from start_server args240 yield241 set_timeout(0.2, interrupt)242 aiotools.start_server(243 myworker,244 mymain,245 num_workers=3,246 args=(123,),247 )248 assert main_enter249 assert main_exit250def test_server_extra_proc(set_timeout, restore_signal):251 extras = mp.Array('i', [0, 0])252 def extra_proc(key, _, pidx, args):253 assert _ is None254 extras[key] = 980 + key255 try:256 while True:257 time.sleep(0.1)258 except KeyboardInterrupt:259 print(f'extra[{key}] interrupted', file=sys.stderr)260 except Exception as e:261 print(f'extra[{key}] exception', e, file=sys.stderr)262 finally:263 print(f'extra[{key}] finish', file=sys.stderr)264 extras[key] = 990 + key265 @aiotools.server266 async def myworker(loop, pidx, args):267 yield268 set_timeout(0.2, interrupt)269 aiotools.start_server(myworker, extra_procs=[270 functools.partial(extra_proc, 0),271 functools.partial(extra_proc, 1)],272 num_workers=3, args=(123, ))273 assert extras[0] == 990274 assert extras[1] == 991275def test_server_extra_proc_custom_stop_signal(set_timeout, restore_signal):276 received_signals = mp.Array('i', [0, 0])277 def extra_proc(key, _, pidx, args):278 received_signals = args[0]279 try:280 while True:281 time.sleep(0.1)282 except aiotools.InterruptedBySignal as e:283 received_signals[key] = e.args[0]284 @aiotools.server285 async def myworker(loop, pidx, args):286 yield287 set_timeout(0.3, interrupt_usr1)288 aiotools.start_server(myworker, extra_procs=[289 functools.partial(extra_proc, 0),290 functools.partial(extra_proc, 1)],291 stop_signals={signal.SIGUSR1},292 args=(received_signals, ),293 num_workers=3)294 assert received_signals[0] == signal.SIGUSR1...
test_lock.py
Source:test_lock.py
...68 t.start()69 for t in threads:70 t.join()71 def test03_lock_timeout(self):72 self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)73 self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT), 0)74 self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)75 self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT), 0)76 self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)77 self.assertEqual(self.env.get_timeout(db.DB_SET_LOCK_TIMEOUT), 123456)78 self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)79 self.assertEqual(self.env.get_timeout(db.DB_SET_TXN_TIMEOUT), 7890123)80 def test04_lock_timeout2(self):81 self.env.set_timeout(0, db.DB_SET_LOCK_TIMEOUT)82 self.env.set_timeout(0, db.DB_SET_TXN_TIMEOUT)83 self.env.set_timeout(123456, db.DB_SET_LOCK_TIMEOUT)84 self.env.set_timeout(7890123, db.DB_SET_TXN_TIMEOUT)85 def deadlock_detection() :86 while not deadlock_detection.end :87 deadlock_detection.count = \88 self.env.lock_detect(db.DB_LOCK_EXPIRE)89 if deadlock_detection.count :90 while not deadlock_detection.end :91 pass92 break93 time.sleep(0.01)94 deadlock_detection.end=False95 deadlock_detection.count=096 t=Thread(target=deadlock_detection)97 import sys98 if sys.version_info[0] < 3 :99 t.setDaemon(True)100 else :101 t.daemon = True102 t.start()103 self.env.set_timeout(100000, db.DB_SET_LOCK_TIMEOUT)104 anID = self.env.lock_id()105 anID2 = self.env.lock_id()106 self.assertNotEqual(anID, anID2)107 lock = self.env.lock_get(anID, "shared lock", db.DB_LOCK_WRITE)108 start_time=time.time()109 self.assertRaises(db.DBLockNotGrantedError,110 self.env.lock_get,anID2, "shared lock", db.DB_LOCK_READ)111 end_time=time.time()112 deadlock_detection.end=True113 # Floating point rounding114 self.assertTrue((end_time-start_time) >= 0.0999)115 self.env.lock_put(lock)116 t.join()117 self.env.lock_id_free(anID)...
LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!