Best Python code snippet using tox_python
_strategy.py
Source:_strategy.py
...225 success = False226 reason = ("apply of stage id %s not supported during an abort"227 % stage_id)228 return success, reason229 def _abort(self, stage_id=None):230 """231 Strategy Abort232 """233 if STRATEGY_PHASE.APPLY == self._current_phase:234 if stage_id is not None:235 if not self.apply_phase.is_inprogress() or \236 stage_id != self.apply_phase.current_stage:237 reason = "apply not inprogress for stage id %s" % stage_id238 return False, reason239 if self._state in [STRATEGY_STATE.APPLYING, STRATEGY_STATE.APPLY_FAILED,240 STRATEGY_STATE.APPLY_TIMEOUT]:241 self._state = STRATEGY_STATE.ABORTING242 abort_phase = self.apply_phase.abort()243 if not abort_phase:244 abort_phase = StrategyPhase(STRATEGY_PHASE.ABORT)245 abort_phase.strategy = self246 self._phase[STRATEGY_PHASE.ABORT] = abort_phase247 # In the case of a single stage apply, if we are not currently248 # applying anything, we need to go to the aborted state now.249 if self.apply_phase.current_stage == self.apply_phase.stop_at_stage:250 self._state = STRATEGY_STATE.ABORTED251 self.abort_complete(STRATEGY_RESULT.ABORTED, "")252 elif STRATEGY_STATE.APPLIED != self._state:253 self._state = STRATEGY_STATE.ABORTED254 else:255 reason = "apply not inprogress"256 return False, reason257 else:258 reason = "apply not inprogress"259 return False, reason260 return True, ''261 def _handle_event(self, event, event_data=None):262 """263 Strategy Handle Event264 """265 handled = False266 if STRATEGY_STATE.BUILDING == self._state:267 if STRATEGY_PHASE.BUILD == self._current_phase:268 handled = self.build_phase.handle_event(event, event_data)269 elif STRATEGY_STATE.APPLYING == self._state:270 if STRATEGY_PHASE.APPLY == self._current_phase:271 handled = self.apply_phase.handle_event(event, event_data)272 elif STRATEGY_STATE.ABORTING == self._state:273 if STRATEGY_PHASE.APPLY == self._current_phase:274 handled = self.apply_phase.handle_event(event, event_data)275 elif STRATEGY_PHASE.ABORT == self._current_phase:276 handled 
= self.abort_phase.handle_event(event, event_data)277 return handled278 def phase_save(self):279 """280 Strategy Phase Save281 """282 self.save()283 def phase_extend_timeout(self, phase):284 """285 Strategy Phase Extend Timeout286 """287 phase.refresh_timeouts()288 def phase_complete(self, phase, phase_result, phase_result_reason=None):289 """290 Strategy Phase Complete291 """292 self.save()293 result, result_reason = \294 strategy_result_update(STRATEGY_RESULT.INITIAL, '',295 phase_result, phase_result_reason)296 if STRATEGY_STATE.BUILDING == self._state:297 if self._phase[STRATEGY_PHASE.BUILD] == phase:298 if phase.is_success() or phase.is_degraded():299 self._state = STRATEGY_STATE.READY_TO_APPLY300 self.build_complete(result, result_reason)301 elif phase.is_failed():302 self._state = STRATEGY_STATE.BUILD_FAILED303 self.build_complete(result, result_reason)304 elif phase.is_timed_out():305 self._state = STRATEGY_STATE.BUILD_TIMEOUT306 self.build_complete(result, result_reason)307 elif STRATEGY_STATE.APPLYING == self._state:308 if self._phase[STRATEGY_PHASE.APPLY] == phase:309 if phase.is_success() or phase.is_degraded():310 self._state = STRATEGY_STATE.APPLIED311 self.apply_complete(result, result_reason)312 elif phase.is_failed():313 self._state = STRATEGY_STATE.APPLY_FAILED314 self.apply_complete(result, result_reason)315 self._abort()316 self._current_phase = STRATEGY_PHASE.ABORT317 self.abort_phase.apply()318 elif phase.is_timed_out():319 self._state = STRATEGY_STATE.APPLY_TIMEOUT320 self.apply_complete(result, result_reason)321 self._abort()322 self._current_phase = STRATEGY_PHASE.ABORT323 self.abort_phase.apply()324 elif STRATEGY_STATE.ABORTING == self._state:325 if self._phase[STRATEGY_PHASE.APPLY] == phase:326 if phase.is_success() or phase.is_degraded():327 self._state = STRATEGY_STATE.APPLIED328 self.apply_complete(result, result_reason)329 elif phase.is_failed():330 self._state = STRATEGY_STATE.APPLY_FAILED331 self.apply_complete(result, 
result_reason)332 elif phase.is_timed_out():333 self._state = STRATEGY_STATE.APPLY_TIMEOUT334 self.apply_complete(result, result_reason)335 self._current_phase = STRATEGY_PHASE.ABORT336 self.abort_phase.apply()337 elif self._phase[STRATEGY_PHASE.ABORT] == phase:338 if phase.is_success() or phase.is_degraded():339 self._state = STRATEGY_STATE.ABORTED340 self.abort_complete(result, result_reason)341 elif phase.is_failed():342 self._state = STRATEGY_STATE.ABORT_FAILED343 self.abort_complete(result, result_reason)344 elif phase.is_timed_out():345 self._state = STRATEGY_STATE.ABORT_TIMEOUT346 self.abort_complete(result, result_reason)347 self.save()348 def refresh_timeouts(self):349 """350 Strategy Refresh Timeouts351 """352 self.build_phase.refresh_timeouts()353 self.apply_phase.refresh_timeouts()354 self.abort_phase.refresh_timeouts()355 def save(self):356 """357 Strategy Save (can be overridden by child class)358 """359 pass360 def build(self):361 """362 Strategy Build (can be overridden by child class)363 """364 self._build()365 self.save()366 def build_complete(self, result, result_reason):367 """368 Strategy Build Complete (can be overridden by child class)369 """370 self.save()371 return result, result_reason372 def apply(self, stage_id):373 """374 Strategy Apply (can be overridden by child class)375 """376 success, reason = self._apply(stage_id)377 self.save()378 return success, reason379 def apply_complete(self, result, result_reason):380 """381 Strategy Apply Complete (can be overridden by child class)382 """383 self.save()384 return result, result_reason385 def abort(self, stage_id):386 """387 Strategy Abort (can be overridden by child class)388 """389 success, reason = self._abort(stage_id)390 self.save()391 return success, reason392 def abort_complete(self, result, result_reason):393 """394 Strategy Abort Complete (can be overridden by child class)395 """396 self.save()397 return result, result_reason398 def handle_event(self, event, event_data=None):399 
"""400 Strategy Handle Event (can be overridden by child class)401 """402 return self._handle_event(event, event_data)403 def from_dict(self, data, build_phase=None, apply_phase=None, abort_phase=None):...
tasks.py
Source:tasks.py
from datetime import datetime
from time import sleep
from subprocess import STDOUT, Popen, PIPE
import traceback

import logbook
from logbook import FileHandler, NestedSetup
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import relationship, backref

from db import Base, session
from db.fields import JSONEncodedDict
from log_utils import log_filename, get_thread_handlers
from thread_utils import make_exec_threaded, thread_wait


def action_fn(action):
    """Build an instance method that starts ``action`` at most once.

    The returned function calls ``instance.call_and_record_action(action)``
    unless ``instance.<action>_start_dt`` is already set, in which case it
    raises ``Exception``.
    """
    def do_action(instance):
        action_start_dt = getattr(instance, '%s_start_dt' % (action,), None)
        if not action_start_dt:
            instance.call_and_record_action(action)
        else:
            raise Exception('%s already started at %s' %
                            (action, action_start_dt))
    return do_action


class Task(Base):
    """Database-backed unit of work that belongs to a rollout.

    For both the 'run' and the 'revert' action the row records when the action
    started, what it returned (and when), and the error/traceback if it raised.
    Subclasses share the ``task`` table (single table inheritance keyed on
    ``type``) and customise behaviour through ``_init``, ``_run`` and
    ``_revert``.
    """

    __tablename__ = 'task'

    id = Column(Integer, primary_key=True)
    type = Column(String(50), nullable=False)
    rollout_id = Column(Integer, ForeignKey('rollout.id'), nullable=False)
    parent_id = Column(Integer, ForeignKey('task.id'))
    state = Column(JSONEncodedDict(1000))
    run_start_dt = Column(DateTime)
    run_error = Column(String(500))
    run_error_dt = Column(DateTime)
    run_return = Column(String(500))
    run_return_dt = Column(DateTime)
    run_traceback = Column(String(1000))
    revert_start_dt = Column(DateTime)
    revert_error = Column(String(500))
    revert_error_dt = Column(DateTime)
    revert_return = Column(String(500))
    revert_return_dt = Column(DateTime)
    revert_traceback = Column(String(1000))

    rollout = relationship('Rollout', backref=backref('tasks', order_by=id))
    children = relationship('Task', backref=backref('parent', remote_side='Task.id', order_by=id))

    @declared_attr
    def __mapper_args__(cls):
        # Implementing single table inheritance
        if cls.__name__ == 'Task':
            return {'polymorphic_on': cls.type,}
        else:
            return {'polymorphic_identity': cls.__name__}

    def __init__(self, rollout_id, *args, **kwargs):
        """Attach the task to ``rollout_id``, start with an empty ``state`` and
        pass any remaining arguments to the subclass hook ``_init``."""
        self.rollout_id = rollout_id
        self.state = {}
        self._init(*args, **kwargs)

    @classmethod
    def _from_id(cls, id):
        """Load the task with primary key ``id`` through the shared session."""
        return session.Session.query(cls).get(id)

    def _init(self, *args, **kwargs):
        """Subclass hook invoked at the end of ``__init__``; no-op here."""
        pass

    @classmethod
    def _run(cls, state, children, abort, term):
        """Subclass hook that performs the 'run' action; no-op here."""
        pass

    @classmethod
    def _revert(cls, state, children, abort, term):
        """Subclass hook that performs the 'revert' action; no-op here."""
        pass

    run = action_fn('run')
    __revert = action_fn('revert')

    def revert(self):
        """Revert the task; raises ``Exception`` if it was never run."""
        if not self.run_start_dt:
            raise Exception('Cannot revert before running')
        self.__revert()

    run_threaded = make_exec_threaded('run')
    revert_threaded = make_exec_threaded('revert')

    def get_signals(self, action):
        """Return the ``(abort_signal, term_signal)`` pair for ``action``.

        'run' maps to the rollout's '*_rollout' signals and 'revert' to its
        '*_rollback' signals.  Raises ``Exception`` when either signal is
        missing.
        """
        from rollout import Rollout
        signal_action = {'run': 'rollout', 'revert': 'rollback'}[action]
        abort_signal = Rollout.get_signal(self.rollout_id, 'abort_%s' % signal_action)
        term_signal = Rollout.get_signal(self.rollout_id, 'term_%s' % signal_action)
        # Both signals are required.  The unparenthesised form
        # ``not abort_signal and term_signal`` only caught a missing abort
        # signal when the term signal happened to exist.
        if not (abort_signal and term_signal):
            raise Exception('Cannot run: one or more signals don\'t exist: '
                            'abort: %s, term: %s' % (abort_signal, term_signal))
        return abort_signal, term_signal

    def call_and_record_action(self, action):
        """Execute the ``_<action>`` hook and record its outcome on the row.

        The start time is stored and saved before the hook runs.  On success
        the return value and its timestamp are stored; on failure the error
        text, traceback and timestamp are stored and the exception is
        re-raised.  The row is saved in either case.
        """
        setattr(self, '%s_start_dt' % (action,), datetime.now())
        self.save()
        # The hook is expected to be a classmethod, hence getattr explicitly
        # on the type rather than on the instance.
        action_impl = getattr(type(self), '_%s' % (action,))
        try:
            with self.log_setup_action(action):
                abort, term = self.get_signals(action)
                action_return = action_impl(self.state, self.children, abort, term)
        except Exception as e:
            # ``message`` is deprecated and empty for multi-argument
            # exceptions; fall back to the string form so an error is recorded.
            error_text = getattr(e, 'message', None) or str(e)
            setattr(self, '%s_error' % (action,), error_text)
            setattr(self, '%s_traceback' % (action,), repr(traceback.format_exc()))
            setattr(self, '%s_error_dt' % (action,), datetime.now())
            raise
        else:
            setattr(self, '%s_return' % (action,), action_return)
            setattr(self, '%s_return_dt' % (action,), datetime.now())
        finally:
            self.save()

    def log_setup_action(self, action):
        """Return a logbook ``NestedSetup`` combining the current thread
        handlers with a bubbling file handler for this task and ``action``."""
        return NestedSetup(
            get_thread_handlers() +
            (FileHandler(log_filename(
                self.rollout_id, self.id, action), bubble=True),))

    def save(self):
        """Add the task to the shared session if necessary and commit."""
        if self not in session.Session:
            session.Session.add(self)
        session.Session.commit()

    def __repr__(self):
        return '<%s: id=%s, rollout_id=%s, state=%s>' % (
            self.__class__.__name__, self.id, self.rollout_id, repr(self.state))

    def friendly_str(self):
        """Human-readable description; defaults to ``repr(self)``."""
        return repr(self)

    def friendly_html(self):
        """Return an HTML ``<span>`` with the description plus a log link for
        every action that has been started; the CSS class is ``status()``."""
        from kettleweb.app import url_for
        inner = [self.friendly_str()]
        for action in 'run', 'revert':
            if not getattr(self, '%s_start_dt' % action):
                continue
            log_url = url_for(
                'log_view', rollout_id=self.rollout_id, args='/'.join(
                    map(str, (self.id, action))))
            log_link = '<a href="{url}">{action}</a>'.format(
                url=log_url, action='%s log' % action.title())
            inner.append(log_link)
        return '<span class="task {class_}">{inner}</span>'.format(
            class_=self.status(), inner=' '.join(inner))

    def status(self):
        """Derive a status keyword from the recorded timestamps.

        Returns one of 'not_started', 'started', 'finished', 'rolling_back'
        or 'rolled_back'.
        """
        if not self.run_start_dt:
            return 'not_started'
        if not self.revert_start_dt:
            if not self.run_return_dt:
                return 'started'
            return 'finished'
        if not self.revert_return_dt:
            return 'rolling_back'
        return 'rolled_back'


class ExecTask(Task):
    """Task that executes its child tasks.

    Subclasses provide ``exec_forwards`` and ``exec_backwards``.
    """

    desc_string = ''

    def _init(self, children, *args, **kwargs):
        # Save first, then attach and save each child.
        # NOTE(review): presumably the early save is what assigns this row's
        # id before the children reference it -- confirm.
        self.save()
        for child in children:
            child.parent = self
            child.save()

    @classmethod
    def _run(cls, state, children, abort, term):
        cls.exec_forwards(state, children, abort, term)

    @classmethod
    def _revert(cls, state, children, abort, term):
        cls.exec_backwards(state, children, abort, term)

    def friendly_str(self):
        """Description followed by one tab-indented line per child."""
        return '%s%s' %\
            (type(self).desc_string, ''.join(['\n\t%s' % child.friendly_str() for child in self.children]))

    def friendly_html(self):
        """Description followed by an HTML list of the children."""
        return '%s<ul>%s</ul>' %\
            (type(self).desc_string,
             ''.join(['<li>%s</li>' % child.friendly_html() for child in self.children]))


class SequentialExecTask(ExecTask):
    """ExecTask that runs its children one after another, in the order in
    which they were passed to the constructor (stored as
    ``state['task_order']``)."""

    def _init(self, children, *args, **kwargs):
        super(SequentialExecTask, self)._init(children, *args, **kwargs)
        self.state['task_order'] = [child.id for child in children]

    @classmethod
    def exec_forwards(cls, state, tasks, abort, term):
        cls.exec_tasks('run_threaded', state['task_order'], tasks, abort, term)

    @classmethod
    def exec_backwards(cls, state, tasks, abort, term):
        # Only tasks that were actually started are reverted, in reverse order.
        run_tasks = [t for t in tasks if t.run_start_dt]
        run_task_ids = set(t.id for t in run_tasks)
        task_order = [t_id for t_id in reversed(state['task_order']) if t_id in run_task_ids]
        cls.exec_tasks('revert_threaded', task_order, run_tasks, abort, term)

    @staticmethod
    def exec_tasks(method_name, task_order, tasks, abort, term):
        """Call ``method_name`` on each task in ``task_order``, waiting for
        each thread before starting the next.

        Stops early when ``abort`` or ``term`` is set.  Raises ``Exception``
        when a task thread reports ``exc_info``.
        """
        task_ids = {task.id: task for task in tasks}
        for task_id in task_order:
            if abort.is_set() or term.is_set():
                break
            task = task_ids.pop(task_id)
            thread = getattr(task, method_name)(abort)
            thread_wait(thread, abort)
            if thread.exc_info is not None:
                raise Exception('Caught exception while executing task %s: %s' %
                                (task, thread.exc_info))
        else:
            # Only reached when the loop was not interrupted: every task must
            # have been consumed by then.
            assert not task_ids, ("SequentialExecTask has children that are not "
                                  "in its task_order: %s" % (task_ids,))

    def friendly_html(self):
        """Description followed by an HTML list of the children in
        ``task_order``."""
        task_order = self.state['task_order']
        child_ids = {c.id: c for c in self.children}
        return '%s<ul>%s</ul>' %\
            (type(self).desc_string,
             ''.join(['<li>%s</li>' % child_ids[id].friendly_html() for id in task_order]))


class ParallelExecTask(ExecTask):
    """ExecTask that starts all of its children before waiting on any."""

    desc_string = 'Execute in parallel:'

    @classmethod
    def exec_forwards(cls, state, tasks, abort, term):
        cls.exec_tasks('run_threaded', tasks, abort, term)

    @classmethod
    def exec_backwards(cls, state, tasks, abort, term):
        cls.exec_tasks('revert_threaded', [t for t in tasks if t.run_start_dt],
                       abort, term)

    @staticmethod
    def exec_tasks(method_name, tasks, abort, term):
        """Start ``method_name`` on every task, then wait for each thread.

        No further tasks are started once ``abort`` or ``term`` is set.
        Raises ``Exception`` when a task thread reports ``exc_info``.
        """
        threads = []
        for task in tasks:
            if abort.is_set() or term.is_set():
                break
            thread = getattr(task, method_name)(abort)
            threads.append(thread)
        for thread in threads:
            thread_wait(thread, abort)
            if thread.exc_info is not None:
                raise Exception('Caught exception while executing task %s: %s' %
                                (thread.target, thread.exc_info))


class DelayTask(Task):
    """Task that waits for a configured number of minutes and seconds."""

    def _init(self, minutes=0, seconds=0, reversible=False):
        self.state.update(
            minutes=minutes,
            seconds=seconds,
            reversible=reversible)

    @classmethod
    def _run(cls, state, children, abort, term):
        cls.wait(cls.get_secs(state), abort=abort, term=term)

    @classmethod
    def _revert(cls, state, children, abort, term):
        # The delay is repeated on revert only when flagged as reversible.
        if state['reversible']:
            cls.wait(cls.get_secs(state), abort=abort, term=term)

    @classmethod
    def wait(cls, secs, abort, term):
        """Sleep for ``secs`` seconds in one-second steps, returning early
        once ``abort`` or ``term`` is set."""
        logbook.info('Waiting for %s' % (cls.min_sec_str(secs),),)
        # ``range`` instead of the Python-2-only ``xrange``.
        for _ in range(int(secs)):
            if abort.is_set() or term.is_set():
                break
            sleep(1)

    def friendly_str(self):
        """Describe the delay, including the time left while it is running."""
        delay_secs = self.get_secs(self.state)
        delay_str = self.min_sec_str(delay_secs)
        if self.run_start_dt and not self.run_return_dt:
            elapsed = datetime.now() - self.run_start_dt
            # total_seconds() includes whole days (``.seconds`` does not);
            # clamp so an overdue delay never shows a negative remainder.
            remaining_secs = max(0, delay_secs - int(elapsed.total_seconds()))
            remaining_str = ' (%s left)' % self.min_sec_str(remaining_secs)
        else:
            remaining_str = ''
        rev_str = ' (reversible)' if self.state['reversible'] else ''
        return 'Delay for {delay_str}{remaining_str}{rev_str}'.format(
            delay_str=delay_str, remaining_str=remaining_str, rev_str=rev_str)

    @staticmethod
    def min_sec_str(secs):
        """Format ``secs`` as 'M:SS mins', or as 'S secs' when under a minute."""
        # divmod floors on every Python version; plain ``/`` is true division
        # on Python 3 and would report 30 seconds as '0:30 mins'.
        mins, secs = divmod(secs, 60)
        if mins:
            return '%d:%02d mins' % (mins, secs)
        else:
            return '%d secs' % (secs,)

    @staticmethod
    def get_secs(state):
        """Total delay in seconds from the 'minutes'/'seconds' state keys."""
        return (state.get('minutes', 0) * 60) + state.get('seconds', 0)


def subprocess_run(command, abort, term, log=True, **kwargs):
    if log:
        logbook.info(command)
    p = Popen(
        args=command,
        stdout=PIPE,
        stderr=STDOUT,
        **kwargs
    )
    outputs = []
    for line in iter(p.stdout.readline, ''):
        if term and term.is_set():
            term = None  # Don't spam calls to terminate
            p.terminate()
            logbook.info('Caught TERM signal: stopping')
        line = line.strip()
        outputs.append(line)
        if log:
            logbook.info(line)
    returncode = p.wait()
    if returncode == 0:
        return outputs
    else:
        if abort:
            abort.set()
        exc = Exception(command, outputs, returncode)
        # ... (listing truncated here)
abort.py
Source:abort.py
"""
Exercise aborting server-side SASL authentication at different stages.
"""

from servicetest import EventPattern
from gabbletest import exec_test, call_async
import constants as cs
from saslutil import SaslEventAuthenticator, connect_and_get_sasl_channel, \
    abort_auth

JID = "test@example.org"
PASSWORD = "pass"
EXCHANGE = [("", "remote challenge"),
            ("Another step", "Here we go"),
            ("local response", "")]
MECHANISMS = ["PLAIN", "DIGEST-MD5", "ABORT-TEST"]


def _expect_sasl_status(q, status):
    """Wait for SASLStatusChanged announcing ``status`` with no error/details."""
    q.expect('dbus-signal', signal='SASLStatusChanged',
             interface=cs.CHANNEL_IFACE_SASL_AUTH,
             args=[status, '', {}])


def _start_and_expect_challenge(q, bus, conn):
    """Start the default mechanism, have the server send the second challenge
    and wait for it to be announced; returns the SASL channel."""
    chan, authenticator = start_mechanism(q, bus, conn)
    challenge = EXCHANGE[1][0]
    authenticator.challenge(challenge)
    q.expect('dbus-signal', signal='NewChallenge',
             interface=cs.CHANNEL_IFACE_SASL_AUTH,
             args=[challenge])
    return chan


# Client-side ways of giving up on the authentication channel.
def _close_channel(chan):
    chan.Close()


def _abort_sasl(chan):
    chan.SASLAuthentication.AbortSASL(
        cs.SASL_ABORT_REASON_USER_ABORT, "bored now")


# Server-side moves that arrive after the client has given up.
def _send_challenge(authenticator):
    authenticator.challenge(EXCHANGE[1][0])


def _send_success(authenticator):
    authenticator.success()


def _send_not_authorized(authenticator):
    authenticator.not_authorized()


def test_abort_early(q, bus, conn, stream):
    chan, props = connect_and_get_sasl_channel(q, bus, conn)
    abort_auth(q, chan, 31337, "maybe if I use an undefined code you'll crash")


def start_mechanism(q, bus, conn,
                    mechanism="ABORT-TEST", initial_response=EXCHANGE[0][1]):
    """Open the SASL channel, start ``mechanism`` with ``initial_response``
    and wait until the exchange is in progress.

    Returns the channel and the authenticator taken from the 'sasl-auth'
    event.
    """
    chan, props = connect_and_get_sasl_channel(q, bus, conn)
    chan.SASLAuthentication.StartMechanismWithData(mechanism, initial_response)
    _expect_sasl_status(q, cs.SASL_STATUS_IN_PROGRESS)
    auth_event = q.expect('sasl-auth', initial_response=initial_response)
    return chan, auth_event.authenticator


def test_abort_mid(q, bus, conn, stream):
    chan = _start_and_expect_challenge(q, bus, conn)
    abort_auth(q, chan, cs.SASL_ABORT_REASON_INVALID_CHALLENGE,
               "wrong data from server")


def test_disconnect_mid(q, bus, conn, stream):
    _start_and_expect_challenge(q, bus, conn)
    call_async(q, conn, 'Disconnect')
    disconnected = EventPattern('dbus-signal', signal='StatusChanged',
                                args=[cs.CONN_STATUS_DISCONNECTED,
                                      cs.CSR_REQUESTED])
    returned = EventPattern('dbus-return', method='Disconnect')
    q.expect_many(disconnected, returned)


def test_abort_connected(q, bus, conn, stream):
    username = JID.split('@')[0]
    initial_response = '\0'.join(('', username, PASSWORD))
    chan, authenticator = start_mechanism(q, bus, conn,
                                          mechanism='PLAIN',
                                          initial_response=initial_response)
    authenticator.success(None)
    _expect_sasl_status(q, cs.SASL_STATUS_SERVER_SUCCEEDED)
    chan.SASLAuthentication.AcceptSASL()
    _expect_sasl_status(q, cs.SASL_STATUS_SUCCEEDED)
    q.expect('dbus-signal', signal='StatusChanged',
             args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])
    # Aborting once connected is too late and must be rejected.
    call_async(q, chan.SASLAuthentication, 'AbortSASL',
               cs.SASL_ABORT_REASON_USER_ABORT, "aborting too late")
    q.expect('dbus-error', method='AbortSASL', name=cs.NOT_AVAILABLE)
    chan.Close()


def test_give_up_while_waiting(q, bus, conn, stream,
                               channel_method,
                               authenticator_method):
    """Regression test for https://bugs.freedesktop.org/show_bug.cgi?id=52146
    where closing the auth channel while waiting for a challenge from the
    server would not abort the SASL exchange, and the challenge subsequently
    arriving would make Gabble assert.
    """
    chan, authenticator = start_mechanism(q, bus, conn)

    # While Gabble is waiting for the server to make the next move, a
    # Telepathy client does something to try to end the authentication
    # process.
    channel_method(chan)

    # And then we hear something back from the server.
    authenticator_method(authenticator)

    # FIXME: Gabble should probably send <abort/> and wait for the server to
    # say <failure><aborted/> rather than unceremoniously closing the
    # connection.
    #
    # In the bug we're testing for, the stream connection would indeed be lost,
    # but Gabble would also crash and leave a core dump behind. So this test
    # would appear to pass, but 'make check' would fail as we want.
    expected = (
        EventPattern('stream-connection-lost'),
        EventPattern('dbus-signal', signal='ConnectionError'),
        EventPattern(
            'dbus-signal', signal="StatusChanged",
            args=[cs.CONN_STATUS_DISCONNECTED,
                  cs.CSR_AUTHENTICATION_FAILED]),
    )
    q.expect_many(*expected)


def test_close_then_challenge(q, bus, conn, stream):
    """This is the specific scenario for which 52146 was reported. The channel
    was being closed because its handler crashed.
    """
    test_give_up_while_waiting(q, bus, conn, stream,
                               _close_channel, _send_challenge)


def test_close_then_success(q, bus, conn, stream):
    test_give_up_while_waiting(q, bus, conn, stream,
                               _close_channel, _send_success)


def test_close_then_failure(q, bus, conn, stream):
    test_give_up_while_waiting(q, bus, conn, stream,
                               _close_channel, _send_not_authorized)


def test_abort_then_challenge(q, bus, conn, stream):
    test_give_up_while_waiting(q, bus, conn, stream,
                               _abort_sasl, _send_challenge)


def test_abort_then_success(q, bus, conn, stream):
    """FIXME: this test fails because the channel changes its state from
    TP_SASL_STATUS_CLIENT_FAILED to TP_SASL_STATUS_SERVER_SUCCEEDED and waits
    for the client to ack it when <success> arrives, which is dumb.
    """
    test_give_up_while_waiting(q, bus, conn, stream,
                               _abort_sasl, _send_success)


def test_abort_then_failure(q, bus, conn, stream):
    test_give_up_while_waiting(q, bus, conn, stream,
                               _abort_sasl, _send_not_authorized)


def abort_and_close(chan):
    """Abort the SASL exchange and then close the channel."""
    _abort_sasl(chan)
    _close_channel(chan)


def test_abort_and_close_then_challenge(q, bus, conn, stream):
    test_give_up_while_waiting(q, bus, conn, stream,
                               abort_and_close, _send_challenge)


def test_abort_and_close_then_failure(q, bus, conn, stream):
    test_give_up_while_waiting(q, bus, conn, stream,
                               abort_and_close, _send_not_authorized)


def exec_test_(func):
    """Run ``func`` through exec_test with a fresh authenticator."""
    # Can't use functools.partial, because the authenticator is stateful.
    username = JID.split('@')[0]
    authenticator = SaslEventAuthenticator(username, MECHANISMS)
    params = {'password': None,
              'account': JID,
              }
    exec_test(func, do_connect=False, authenticator=authenticator,
              params=params)


if __name__ == '__main__':
    for test in (
            test_abort_early,
            test_abort_mid,
            test_disconnect_mid,
            test_abort_connected,
            test_close_then_challenge,
            test_close_then_success,
            test_close_then_failure,
            test_abort_then_challenge,
            # test_abort_then_success is deliberately not run (see its FIXME).
            test_abort_then_failure,
            test_abort_and_close_then_challenge,
    ):
        exec_test_(test)
    # ... (listing truncated here)
misc_test.py
Source:misc_test.py
...3import webob.exc4import webapp25import test_base6class TestMiscellaneous(test_base.BaseTestCase):7 def test_abort(self):8 self.assertRaises(webob.exc.HTTPOk, webapp2.abort, 200)9 self.assertRaises(webob.exc.HTTPCreated, webapp2.abort, 201)10 self.assertRaises(webob.exc.HTTPAccepted, webapp2.abort, 202)11 self.assertRaises(webob.exc.HTTPNonAuthoritativeInformation, webapp2.abort, 203)12 self.assertRaises(webob.exc.HTTPNoContent, webapp2.abort, 204)13 self.assertRaises(webob.exc.HTTPResetContent, webapp2.abort, 205)14 self.assertRaises(webob.exc.HTTPPartialContent, webapp2.abort, 206)15 self.assertRaises(webob.exc.HTTPMultipleChoices, webapp2.abort, 300)16 self.assertRaises(webob.exc.HTTPMovedPermanently, webapp2.abort, 301)17 self.assertRaises(webob.exc.HTTPFound, webapp2.abort, 302)18 self.assertRaises(webob.exc.HTTPSeeOther, webapp2.abort, 303)19 self.assertRaises(webob.exc.HTTPNotModified, webapp2.abort, 304)20 self.assertRaises(webob.exc.HTTPUseProxy, webapp2.abort, 305)21 self.assertRaises(webob.exc.HTTPTemporaryRedirect, webapp2.abort, 307)...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!