Best Python code snippet using localstack_python
assertsql.py
Source:assertsql.py
# testing/assertsql.py
# Copyright (C) 2005-2020 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

import collections
import contextlib
import re

from .. import event
from .. import util
from ..engine import url
from ..engine.default import DefaultDialect
from ..engine.util import _distill_params
from ..schema import _DDLCompiles


class AssertRule(object):
    """Base class for rules that are fed observed statement executions."""

    # set to True by a rule once it has matched and needs no more input
    is_consumed = False
    # description of the most recent mismatch, or None
    errormessage = None
    # when true, EachOf stops feeding the current observation to further
    # rules after this rule has processed it
    consume_statement = True

    def process_statement(self, execute_observed):
        """Inspect one observed execution; the base rule does nothing."""
        pass

    def no_more_statements(self):
        """Fail because observations ran out while this rule was pending.

        :raises AssertionError: always.
        """
        # raised explicitly (not ``assert False``) so that the failure is
        # not stripped when Python runs with -O
        raise AssertionError(
            "All statements are complete, but pending "
            "assertion rules remain"
        )


class SQLMatchRule(AssertRule):
    """Marker base class for rules that match SQL text."""

    pass


class CursorSQL(SQLMatchRule):
    """Match the exact statement (and optionally parameters) that was
    passed to the cursor."""

    # a single observation may carry several cursor statements, so the
    # observation is not used up until its statement list is empty
    consume_statement = False

    def __init__(self, statement, params=None):
        """
        :param statement: exact statement string expected at the cursor.
        :param params: expected cursor parameters; ``None`` skips the
         parameter comparison.
        """
        self.statement = statement
        self.params = params

    def process_statement(self, execute_observed):
        """Compare against the first pending cursor statement of
        ``execute_observed`` and pop it on a match."""
        stmt = execute_observed.statements[0]
        if self.statement != stmt.statement or (
            self.params is not None and self.params != stmt.parameters
        ):
            self.errormessage = (
                "Testing for exact SQL %s parameters %s received %s %s"
                % (
                    self.statement,
                    self.params,
                    stmt.statement,
                    stmt.parameters,
                )
            )
        else:
            execute_observed.statements.pop(0)
            self.is_consumed = True
            if not execute_observed.statements:
                # every cursor statement of this observation is matched
                self.consume_statement = True


class CompiledSQL(SQLMatchRule):
    """Match a statement after re-compiling it against a chosen dialect,
    with a partial ("positive only") comparison of parameters."""

    def __init__(self, statement, params=None, dialect="default"):
        """
        :param statement: expected SQL string; newlines and tabs are
         removed before comparison.
        :param params: expected parameters: a dict, a list of dicts, or a
         callable receiving the execution context and returning either.
        :param dialect: ``"default"`` or a dialect name handed to
         ``url.URL``.
        """
        self.statement = statement
        self.params = params
        self.dialect = dialect

    def _compare_sql(self, execute_observed, received_statement):
        """Return True if ``received_statement`` equals the expected SQL
        with newlines and tabs removed."""
        stmt = re.sub(r"[\n\t]", "", self.statement)
        return received_statement == stmt

    def _compile_dialect(self, execute_observed):
        """Return the dialect instance used to re-compile the statement."""
        if self.dialect == "default":
            return DefaultDialect()
        else:
            # ugh
            if self.dialect == "postgresql":
                params = {"implicit_returning": True}
            else:
                params = {}
            return url.URL(self.dialect).get_dialect()(**params)

    def _received_statement(self, execute_observed):
        """reconstruct the statement and params in terms
        of a target dialect, which for CompiledSQL is just DefaultDialect.

        :return: tuple of (statement string with newlines/tabs removed,
         list of parameter dictionaries).
        """

        context = execute_observed.context
        compare_dialect = self._compile_dialect(execute_observed)
        if isinstance(context.compiled.statement, _DDLCompiles):
            compiled = context.compiled.statement.compile(
                dialect=compare_dialect,
                schema_translate_map=context.execution_options.get(
                    "schema_translate_map"
                ),
            )
        else:
            compiled = context.compiled.statement.compile(
                dialect=compare_dialect,
                column_keys=context.compiled.column_keys,
                inline=context.compiled.inline,
                schema_translate_map=context.execution_options.get(
                    "schema_translate_map"
                ),
            )
        _received_statement = re.sub(r"[\n\t]", "", util.text_type(compiled))
        parameters = execute_observed.parameters

        if not parameters:
            _received_parameters = [compiled.construct_params()]
        else:
            _received_parameters = [
                compiled.construct_params(m) for m in parameters
            ]

        return _received_statement, _received_parameters

    def process_statement(self, execute_observed):
        """Compare the observed execution to the expected SQL and params,
        setting ``is_consumed`` on a match and ``errormessage`` otherwise."""
        context = execute_observed.context

        _received_statement, _received_parameters = self._received_statement(
            execute_observed
        )
        params = self._all_params(context)

        equivalent = self._compare_sql(execute_observed, _received_statement)

        if equivalent:
            if params is not None:
                all_params = list(params)
                all_received = list(_received_parameters)
                while all_params and all_received:
                    param = dict(all_params.pop(0))

                    for idx, received in enumerate(list(all_received)):
                        # do a positive compare only
                        for param_key in param:
                            # a key in param did not match current
                            # 'received'
                            if (
                                param_key not in received
                                or received[param_key] != param[param_key]
                            ):
                                break
                        else:
                            # all keys in param matched 'received';
                            # onto next param
                            del all_received[idx]
                            break
                    else:
                        # param did not match any entry
                        # in all_received
                        equivalent = False
                        break
                # leftovers on either side mean the parameter sets differ
                if all_params or all_received:
                    equivalent = False

        if equivalent:
            self.is_consumed = True
            self.errormessage = None
        else:
            self.errormessage = self._failure_message(params) % {
                "received_statement": _received_statement,
                "received_parameters": _received_parameters,
            }

    def _all_params(self, context):
        """Return the expected parameters as a list, or None when no
        (or empty) parameters were given."""
        if self.params:
            if util.callable(self.params):
                params = self.params(context)
            else:
                params = self.params
            if not isinstance(params, list):
                params = [params]
            return params
        else:
            return None

    def _failure_message(self, expected_params):
        """Return a %-template for the mismatch message; the caller fills
        in ``received_statement`` and ``received_parameters``."""
        # literal percent signs are doubled because the result goes
        # through a second round of %-formatting
        return (
            "Testing for compiled statement %r partial params %s, "
            "received %%(received_statement)r with params "
            "%%(received_parameters)r"
            % (
                self.statement.replace("%", "%%"),
                repr(expected_params).replace("%", "%%"),
            )
        )


class RegexSQL(CompiledSQL):
    """Like CompiledSQL, but the statement is matched with a regular
    expression against the default dialect's output."""

    def __init__(self, regex, params=None):
        """
        :param regex: pattern applied with ``re.match`` to the received
         statement.
        :param params: expected parameters, as for CompiledSQL.
        """
        SQLMatchRule.__init__(self)
        self.regex = re.compile(regex)
        self.orig_regex = regex
        self.params = params
        self.dialect = "default"

    def _failure_message(self, expected_params):
        """Return the mismatch %-template, quoting the original regex."""
        return (
            "Testing for compiled statement ~%r partial params %s, "
            "received %%(received_statement)r with params "
            "%%(received_parameters)r"
            % (
                self.orig_regex.replace("%", "%%"),
                repr(expected_params).replace("%", "%%"),
            )
        )

    def _compare_sql(self, execute_observed, received_statement):
        """Return True if the regex matches at the start of the received
        statement."""
        return bool(self.regex.match(received_statement))


class DialectSQL(CompiledSQL):
    """Match a statement as compiled by the dialect that actually
    executed it."""

    def _compile_dialect(self, execute_observed):
        """Return the dialect of the observed execution context."""
        return execute_observed.context.dialect

    def _compare_no_space(self, real_stmt, received_stmt):
        """Compare two statements after removing newlines and tabs from
        ``real_stmt``."""
        stmt = re.sub(r"[\n\t]", "", real_stmt)
        return received_stmt == stmt

    def _received_statement(self, execute_observed):
        """Return the re-compiled statement and the context's compiled
        parameters.

        :raises AssertionError: if the re-compiled statement is not among
         the statements actually sent to the cursor.
        """
        received_stmt, received_params = super(
            DialectSQL, self
        )._received_statement(execute_observed)

        # TODO: why do we need this part?
        for real_stmt in execute_observed.statements:
            if self._compare_no_space(real_stmt.statement, received_stmt):
                break
        else:
            raise AssertionError(
                "Can't locate compiled statement %r in list of "
                "statements actually invoked" % received_stmt
            )

        return received_stmt, execute_observed.context.compiled_parameters

    def _compare_sql(self, execute_observed, received_statement):
        """Compare against the expected SQL after converting its ``:name``
        placeholders to the paramstyle of the executing dialect."""
        stmt = re.sub(r"[\n\t]", "", self.statement)
        # convert our comparison statement to have the
        # paramstyle of the received
        paramstyle = execute_observed.context.dialect.paramstyle
        if paramstyle == "pyformat":
            stmt = re.sub(r":([\w_]+)", r"%(\1)s", stmt)
        elif paramstyle == "qmark":
            stmt = re.sub(r":([\w_]+)", "?", stmt)
        elif paramstyle == "format":
            stmt = re.sub(r":([\w_]+)", r"%s", stmt)
        elif paramstyle == "numeric":
            # positional ":1", ":2", ... in order of appearance; this used
            # to pass repl=None to re.sub(), which raises TypeError.
            # NOTE(review): assumes the dialect numbers placeholders
            # starting at 1 -- confirm against the compiler.
            position = [0]

            def repl(match):
                position[0] += 1
                return ":%d" % position[0]

            stmt = re.sub(r":([\w_]+)", repl, stmt)
        # any other paramstyle (e.g. "named") is left as written; the
        # expected statement already uses the ":name" form

        return received_statement == stmt


class CountStatements(AssertRule):
    """Assert that an exact number of executions was observed."""

    def __init__(self, count):
        """
        :param count: the number of executions expected.
        """
        self.count = count
        self._statement_count = 0

    def process_statement(self, execute_observed):
        """Count one observed execution."""
        self._statement_count += 1

    def no_more_statements(self):
        """Check the final tally.

        :raises AssertionError: if the observed count differs from the
         expected count.
        """
        if self.count != self._statement_count:
            raise AssertionError(
                "desired statement count %d does not match %d"
                % (self.count, self._statement_count)
            )


class AllOf(AssertRule):
    """Composite rule consumed once every member rule is consumed; the
    members are held in a set, so no order is imposed."""

    def __init__(self, *rules):
        self.rules = set(rules)

    def process_statement(self, execute_observed):
        """Offer the observation to the pending rules until one of them
        either consumes it or reports no error."""
        for rule in list(self.rules):
            rule.errormessage = None
            rule.process_statement(execute_observed)
            if rule.is_consumed:
                self.rules.discard(rule)
                if not self.rules:
                    self.is_consumed = True
                break
            elif not rule.errormessage:
                # rule is not done yet
                self.errormessage = None
                break
        else:
            # every pending rule reported an error; surface one of them
            self.errormessage = list(self.rules)[0].errormessage


class EachOf(AssertRule):
    """Composite rule whose member rules must be consumed in order."""

    def __init__(self, *rules):
        self.rules = list(rules)

    def process_statement(self, execute_observed):
        """Feed the observation to the rule at the head of the list,
        moving on to following rules while the head does not consume the
        statement."""
        while self.rules:
            rule = self.rules[0]
            rule.process_statement(execute_observed)
            if rule.is_consumed:
                self.rules.pop(0)
            elif rule.errormessage:
                self.errormessage = rule.errormessage

            if rule.consume_statement:
                break

        if not self.rules:
            self.is_consumed = True

    def no_more_statements(self):
        """Delegate to the first pending rule, or fail generically if
        rules remain but the first one is already consumed."""
        if self.rules and not self.rules[0].is_consumed:
            self.rules[0].no_more_statements()
        elif self.rules:
            super(EachOf, self).no_more_statements()


class Or(AllOf):
    """Composite rule consumed as soon as any member rule is consumed."""

    def process_statement(self, execute_observed):
        """Offer the observation to each member rule in turn."""
        for rule in self.rules:
            rule.process_statement(execute_observed)
            if rule.is_consumed:
                self.is_consumed = True
                break
        else:
            self.errormessage = list(self.rules)[0].errormessage


class SQLExecuteObserved(object):
    """One observed execution: its context, the original clause element,
    the distilled parameters and the cursor statements it produced."""

    def __init__(self, context, clauseelement, multiparams, params):
        self.context = context
        self.clauseelement = clauseelement
        self.parameters = _distill_params(multiparams, params)
        # filled with SQLCursorExecuteObserved entries by assert_engine()
        self.statements = []


class SQLCursorExecuteObserved(
    collections.namedtuple(
        "SQLCursorExecuteObserved",
        ["statement", "parameters", "context", "executemany"],
    )
):
    """Record of a single statement as it was passed to the cursor."""

    pass


class SQLAsserter(object):
    """Accumulates observed executions and checks them against rules."""

    def __init__(self):
        self.accumulated = []

    def _close(self):
        """Freeze the accumulated observations into ``_final``."""
        self._final = self.accumulated
        del self.accumulated

    def assert_(self, *rules):
        """Assert that the finalized observations satisfy ``rules`` in
        order.

        :raises AssertionError: when a rule reports an error, when
         observations remain after all rules are consumed, or when rules
         remain after all observations are processed.
        """
        rule = EachOf(*rules)

        observed = list(self._final)
        while observed:
            statement = observed.pop(0)
            rule.process_statement(statement)
            if rule.is_consumed:
                break
            elif rule.errormessage:
                raise AssertionError(rule.errormessage)
        if observed:
            raise AssertionError("Additional SQL statements remain")
        elif not rule.is_consumed:
            rule.no_more_statements()


@contextlib.contextmanager
def assert_engine(engine):
    """Context manager that records executions on ``engine`` and yields
    the SQLAsserter collecting them; the event listeners are removed on
    exit."""
    asserter = SQLAsserter()

    orig = []

    @event.listens_for(engine, "before_execute")
    def connection_execute(conn, clauseelement, multiparams, params):
        # grab the original statement + params before any cursor
        # execution
        orig[:] = clauseelement, multiparams, params

    @event.listens_for(engine, "after_cursor_execute")
    def cursor_execute(
        conn, cursor, statement, parameters, context, executemany
    ):
        if not context:
            return
        # then grab real cursor statements and associate them all
        # around a single context
        if (
            asserter.accumulated
            and asserter.accumulated[-1].context is context
        ):
            obs = asserter.accumulated[-1]
        else:
            obs = SQLExecuteObserved(context, orig[0], orig[1], orig[2])
            asserter.accumulated.append(obs)
        obs.statements.append(
            SQLCursorExecuteObserved(
                statement, parameters, context, executemany
            )
        )

    try:
        yield asserter
    finally:
        event.remove(engine, "after_cursor_execute", cursor_execute)
        event.remove(engine, "before_execute", connection_execute)
switch_impl_test.py
Source:switch_impl_test.py
#!/usr/bin/env python
import unittest
import sys
import os.path
from copy import copy
sys.path.append(os.path.dirname(__file__) + "/../../..")
from pox.openflow.libopenflow_01 import *
from pox.openflow.switch_impl import *


class MockConnection(object):
    """Stand-in for a controller connection that records what the switch
    sends and lets a test inject messages into the switch."""

    def __init__(self):
        # handlers keyed by message header type; the test fixture passes
        # this object to SwitchImpl.set_connection()
        self.ofp_handlers = {}
        # every message handed to send(), oldest first
        self.received = []

    @property
    def last(self):
        """The most recently sent message (IndexError if there is none)."""
        return self.received[-1]

    def to_switch(self, msg):
        """Dispatch ``msg`` to the handler registered for its header type."""
        self.ofp_handlers[msg.header_type](msg)

    # from switch
    def send(self, msg):
        """Record a message sent by the switch."""
        self.received.append(msg)


class SwitchImplTest(unittest.TestCase):
    """Exercises SwitchImpl through a MockConnection."""

    def setUp(self):
        self.conn = MockConnection()
        self.switch = SwitchImpl(1, name="sw1")
        self.switch.set_connection(self.conn)
        self.packet = ethernet(
            src=EthAddr("00:00:00:00:00:01"),
            dst=EthAddr("00:00:00:00:00:02"),
            payload=ipv4(
                srcip=IPAddr("1.2.3.4"),
                dstip=IPAddr("1.2.3.5"),
                payload=udp(srcport=1234, dstport=53, payload="haha")))

    def test_hello(self):
        c = self.conn
        c.to_switch(ofp_hello(xid=123))
        self.assertEqual(len(c.received), 1)
        self.assertTrue(isinstance(c.last, ofp_hello),
                        "should have received hello but got %s" % c.last)

    def test_echo_request(self):
        c = self.conn
        c.to_switch(ofp_echo_request(xid=123))
        self.assertEqual(len(c.received), 1)
        self.assertTrue(
            isinstance(c.last, ofp_echo_reply) and c.last.xid == 123,
            "should have received echo reply but got %s" % c.last)

    def test_barrier(self):
        c = self.conn
        c.to_switch(ofp_barrier_request(xid=123))
        self.assertEqual(len(c.received), 1)
        # the failure message previously said "echo reply" (copy-paste slip)
        self.assertTrue(
            isinstance(c.last, ofp_barrier_reply) and c.last.xid == 123,
            "should have received barrier reply but got %s" % c.last)

    def test_flow_mod(self):
        c = self.conn
        s = self.switch
        c.to_switch(ofp_flow_mod(
            xid=124, priority=1,
            match=ofp_match(in_port=1, nw_src="1.2.3.4")))
        self.assertEqual(len(c.received), 0)
        self.assertEqual(len(s.table), 1)
        e = s.table.entries[0]
        self.assertEqual(e.priority, 1)
        self.assertEqual(e.match, ofp_match(in_port=1, nw_src="1.2.3.4"))

    def test_packet_out(self):
        c = self.conn
        s = self.switch
        received = []
        # "lambda event:" rather than "lambda(event):" -- the tuple-parameter
        # form is a syntax error on Python 3
        s.addListener(DpPacketOut, lambda event: received.append(event))
        packet = self.packet
        c.to_switch(ofp_packet_out(
            data=packet, actions=[ofp_action_output(port=2)]))
        self.assertEqual(len(c.received), 0)
        self.assertEqual(len(received), 1)
        event = received[0]
        self.assertEqual(event.port.port_no, 2)
        self.assertEqual(event.packet.pack(), packet.pack())

    def test_send_packet_in(self):
        c = self.conn
        s = self.switch
        s.send_packet_in(in_port=1, buffer_id=123, packet=self.packet,
                         xid=314, reason=OFPR_NO_MATCH)
        self.assertEqual(len(c.received), 1)
        self.assertTrue(
            isinstance(c.last, ofp_packet_in) and c.last.xid == 314,
            "should have received packet_in but got %s" % c.last)
        self.assertEqual(c.last.in_port, 1)
        self.assertEqual(c.last.buffer_id, 123)
        self.assertEqual(c.last.data, self.packet.pack())

    def test_process_packet(self):
        c = self.conn
        s = self.switch
        received = []
        s.addListener(DpPacketOut, lambda event: received.append(event))
        # no flow entries -> should result in a packet_in
        s.process_packet(self.packet, in_port=1)
        self.assertEqual(len(c.received), 1)
        self.assertTrue(isinstance(c.last, ofp_packet_in),
                        "should have received packet_in but got %s" % c.last)
        self.assertTrue(c.last.buffer_id > 0)
        # let's send a flow_mod with a buffer id
        c.to_switch(ofp_flow_mod(
            xid=124, buffer_id=c.last.buffer_id, priority=1,
            match=ofp_match(in_port=1, nw_src="1.2.3.4"),
            actions=[ofp_action_output(port=3)]
        ))
        # that should have sent the packet out port 3
        self.assertEqual(len(received), 1)
        event = received[0]
        self.assertEqual(event.port.port_no, 3)
        self.assertEqual(event.packet, self.packet)
        # now the next packet should go through on the fast path
        c.received = []
        # rebinding is fine: the listener lambda looks up the name
        # "received" each time it is called, so it appends to this new list
        received = []
        s.process_packet(self.packet, in_port=1)
        self.assertEqual(len(c.received), 0)
        self.assertEqual(len(received), 1)
        event = received[0]
        self.assertEqual(event.port.port_no, 3)
        self.assertEqual(event.packet, self.packet)

    def test_take_port_down(self):
        c = self.conn
        s = self.switch
        original_num_ports = len(self.switch.ports)
        # list(...) because dict views are not indexable on Python 3
        p = list(self.switch.ports.values())[0]
        s.take_port_down(p)
        new_num_ports = len(self.switch.ports)
        self.assertTrue(new_num_ports == original_num_ports - 1,
                        "Should have removed the port")
        self.assertEqual(len(c.received), 1)
        self.assertTrue(
            isinstance(c.last, ofp_port_status),
            "should have received port_status but got %s" % c.last)
        self.assertTrue(c.last.reason == OFPPR_DELETE)

    def test_bring_port_up(self):
        c = self.conn
        s = self.switch
        original_num_ports = len(self.switch.ports)
        p = ofp_phy_port(port_no=1234)
        s.bring_port_up(p)
        new_num_ports = len(self.switch.ports)
        self.assertTrue(new_num_ports == original_num_ports + 1,
                        "Should have added the port")
        self.assertEqual(len(c.received), 1)
        self.assertTrue(
            isinstance(c.last, ofp_port_status),
            "should have received port_status but got %s" % c.last)
        self.assertTrue(c.last.reason == OFPPR_ADD)


if __name__ == '__main__':
    # NOTE(review): the body of this guard is cut off in the source
    # listing; unittest.main() is assumed -- confirm against the original.
    unittest.main()
update.py
Source:update.py
from sqlalchemy.orm.exc import (NoResultFound, MultipleResultsFound)
from app.modules.quiz.models import Quiz
import itertools


class Singleton(type):
    """Metaclass that creates one instance per class and hands the same
    object back on every later call."""

    def __init__(cls, name, bases, dict):
        super(Singleton, cls).__init__(name, bases, dict)
        cls.instance = None

    def __call__(cls, *args, **kwargs):
        if cls.instance is None:
            cls.instance = super(Singleton, cls).__call__(*args, **kwargs)
        else:
            # re-run __init__ on the existing instance with the new
            # arguments; this branch used to look up ``cls._instances``,
            # which is never defined, and raised AttributeError
            cls.instance.__init__(*args, **kwargs)
        return cls.instance


class Update(object):
    """Quiz update operations that work through a query session."""

    __metaclass__ = Singleton

    def __init__(self, session):
        """
        :param session: object providing ``query()``; used for every
         lookup made by this instance.
        """
        self.__session = session

    def _find_quiz(self, quiz_code):
        """Return the single Quiz whose ``code`` equals ``quiz_code``;
        ``.one()`` raises NoResultFound when there is none."""
        return self.__session.query(Quiz).filter_by(code=quiz_code).one()

    @staticmethod
    def _pairs_with_same_code(received, found):
        """Yield every (received, found) pair whose ``code`` values are
        equal, in ``itertools.product`` order."""
        for item_received, item_found in itertools.product(received, found):
            if item_received.code == item_found.code:
                yield item_received, item_found

    def update_quiz(self, quiz_received, quiz_code):
        # type: (object, object) -> object
        """Copy the editable fields of ``quiz_received`` onto the stored
        quiz identified by ``quiz_code``.

        Questions and options are matched by their ``code``. Every matched
        question gets ``max_score / question_count`` as its maximum score
        and has its score reset to 0.0.

        :return: the updated quiz, or ``"-"`` if no quiz has that code.
        """
        try:
            quiz_found = self._find_quiz(quiz_code)
        except NoResultFound:
            print("Quiz of code {} not found.".format(quiz_code))
            return "-"

        quiz_found.name = quiz_received.name
        quiz_found.description = quiz_received.description
        quiz_found.max_score = quiz_received.max_score
        new_max_score_per_question = (
            quiz_found.max_score / quiz_found.question_count
        )

        for question_received, question_found in self._pairs_with_same_code(
            quiz_received.questions, quiz_found.questions
        ):
            question_found.text = question_received.text
            question_found.max_score = new_max_score_per_question
            question_found.score = 0.0
            for option_received, option_found in self._pairs_with_same_code(
                question_received.options, question_found.options
            ):
                option_found.text = option_received.text
                option_found.is_correct = option_received.is_correct
                option_found.is_selected = option_received.is_selected
        return quiz_found

    def answer_quiz(self, quiz_received, quiz_code):
        """Copy the ``is_selected`` flag of each option in
        ``quiz_received`` onto the matching stored option.

        :return: the stored quiz, or ``"-"`` if no quiz has that code.
        """
        try:
            quiz_found = self._find_quiz(quiz_code)
        except NoResultFound:
            print("Quiz of code {} not found.".format(quiz_code))
            return "-"

        for question_received, question_found in self._pairs_with_same_code(
            quiz_received.questions, quiz_found.questions
        ):
            for option_received, option_found in self._pairs_with_same_code(
                question_received.options, question_found.options
            ):
                option_found.is_selected = option_received.is_selected
        return quiz_found

    def grade_quiz(self, quiz_code):
        """Score the stored quiz identified by ``quiz_code``.

        For every option that is both selected and correct, the owning
        question's score is set to its maximum and added to the total,
        which is then stored on the quiz.

        :return: the graded quiz, or ``"-"`` if no quiz has that code.
        """
        try:
            quiz = self._find_quiz(quiz_code)
        except NoResultFound:
            print("Quiz of code {} not found.".format(quiz_code))
            # NOTE(review): the source listing is cut off right after the
            # print; returning "-" mirrors update_quiz/answer_quiz --
            # confirm against the original file.
            return "-"

        total_score = 0.0
        for question in quiz.questions:
            for option in question.options:
                if option.is_selected and option.is_correct:
                    question.score = question.max_score
                    total_score = total_score + question.score
        quiz.score = total_score
        return quiz
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!