Best Python code snippet using playwright-python
_strategy.py
Source:_strategy.py
    # ... (excerpt begins mid-method: the enclosing ``def`` and the branches
    # above this point are not visible, so this fragment is reproduced flat)
    # NOTE(review): the page this came from collapsed all whitespace; the
    # nesting throughout this excerpt was reconstructed from the code's
    # semantics -- confirm against the upstream file before relying on it.
        success = False
        reason = ("apply of stage id %s not supported during an abort"
                  % stage_id)
        return success, reason

    def _abort(self, stage_id=None):
        """
        Strategy Abort

        Request an abort while the strategy is in the apply phase.

        :param stage_id: optional stage id; when given, the request is
            rejected unless the apply phase is in progress and this id is
            the apply phase's current stage.
        :returns: ``(success, reason)``; ``reason`` is ``''`` on success and
            a short explanation when the request is rejected.
        """
        if STRATEGY_PHASE.APPLY == self._current_phase:
            if stage_id is not None:
                # A stage-specific abort only makes sense for the stage that
                # is being applied right now.
                if not self.apply_phase.is_inprogress() or \
                        stage_id != self.apply_phase.current_stage:
                    reason = "apply not inprogress for stage id %s" % stage_id
                    return False, reason

            if self._state in [STRATEGY_STATE.APPLYING, STRATEGY_STATE.APPLY_FAILED,
                               STRATEGY_STATE.APPLY_TIMEOUT]:
                self._state = STRATEGY_STATE.ABORTING
                abort_phase = self.apply_phase.abort()
                if not abort_phase:
                    # The apply phase returned nothing to run for the abort;
                    # fall back to a fresh abort phase.
                    abort_phase = StrategyPhase(STRATEGY_PHASE.ABORT)
                abort_phase.strategy = self
                self._phase[STRATEGY_PHASE.ABORT] = abort_phase

                # In the case of a single stage apply, if we are not currently
                # applying anything, we need to go to the aborted state now.
                if self.apply_phase.current_stage == self.apply_phase.stop_at_stage:
                    self._state = STRATEGY_STATE.ABORTED
                    self.abort_complete(STRATEGY_RESULT.ABORTED, "")

            elif STRATEGY_STATE.APPLIED != self._state:
                # Not applying/failed/timed-out and not already applied:
                # go straight to aborted.
                self._state = STRATEGY_STATE.ABORTED

            else:
                reason = "apply not inprogress"
                return False, reason

        else:
            reason = "apply not inprogress"
            return False, reason

        return True, ''

    def _handle_event(self, event, event_data=None):
        """
        Strategy Handle Event

        Forward an event to the phase that matches the current state and
        current phase.

        :returns: whatever the selected phase's ``handle_event`` returns, or
            ``False`` when no phase matches.
        """
        handled = False

        if STRATEGY_STATE.BUILDING == self._state:
            if STRATEGY_PHASE.BUILD == self._current_phase:
                handled = self.build_phase.handle_event(event, event_data)

        elif STRATEGY_STATE.APPLYING == self._state:
            if STRATEGY_PHASE.APPLY == self._current_phase:
                handled = self.apply_phase.handle_event(event, event_data)

        elif STRATEGY_STATE.ABORTING == self._state:
            # While aborting, the current phase may still be the apply phase
            # or may already be the abort phase.
            if STRATEGY_PHASE.APPLY == self._current_phase:
                handled = self.apply_phase.handle_event(event, event_data)

            elif STRATEGY_PHASE.ABORT == self._current_phase:
                handled = self.abort_phase.handle_event(event, event_data)

        return handled

    def phase_save(self):
        """
        Strategy Phase Save

        Persist the strategy by delegating to ``save()``.
        """
        self.save()

    def phase_extend_timeout(self, phase):
        """
        Strategy Phase Extend Timeout

        Ask the given phase to refresh its timeouts.
        """
        phase.refresh_timeouts()

    def phase_complete(self, phase, phase_result, phase_result_reason=None):
        """
        Strategy Phase Complete

        Advance the strategy state after ``phase`` has finished and invoke
        the matching ``*_complete`` hook. The strategy is saved on entry and
        again on exit.

        :param phase: the phase that completed; compared against the phases
            stored in ``self._phase``.
        :param phase_result: result reported by the phase.
        :param phase_result_reason: optional reason reported by the phase.
        """
        self.save()

        result, result_reason = \
            strategy_result_update(STRATEGY_RESULT.INITIAL, '',
                                   phase_result, phase_result_reason)

        if STRATEGY_STATE.BUILDING == self._state:
            if self._phase[STRATEGY_PHASE.BUILD] == phase:
                if phase.is_success() or phase.is_degraded():
                    self._state = STRATEGY_STATE.READY_TO_APPLY
                    self.build_complete(result, result_reason)

                elif phase.is_failed():
                    self._state = STRATEGY_STATE.BUILD_FAILED
                    self.build_complete(result, result_reason)

                elif phase.is_timed_out():
                    self._state = STRATEGY_STATE.BUILD_TIMEOUT
                    self.build_complete(result, result_reason)

        elif STRATEGY_STATE.APPLYING == self._state:
            if self._phase[STRATEGY_PHASE.APPLY] == phase:
                if phase.is_success() or phase.is_degraded():
                    self._state = STRATEGY_STATE.APPLIED
                    self.apply_complete(result, result_reason)

                elif phase.is_failed():
                    self._state = STRATEGY_STATE.APPLY_FAILED
                    self.apply_complete(result, result_reason)
                    # A failed apply triggers an abort: set up the abort
                    # phase, switch to it and start it.
                    self._abort()
                    self._current_phase = STRATEGY_PHASE.ABORT
                    self.abort_phase.apply()

                elif phase.is_timed_out():
                    self._state = STRATEGY_STATE.APPLY_TIMEOUT
                    self.apply_complete(result, result_reason)
                    # Same abort hand-off as the failure branch above.
                    self._abort()
                    self._current_phase = STRATEGY_PHASE.ABORT
                    self.abort_phase.apply()

        elif STRATEGY_STATE.ABORTING == self._state:
            if self._phase[STRATEGY_PHASE.APPLY] == phase:
                # The apply phase finished while an abort was pending.
                if phase.is_success() or phase.is_degraded():
                    self._state = STRATEGY_STATE.APPLIED
                    self.apply_complete(result, result_reason)

                elif phase.is_failed():
                    self._state = STRATEGY_STATE.APPLY_FAILED
                    self.apply_complete(result, result_reason)

                elif phase.is_timed_out():
                    self._state = STRATEGY_STATE.APPLY_TIMEOUT
                    self.apply_complete(result, result_reason)

                # NOTE(review): placed after the if/elif chain (not inside the
                # timed-out branch) because the APPLYING case above repeats
                # this hand-off per branch -- confirm against upstream.
                self._current_phase = STRATEGY_PHASE.ABORT
                self.abort_phase.apply()

            elif self._phase[STRATEGY_PHASE.ABORT] == phase:
                if phase.is_success() or phase.is_degraded():
                    self._state = STRATEGY_STATE.ABORTED
                    self.abort_complete(result, result_reason)

                elif phase.is_failed():
                    self._state = STRATEGY_STATE.ABORT_FAILED
                    self.abort_complete(result, result_reason)

                elif phase.is_timed_out():
                    self._state = STRATEGY_STATE.ABORT_TIMEOUT
                    self.abort_complete(result, result_reason)

        self.save()

    def refresh_timeouts(self):
        """
        Strategy Refresh Timeouts

        Refresh the timeouts of the build, apply and abort phases, in that
        order.
        """
        self.build_phase.refresh_timeouts()
        self.apply_phase.refresh_timeouts()
        self.abort_phase.refresh_timeouts()

    def save(self):
        """
        Strategy Save (can be overridden by child class)

        Does nothing in this class.
        """
        pass

    def build(self):
        """
        Strategy Build (can be overridden by child class)

        Run ``_build()`` and then save.
        """
        self._build()
        self.save()

    def build_complete(self, result, result_reason):
        """
        Strategy Build Complete (can be overridden by child class)

        Save, then return ``(result, result_reason)`` unchanged.
        """
        self.save()
        return result, result_reason

    def apply(self, stage_id):
        """
        Strategy Apply (can be overridden by child class)

        Run ``_apply(stage_id)``, save, and return its ``(success, reason)``.
        """
        success, reason = self._apply(stage_id)
        self.save()
        return success, reason

    def apply_complete(self, result, result_reason):
        """
        Strategy Apply Complete (can be overridden by child class)

        Save, then return ``(result, result_reason)`` unchanged.
        """
        self.save()
        return result, result_reason

    def abort(self, stage_id):
        """
        Strategy Abort (can be overridden by child class)

        Run ``_abort(stage_id)``, save, and return its ``(success, reason)``.
        """
        success, reason = self._abort(stage_id)
        self.save()
        return success, reason

    def abort_complete(self, result, result_reason):
        """
        Strategy Abort Complete (can be overridden by child class)

        Save, then return ``(result, result_reason)`` unchanged.
        """
        self.save()
        return result, result_reason

    def handle_event(self, event, event_data=None):
        """
        Strategy Handle Event (can be overridden by child class)

        Delegate to ``_handle_event`` and return its result.
        """
        return self._handle_event(event, event_data)

    def from_dict(self, data, build_phase=None, apply_phase=None, abort_phase=None):
        ...  # excerpt truncated here in the source; the body is not visible
tasks.py
Source:tasks.py
from datetime import datetime
from time import sleep
from subprocess import STDOUT, Popen, PIPE
import traceback

import logbook
from logbook import FileHandler, NestedSetup
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import relationship, backref

from db import Base, session
from db.fields import JSONEncodedDict
from log_utils import log_filename, get_thread_handlers
from thread_utils import make_exec_threaded, thread_wait


def action_fn(action):
    """Build a method that starts ``action`` ('run' or 'revert') at most once.

    The returned function raises if the instance's ``<action>_start_dt`` is
    already set; otherwise it calls ``call_and_record_action(action)``.
    """
    def do_action(instance):
        action_start_dt = getattr(instance, '%s_start_dt' % (action,), None)
        if not action_start_dt:
            instance.call_and_record_action(action)
        else:
            raise Exception('%s already started at %s' %
                            (action, action_start_dt))
    return do_action


class Task(Base):
    """Base task row; subclasses share the 'task' table, keyed on ``type``."""

    __tablename__ = 'task'

    id = Column(Integer, primary_key=True)
    # Discriminator column for single table inheritance (see __mapper_args__).
    type = Column(String(50), nullable=False)
    rollout_id = Column(Integer, ForeignKey('rollout.id'), nullable=False)
    parent_id = Column(Integer, ForeignKey('task.id'))
    state = Column(JSONEncodedDict(1000))

    # Bookkeeping written by call_and_record_action('run').
    run_start_dt = Column(DateTime)
    run_error = Column(String(500))
    run_error_dt = Column(DateTime)
    run_return = Column(String(500))
    run_return_dt = Column(DateTime)
    run_traceback = Column(String(1000))

    # Bookkeeping written by call_and_record_action('revert').
    revert_start_dt = Column(DateTime)
    revert_error = Column(String(500))
    revert_error_dt = Column(DateTime)
    revert_return = Column(String(500))
    revert_return_dt = Column(DateTime)
    revert_traceback = Column(String(1000))

    rollout = relationship('Rollout', backref=backref('tasks', order_by=id))
    children = relationship('Task', backref=backref('parent', remote_side='Task.id', order_by=id))

    @declared_attr
    def __mapper_args__(cls):
        # Implementing single table inheritance
        if cls.__name__ == 'Task':
            return {'polymorphic_on': cls.type,}
        else:
            return {'polymorphic_identity': cls.__name__}

    def __init__(self, rollout_id, *args, **kwargs):
        """Store the rollout id, start with empty state, then call ``_init``."""
        self.rollout_id = rollout_id
        self.state = {}
        self._init(*args, **kwargs)

    @classmethod
    def _from_id(cls, id):
        """Load the instance of ``cls`` with the given primary key."""
        return session.Session.query(cls).get(id)

    def _init(self, *args, **kwargs):
        """Subclass hook called from ``__init__``; no-op here."""
        pass

    @classmethod
    def _run(cls, state, children, abort, term):
        """Subclass hook performing the forward action; no-op here."""
        pass

    @classmethod
    def _revert(cls, state, children, abort, term):
        """Subclass hook performing the reverse action; no-op here."""
        pass

    run = action_fn('run')
    __revert = action_fn('revert')

    def revert(self):
        """Revert the task; raises if the task was never run."""
        if not self.run_start_dt:
            raise Exception('Cannot revert before running')
        self.__revert()

    run_threaded = make_exec_threaded('run')
    revert_threaded = make_exec_threaded('revert')

    def get_signals(self, action):
        """Return the ``(abort, term)`` signals for ``action``.

        'run' maps to the '*_rollout' signals and 'revert' to the
        '*_rollback' signals of this task's rollout.

        Raises ``Exception`` when either signal is missing.
        """
        # Imported here rather than at module level (presumably to avoid an
        # import cycle with the rollout module -- TODO confirm).
        from rollout import Rollout
        signal_action = {'run': 'rollout', 'revert': 'rollback'}[action]
        abort_signal = Rollout.get_signal(self.rollout_id, 'abort_%s' % signal_action)
        term_signal = Rollout.get_signal(self.rollout_id, 'term_%s' % signal_action)
        # Both signals are required. The previous test,
        # ``not abort_signal and term_signal``, parsed as
        # ``(not abort_signal) and term_signal`` and therefore missed the
        # cases where the term signal (or both signals) did not exist.
        if not (abort_signal and term_signal):
            raise Exception('Cannot run: one or more signals don\'t exist: '
                            'abort: %s, term: %s' % (abort_signal, term_signal))
        return abort_signal, term_signal

    def call_and_record_action(self, action):
        """Execute ``_<action>`` and record start/return/error on the row.

        The start timestamp is saved before the action runs. On success the
        return value and timestamp are stored; on failure the error message,
        traceback and timestamp are stored and the exception is re-raised.
        The row is saved in either case.
        """
        setattr(self, '%s_start_dt' % (action,), datetime.now())
        self.save()
        # The implementation should be a classmethod, hence getattr
        # explicitly on type. (Named ``action_impl`` so it does not shadow
        # the module-level ``action_fn``.)
        action_impl = getattr(type(self), '_%s' % (action,))
        try:
            with self.log_setup_action(action):
                abort, term = self.get_signals(action)
                action_return = action_impl(self.state, self.children, abort, term)
        except Exception as e:
            # NOTE(review): ``e.message`` is Python 2 only and is empty for
            # exceptions constructed with several arguments; kept as-is so
            # the stored value does not change.
            setattr(self, '%s_error' % (action,), e.message)
            setattr(self, '%s_traceback' % (action,), repr(traceback.format_exc()))
            setattr(self, '%s_error_dt' % (action,), datetime.now())
            raise
        else:
            setattr(self, '%s_return' % (action,), action_return)
            setattr(self, '%s_return_dt' % (action,), datetime.now())
        finally:
            self.save()

    def log_setup_action(self, action):
        """Return a logbook setup: thread handlers plus a per-action file."""
        return NestedSetup(
            get_thread_handlers() +
            (FileHandler(log_filename(
                self.rollout_id, self.id, action), bubble=True),))

    def save(self):
        """Add this object to the session if needed, then commit."""
        if self not in session.Session:
            session.Session.add(self)
        session.Session.commit()

    def __repr__(self):
        return '<%s: id=%s, rollout_id=%s, state=%s>' % (
            self.__class__.__name__, self.id, self.rollout_id, repr(self.state))

    def friendly_str(self):
        """Human-readable description; defaults to ``repr(self)``."""
        return repr(self)

    def friendly_html(self):
        """HTML span with the description and a log link per started action."""
        from kettleweb.app import url_for
        inner = [self.friendly_str()]
        for action in 'run', 'revert':
            # Only actions that have started have a log to link to.
            if not getattr(self, '%s_start_dt' % action):
                continue
            log_url = url_for(
                'log_view', rollout_id=self.rollout_id, args='/'.join(
                    map(str, (self.id, action))))
            log_link = '<a href="{url}">{action}</a>'.format(
                url=log_url, action='%s log' % action.title())
            inner.append(log_link)
        return '<span class="task {class_}">{inner}</span>'.format(
            class_=self.status(), inner=' '.join(inner))

    def status(self):
        """Derive a status string from the run/revert timestamps."""
        if not self.run_start_dt:
            return 'not_started'
        else:
            if not self.revert_start_dt:
                if not self.run_return_dt:
                    return 'started'
                else:
                    return 'finished'
            else:
                if not self.revert_return_dt:
                    return 'rolling_back'
                else:
                    return 'rolled_back'


class ExecTask(Task):
    """Task that executes child tasks via ``exec_forwards``/``exec_backwards``.

    Subclasses provide those two classmethods.
    """

    desc_string = ''

    def _init(self, children, *args, **kwargs):
        """Save this task, then attach and save each child."""
        # Saved first, before the children are pointed at this task.
        self.save()
        for child in children:
            child.parent = self
            child.save()

    @classmethod
    def _run(cls, state, children, abort, term):
        cls.exec_forwards(state, children, abort, term)

    @classmethod
    def _revert(cls, state, children, abort, term):
        cls.exec_backwards(state, children, abort, term)

    def friendly_str(self):
        """Description followed by one indented line per child."""
        return '%s%s' %\
            (type(self).desc_string, ''.join(['\n\t%s' % child.friendly_str() for child in self.children]))

    def friendly_html(self):
        """Description followed by an HTML list of the children."""
        return '%s<ul>%s</ul>' %\
            (type(self).desc_string,
             ''.join(['<li>%s</li>' % child.friendly_html() for child in self.children]))


class SequentialExecTask(ExecTask):
    """Runs children one at a time, in the order recorded at creation."""

    def _init(self, children, *args, **kwargs):
        super(SequentialExecTask, self)._init(children, *args, **kwargs)
        # Remember the order the children were given in.
        self.state['task_order'] = [child.id for child in children]

    @classmethod
    def exec_forwards(cls, state, tasks, abort, term):
        cls.exec_tasks('run_threaded', state['task_order'], tasks, abort, term)

    @classmethod
    def exec_backwards(cls, state, tasks, abort, term):
        """Revert, in reverse order, only the children that were started."""
        run_tasks = [t for t in tasks if t.run_start_dt]
        run_task_ids = set(t.id for t in run_tasks)
        task_order = [t_id for t_id in reversed(state['task_order']) if t_id in run_task_ids]
        cls.exec_tasks('revert_threaded', task_order, run_tasks, abort, term)

    @staticmethod
    def exec_tasks(method_name, task_order, tasks, abort, term):
        """Call ``method_name`` on each task in ``task_order``, one by one.

        Stops early when ``abort`` or ``term`` is set; raises if a task's
        thread reports an exception.
        """
        task_ids = {task.id: task for task in tasks}
        for task_id in task_order:
            if abort.is_set() or term.is_set():
                break
            task = task_ids.pop(task_id)
            thread = getattr(task, method_name)(abort)
            thread_wait(thread, abort)
            if thread.exc_info is not None:
                raise Exception('Caught exception while executing task %s: %s' %
                                (task, thread.exc_info))
        else:
            # for/else: only checked when the loop was not interrupted.
            assert not task_ids, ("SequentialExecTask has children that are not "
                                  "in its task_order: %s" % (task_ids,))

    def friendly_html(self):
        """Description followed by an HTML list of children in task order."""
        task_order = self.state['task_order']
        child_ids = {c.id: c for c in self.children}
        return '%s<ul>%s</ul>' %\
            (type(self).desc_string,
             ''.join(['<li>%s</li>' % child_ids[id].friendly_html() for id in task_order]))


class ParallelExecTask(ExecTask):
    """Starts every child, then waits for each of them."""

    desc_string = 'Execute in parallel:'

    @classmethod
    def exec_forwards(cls, state, tasks, abort, term):
        cls.exec_tasks('run_threaded', tasks, abort, term)

    @classmethod
    def exec_backwards(cls, state, tasks, abort, term):
        cls.exec_tasks('revert_threaded', [t for t in tasks if t.run_start_dt],
                       abort, term)

    @staticmethod
    def exec_tasks(method_name, tasks, abort, term):
        """Start ``method_name`` on every task, then wait on each thread.

        No further tasks are started once ``abort`` or ``term`` is set;
        raises on the first thread that reports an exception.
        """
        threads = []
        for task in tasks:
            if abort.is_set() or term.is_set():
                break
            thread = getattr(task, method_name)(abort)
            threads.append(thread)
        for thread in threads:
            thread_wait(thread, abort)
            if thread.exc_info is not None:
                raise Exception('Caught exception while executing task %s: %s' %
                                (thread.target, thread.exc_info))


class DelayTask(Task):
    """Task that waits for a configured number of minutes and seconds."""

    def _init(self, minutes=0, seconds=0, reversible=False):
        self.state.update(
            minutes=minutes,
            seconds=seconds,
            reversible=reversible)

    @classmethod
    def _run(cls, state, children, abort, term):
        cls.wait(cls.get_secs(state), abort=abort, term=term)

    @classmethod
    def _revert(cls, state, children, abort, term):
        # The delay is only repeated on revert when marked reversible.
        if state['reversible']:
            cls.wait(cls.get_secs(state), abort=abort, term=term)

    @classmethod
    def wait(cls, secs, abort, term):
        """Sleep in one-second steps, stopping once a signal is set."""
        logbook.info('Waiting for %s' % (cls.min_sec_str(secs),),)
        for i in xrange(int(secs)):
            if abort.is_set() or term.is_set():
                break
            sleep(1)

    def friendly_str(self):
        """Describe the delay, with time left while the run is in progress."""
        delay_secs = self.get_secs(self.state)
        delay_str = self.min_sec_str(delay_secs)
        if self.run_start_dt and not self.run_return_dt:
            remaining_secs = delay_secs - (datetime.now() - self.run_start_dt).seconds
            remaining_str = ' (%s left)' % self.min_sec_str(remaining_secs)
        else:
            remaining_str = ''
        rev_str = ' (reversible)' if self.state['reversible'] else ''
        return 'Delay for {delay_str}{remaining_str}{rev_str}'.format(
            delay_str=delay_str, remaining_str=remaining_str, rev_str=rev_str)

    @staticmethod
    def min_sec_str(secs):
        """Format seconds as 'M:SS mins', or 'S secs' under a minute."""
        mins = secs / 60
        secs = secs % 60
        if mins:
            return '%d:%02d mins' % (mins, secs)
        else:
            return '%d secs' % (secs,)

    @staticmethod
    def get_secs(state):
        """Total delay in seconds from the 'minutes'/'seconds' state keys."""
        return (state.get('minutes', 0) * 60) + state.get('seconds', 0)


def subprocess_run(command, abort, term, log=True, **kwargs):
    """Run ``command``, collecting (and optionally logging) its output lines.

    stderr is merged into stdout. When ``term`` becomes set the process is
    sent a terminate request once. Returns the list of stripped output lines
    when the process exits with status 0; otherwise sets ``abort`` (when
    given) and builds an exception from the command, output and return code.
    """
    if log:
        logbook.info(command)
    p = Popen(
        args=command,
        stdout=PIPE,
        stderr=STDOUT,
        **kwargs
    )
    outputs = []
    for line in iter(p.stdout.readline, ''):
        if term and term.is_set():
            term = None  # Don't spam calls to terminate
            p.terminate()
            logbook.info('Caught TERM signal: stopping')
        line = line.strip()
        outputs.append(line)
        if log:
            logbook.info(line)
    returncode = p.wait()
    if returncode == 0:
        return outputs
    else:
        if abort:
            abort.set()
        exc = Exception(command, outputs, returncode)
        # ... (the source excerpt is truncated here; whatever follows in the
        # original function is not visible and has not been reconstructed)
abort.py
Source:abort.py
"""
Test the server sasl aborting at different stages
"""

from servicetest import EventPattern
from gabbletest import exec_test, call_async
import constants as cs
from saslutil import SaslEventAuthenticator, connect_and_get_sasl_channel, \
    abort_auth

JID = "test@example.org"
PASSWORD = "pass"
# Pairs used as challenge/response data by the tests below; the first
# pair's second element is the default initial response in start_mechanism.
EXCHANGE = [("", "remote challenge"),
            ("Another step", "Here we go"),
            ("local response", "")]
MECHANISMS = ["PLAIN", "DIGEST-MD5", "ABORT-TEST"]


def test_abort_early(q, bus, conn, stream):
    """Abort right after obtaining the SASL channel, with reason code 31337."""
    chan, props = connect_and_get_sasl_channel(q, bus, conn)
    abort_auth(q, chan, 31337, "maybe if I use an undefined code you'll crash")


def start_mechanism(q, bus, conn,
                    mechanism="ABORT-TEST", initial_response=EXCHANGE[0][1]):
    """Start ``mechanism`` on a fresh SASL channel.

    Waits for the in-progress status signal and the 'sasl-auth' event, then
    returns ``(chan, authenticator)``.
    """
    chan, props = connect_and_get_sasl_channel(q, bus, conn)
    chan.SASLAuthentication.StartMechanismWithData(mechanism, initial_response)
    q.expect('dbus-signal', signal='SASLStatusChanged',
             interface=cs.CHANNEL_IFACE_SASL_AUTH,
             args=[cs.SASL_STATUS_IN_PROGRESS, '', {}])
    e = q.expect('sasl-auth', initial_response=initial_response)
    return chan, e.authenticator


def test_abort_mid(q, bus, conn, stream):
    """Abort after the server's challenge has been announced."""
    chan, authenticator = start_mechanism(q, bus, conn)
    authenticator.challenge(EXCHANGE[1][0])
    q.expect('dbus-signal', signal='NewChallenge',
             interface=cs.CHANNEL_IFACE_SASL_AUTH,
             args=[EXCHANGE[1][0]])
    abort_auth(q, chan, cs.SASL_ABORT_REASON_INVALID_CHALLENGE,
               "wrong data from server")


def test_disconnect_mid(q, bus, conn, stream):
    """Disconnect the connection after the challenge has been announced."""
    chan, authenticator = start_mechanism(q, bus, conn)
    authenticator.challenge(EXCHANGE[1][0])
    q.expect('dbus-signal', signal='NewChallenge',
             interface=cs.CHANNEL_IFACE_SASL_AUTH,
             args=[EXCHANGE[1][0]])
    call_async(q, conn, 'Disconnect')
    q.expect_many(EventPattern('dbus-signal', signal='StatusChanged',
                               args=[cs.CONN_STATUS_DISCONNECTED,
                                     cs.CSR_REQUESTED]),
                  EventPattern('dbus-return', method='Disconnect'))


def test_abort_connected(q, bus, conn, stream):
    """AbortSASL after the connection is CONNECTED must fail as not available."""
    initial_response = '\0' + JID.split('@')[0] + '\0' + PASSWORD
    chan, authenticator = start_mechanism(q, bus, conn,
                                          mechanism='PLAIN',
                                          initial_response=initial_response)
    authenticator.success(None)
    q.expect('dbus-signal', signal='SASLStatusChanged',
             interface=cs.CHANNEL_IFACE_SASL_AUTH,
             args=[cs.SASL_STATUS_SERVER_SUCCEEDED, '', {}])
    chan.SASLAuthentication.AcceptSASL()
    q.expect('dbus-signal', signal='SASLStatusChanged',
             interface=cs.CHANNEL_IFACE_SASL_AUTH,
             args=[cs.SASL_STATUS_SUCCEEDED, '', {}])
    q.expect('dbus-signal', signal='StatusChanged',
             args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])
    call_async(q, chan.SASLAuthentication, 'AbortSASL',
               cs.SASL_ABORT_REASON_USER_ABORT, "aborting too late")
    q.expect('dbus-error', method='AbortSASL', name=cs.NOT_AVAILABLE)
    chan.Close()


def test_give_up_while_waiting(q, bus, conn, stream,
                               channel_method,
                               authenticator_method):
    """Regression test for https://bugs.freedesktop.org/show_bug.cgi?id=52146
    where closing the auth channel while waiting for a challenge from the
    server would not abort the SASL exchange, and the challenge subsequently
    arriving would make Gabble assert.

    ``channel_method`` is called with the channel and
    ``authenticator_method`` with the authenticator, in that order.
    """
    chan, authenticator = start_mechanism(q, bus, conn)
    # While Gabble is waiting for the server to make the next move,
    # a Telepathy client does something to try to end the authentication
    # process.
    channel_method(chan)
    # And then we hear something back from the server.
    authenticator_method(authenticator)
    # FIXME: Gabble should probably send <abort/> and wait for the server to
    # say <failure><aborted/> rather than unceremoniously closing the
    # connection.
    #
    # In the bug we're testing for, the stream connection would indeed be lost,
    # but Gabble would also crash and leave a core dump behind. So this test
    # would appear to pass, but 'make check' would fail as we want.
    q.expect_many(
        EventPattern('stream-connection-lost'),
        EventPattern('dbus-signal', signal='ConnectionError'),
        EventPattern(
            'dbus-signal', signal="StatusChanged",
            args=[cs.CONN_STATUS_DISCONNECTED,
                  cs.CSR_AUTHENTICATION_FAILED]),
        )


def test_close_then_challenge(q, bus, conn, stream):
    """This is the specific scenario for which 52146 was reported. The channel
    was being closed because its handler crashed.
    """
    test_give_up_while_waiting(q, bus, conn, stream,
        lambda chan: chan.Close(),
        lambda authenticator: authenticator.challenge(EXCHANGE[1][0]))


def test_close_then_success(q, bus, conn, stream):
    """Close the channel, then have the server report success."""
    test_give_up_while_waiting(q, bus, conn, stream,
        lambda chan: chan.Close(),
        lambda authenticator: authenticator.success())


def test_close_then_failure(q, bus, conn, stream):
    """Close the channel, then have the server report not-authorized."""
    test_give_up_while_waiting(q, bus, conn, stream,
        lambda chan: chan.Close(),
        lambda authenticator: authenticator.not_authorized())


def test_abort_then_challenge(q, bus, conn, stream):
    """Call AbortSASL, then have the server send a challenge."""
    test_give_up_while_waiting(q, bus, conn, stream,
        lambda chan: chan.SASLAuthentication.AbortSASL(
            cs.SASL_ABORT_REASON_USER_ABORT, "bored now"),
        lambda authenticator: authenticator.challenge(EXCHANGE[1][0]))


def test_abort_then_success(q, bus, conn, stream):
    """FIXME: this test fails because the channel changes its state from
    TP_SASL_STATUS_CLIENT_FAILED to TP_SASL_STATUS_SERVER_SUCCEEDED and waits
    for the client to ack it when <success> arrives, which is dumb.
    """
    test_give_up_while_waiting(q, bus, conn, stream,
        lambda chan: chan.SASLAuthentication.AbortSASL(
            cs.SASL_ABORT_REASON_USER_ABORT, "bored now"),
        lambda authenticator: authenticator.success())


def test_abort_then_failure(q, bus, conn, stream):
    """Call AbortSASL, then have the server report not-authorized."""
    test_give_up_while_waiting(q, bus, conn, stream,
        lambda chan: chan.SASLAuthentication.AbortSASL(
            cs.SASL_ABORT_REASON_USER_ABORT, "bored now"),
        lambda authenticator: authenticator.not_authorized())


def abort_and_close(chan):
    """Call AbortSASL on the channel and then close it."""
    chan.SASLAuthentication.AbortSASL(
        cs.SASL_ABORT_REASON_USER_ABORT, "bored now")
    chan.Close()


def test_abort_and_close_then_challenge(q, bus, conn, stream):
    """Abort and close the channel, then have the server send a challenge."""
    test_give_up_while_waiting(q, bus, conn, stream,
        abort_and_close,
        lambda authenticator: authenticator.challenge(EXCHANGE[1][0]))


def test_abort_and_close_then_failure(q, bus, conn, stream):
    """Abort and close the channel, then have the server report not-authorized."""
    test_give_up_while_waiting(q, bus, conn, stream,
        abort_and_close,
        lambda authenticator: authenticator.not_authorized())


def exec_test_(func):
    """Run ``func`` through exec_test with a fresh SASL authenticator."""
    # Can't use functools.partial, because the authenticator is stateful.
    authenticator = SaslEventAuthenticator(JID.split('@')[0], MECHANISMS)
    exec_test(func, do_connect=False, authenticator=authenticator,
              params={'password': None,
                      'account' : JID,
                     })


if __name__ == '__main__':
    exec_test_(test_abort_early)
    exec_test_(test_abort_mid)
    exec_test_(test_disconnect_mid)
    exec_test_(test_abort_connected)
    exec_test_(test_close_then_challenge)
    exec_test_(test_close_then_success)
    exec_test_(test_close_then_failure)
    exec_test_(test_abort_then_challenge)
    # exec_test_(test_abort_then_success)
    exec_test_(test_abort_then_failure)
    exec_test_(test_abort_and_close_then_challenge)
    # ... (the source excerpt is truncated here; any remaining calls are not
    # visible)
misc_test.py
Source:misc_test.py
...3import webob.exc4import webapp25import test_base6class TestMiscellaneous(test_base.BaseTestCase):7 def test_abort(self):8 self.assertRaises(webob.exc.HTTPOk, webapp2.abort, 200)9 self.assertRaises(webob.exc.HTTPCreated, webapp2.abort, 201)10 self.assertRaises(webob.exc.HTTPAccepted, webapp2.abort, 202)11 self.assertRaises(webob.exc.HTTPNonAuthoritativeInformation, webapp2.abort, 203)12 self.assertRaises(webob.exc.HTTPNoContent, webapp2.abort, 204)13 self.assertRaises(webob.exc.HTTPResetContent, webapp2.abort, 205)14 self.assertRaises(webob.exc.HTTPPartialContent, webapp2.abort, 206)15 self.assertRaises(webob.exc.HTTPMultipleChoices, webapp2.abort, 300)16 self.assertRaises(webob.exc.HTTPMovedPermanently, webapp2.abort, 301)17 self.assertRaises(webob.exc.HTTPFound, webapp2.abort, 302)18 self.assertRaises(webob.exc.HTTPSeeOther, webapp2.abort, 303)19 self.assertRaises(webob.exc.HTTPNotModified, webapp2.abort, 304)20 self.assertRaises(webob.exc.HTTPUseProxy, webapp2.abort, 305)21 self.assertRaises(webob.exc.HTTPTemporaryRedirect, webapp2.abort, 307)...
LambdaTest’s Playwright tutorial gives you a broad overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial provides A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!