Best Python code snippet using localstack_python
rest.py
Source:rest.py
# pylint: disable=no-member
"""REST endpoints registered on the ``/rest`` blueprint.

Each resource class below is attached to the flask_restful ``Api`` through the
``_resource`` decorator and narrows the query it serves according to the
request's query-string arguments.
"""
import math
import os
import requests
from flask import Blueprint, abort, request, jsonify, current_app, Response
from flask_restful import Api, reqparse
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy import func, or_
from sqlalchemy.orm import aliased
from flask_simple_api import error_abort
from flask_security import current_user
from .. import models
from ..models import Error, Session, Test, User, Subject, db, UserStarredTests
from .. import activity
from ..utils.identification import parse_test_id, parse_session_id
from ..utils.users import has_role
from ..utils.rest import ModelResource
from ..filters import filter_by_statuses
from ..search import get_orm_query_from_search_string

blueprint = Blueprint('rest', __name__, url_prefix='/rest')
rest = Api(blueprint)


def _resource(*args, **kwargs):
    """Return a class decorator that registers the resource on ``rest``.

    All positional and keyword arguments are forwarded unchanged to
    ``rest.add_resource``; the decorated class is returned as-is.
    """
    def decorator(resource):
        rest.add_resource(resource, *args, **kwargs)
        return resource
    return decorator

##########################################################################

session_parser = reqparse.RequestParser()
session_parser.add_argument('user_id', type=int, default=None)
session_parser.add_argument('subject_name', type=str, default=None)
session_parser.add_argument('search', type=str, default=None)
session_parser.add_argument('parent_logical_id', type=str, default=None)
session_parser.add_argument('id', type=str, default=None)


@_resource('/sessions', '/sessions/<object_id>')
class SessionResource(ModelResource):
    """Sessions, addressable by numeric id or by logical id."""

    MODEL = Session
    DEFAULT_SORT = (Session.start_time.desc(),)

    def _get_object_by_id(self, object_id):
        """Return the session matching ``object_id``, aborting with 404 if none does."""
        return _get_object_by_id_or_logical_id(self.MODEL, object_id)

    def _get_iterator(self):
        """Build the session query from the ``session_parser`` arguments.

        An ``id`` argument short-circuits every other filter. Otherwise the
        query starts from the search string (if given) or the default
        iterator, and is narrowed by parent logical id, subject name, user id
        and status filters.
        """
        args = session_parser.parse_args()

        if args.id is not None:
            return _get_query_by_id_or_logical_id(self.MODEL, args.id)

        if args.search:
            returned = get_orm_query_from_search_string('session', args.search, abort_on_syntax_error=True)
        else:
            returned = super()._get_iterator()

        if args.parent_logical_id is not None:
            returned = returned.filter(Session.parent_logical_id == args.parent_logical_id)
        else:
            # No parent requested: only sessions without a parent are listed.
            returned = returned.filter(Session.parent_logical_id == None)  # pylint: disable=singleton-comparison

        if args.subject_name is not None:
            returned = (
                returned
                .join(Session.subject_instances)
                .join(Subject)
                .filter(Subject.name == args.subject_name))

        if args.user_id is not None:
            returned = returned.filter(Session.user_id == args.user_id)

        returned = filter_by_statuses(returned, self.MODEL)
        return returned


test_query_parser = reqparse.RequestParser()
test_query_parser.add_argument('session_id', default=None)
test_query_parser.add_argument('info_id', type=int, default=None)
test_query_parser.add_argument('search', type=str, default=None)
test_query_parser.add_argument('after_index', type=int, default=None)
test_query_parser.add_argument('before_index', type=int, default=None)
test_query_parser.add_argument('id', type=str, default=None)
# NOTE(review): with type=bool any non-empty string (including "false") is
# converted to True -- consider flask_restful.inputs.boolean; left unchanged
# here so existing clients keep working.
test_query_parser.add_argument('starred', type=bool, default=False)
test_query_parser.add_argument('user_id', type=int, default=None)


@_resource('/tests', '/tests/<object_id>', '/sessions/<session_id>/tests')
class TestResource(ModelResource):
    """Tests, optionally scoped to a session and annotated with star state."""

    MODEL = Test
    DEFAULT_SORT = (Test.start_time.desc(),)
    SORTABLE_FIELDS = INVERSE_SORTS = ['start_time', 'test_index']
    from .filter_configs import TEST_FILTERS as FILTER_CONFIG

    def _get_object_by_id(self, object_id):
        """Return the test matching ``object_id``, aborting with 404 if none does."""
        return _get_object_by_id_or_logical_id(self.MODEL, object_id)

    def _render_many(self, objects, *, in_collection: bool):
        """Render tests and add an ``is_starred`` flag for authenticated users.

        The star lookup uses the ``user_id`` query argument when given and
        falls back to the current user's id. Nothing is added when the
        rendered result is empty or the current user is not authenticated.
        """
        args = test_query_parser.parse_args()
        user_id = args.user_id or getattr(current_user, 'id', None)

        rendered_tests = super()._render_many(objects, in_collection=in_collection)
        if not rendered_tests or not current_user.is_authenticated:
            return rendered_tests

        if not in_collection:
            # Single object: one EXISTS query tells whether it is starred.
            star_exists = (
                db.session.query(UserStarredTests)
                .filter(UserStarredTests.user_id == user_id,
                        UserStarredTests.test_id == rendered_tests['id'])
                .exists())
            rendered_tests['is_starred'] = db.session.query(star_exists).scalar()
        else:
            # Collection: fetch all starred ids among the rendered tests at once.
            rendered_test_ids = [test['id'] for test in rendered_tests['tests']]
            starred_rows = (
                db.session.query(Test.id)
                .join(UserStarredTests)
                .filter(UserStarredTests.user_id == user_id,
                        Test.id.in_(rendered_test_ids))
                .all())
            starred_test_ids = {test_id for (test_id,) in starred_rows}
            for test in rendered_tests['tests']:
                test['is_starred'] = test['id'] in starred_test_ids
        return rendered_tests

    def _get_iterator(self):
        """Build the test query (or list) from the ``test_query_parser`` arguments.

        ``id`` and ``starred`` short-circuit everything else. Note that the
        ``starred``, ``after_index`` and ``before_index`` paths return a
        materialised list (``.all()``) rather than a query.
        """
        args = test_query_parser.parse_args()
        user_id = args.user_id or getattr(current_user, 'id', None)

        if args.id is not None:
            return _get_query_by_id_or_logical_id(self.MODEL, args.id)

        if args.starred:
            return (
                Test.query
                .join(UserStarredTests)
                .filter(UserStarredTests.user_id == user_id)
                .order_by(UserStarredTests.star_creation_time.desc())
                .all())

        if args.session_id is None:
            # Fall back to the id captured by the /sessions/<session_id>/tests route.
            args.session_id = request.view_args.get('session_id')

        if args.search:
            returned = get_orm_query_from_search_string('test', args.search, abort_on_syntax_error=True)
        else:
            returned = super()._get_iterator().join(Session, Session.id == Test.session_id)

        # get session
        if args.session_id is not None:
            returned = self._filter_by_session_id(returned, args.session_id)

        if args.info_id is not None:
            returned = returned.filter(Test.test_info_id == args.info_id)

        returned = filter_by_statuses(returned, Test)

        if args.session_id is not None:
            # Neighbour lookup: the single next/previous test by index.
            if args.after_index is not None:
                returned = (
                    returned
                    .filter(self.MODEL.test_index > args.after_index)
                    .order_by(self.MODEL.test_index.asc())
                    .limit(1)
                    .all())
            elif args.before_index is not None:
                returned = (
                    returned
                    .filter(self.MODEL.test_index < args.before_index)
                    .order_by(self.MODEL.test_index.desc())
                    .limit(1)
                    .all())

        return returned

    def _filter_by_session_id(self, query, session_id):
        """Restrict ``query`` to the given session and its child sessions.

        ``session_id`` is compared against ``Session.id`` when it parses as an
        integer and against ``Session.logical_id`` otherwise. Children are the
        sessions whose ``parent_logical_id`` equals that session's logical id.
        """
        try:
            int(session_id)
        except ValueError:
            field_name = 'logical_id'
        else:
            field_name = 'id'

        def id_field(model):
            return getattr(model, field_name)

        session_aliased = aliased(Session)
        children = (
            db.session.query(id_field(Session))
            .filter(
                db.session.query(session_aliased.id)
                .filter(id_field(session_aliased) == session_id)
                .filter(session_aliased.logical_id == Session.parent_logical_id)
                .exists()
            )
            .all()
        )
        children_ids = [row[0] for row in children]

        criterion = id_field(Session) == session_id
        if children_ids:
            criterion |= id_field(Session).in_(children_ids)
        return query.filter(criterion)

    def _paginate(self, query, metadata):
        """Paginate and, when ``session_id`` is in the query string, add ``num_pages``."""
        count_pages = bool(request.args.get('session_id'))
        # Count before pagination is applied to the query.
        num_objects = query.count() if count_pages else None
        returned = super()._paginate(query, metadata)
        if count_pages:
            # ``or 1`` reports one page even when there are no objects.
            metadata['num_pages'] = math.ceil(num_objects / metadata['page_size']) or 1
        return returned


@_resource('/test_infos/<object_id>')
class TestInfoResource(ModelResource):
    """Single test-information records."""

    MODEL = models.TestInformation


@_resource('/subjects', '/subjects/<object_id>')
class SubjectResource(ModelResource):
    """Subjects, addressable by numeric id or by name."""

    MODEL = models.Subject
    ONLY_FIELDS = ['id', 'name', 'last_activity']
    SORTABLE_FIELDS = ['last_activity', 'name']
    INVERSE_SORTS = ['last_activity']

    def _get_object_by_id(self, object_id):
        """Look the subject up by id if ``object_id`` is an integer, else by name (404 if missing)."""
        try:
            object_id = int(object_id)
        except ValueError:
            try:
                return self.MODEL.query.filter(models.Subject.name == object_id).one()
            except NoResultFound:
                abort(requests.codes.not_found)
        else:
            return self.MODEL.query.get_or_404(object_id)


def _get_query_by_id_or_logical_id(model, object_id):
    """Return a query matching ``object_id`` against ``logical_id`` and, if numeric, ``id``."""
    query_filter = model.logical_id == object_id
    try:
        numeric_object_id = int(object_id)
    except ValueError:
        pass
    else:
        query_filter = (model.id == numeric_object_id) | query_filter
    return model.query.filter(query_filter)


def _get_object_by_id_or_logical_id(model, object_id):
    """Return the first object matching ``object_id``; abort with 404 when there is none."""
    returned = _get_query_by_id_or_logical_id(model, object_id).first()
    if returned is None:
        abort(requests.codes.not_found)  # pylint: disable=no-member
    return returned


session_test_user_query_parser = reqparse.RequestParser()
session_test_user_query_parser.add_argument('session_id', type=int, default=None)
session_test_user_query_parser.add_argument('test_id', type=int, default=None)
session_test_user_query_parser.add_argument('user_id', type=int, default=None)

session_test_query_parser = reqparse.RequestParser()
session_test_query_parser.add_argument('session_id', type=int, default=None)
session_test_query_parser.add_argument('test_id', type=int, default=None)

errors_query_parser = reqparse.RequestParser()
errors_query_parser.add_argument('session_id', default=None)
errors_query_parser.add_argument('test_id', default=None)
# NOTE(review): type=bool turns any non-empty string into True (see the
# 'starred' argument above).
errors_query_parser.add_argument('interruptions', default=False, type=bool)


@_resource('/warnings', '/warnings/<int:object_id>')
class WarningsResource(ModelResource):
    """Warnings filtered by exact ``test_id`` and ``session_id`` values."""

    MODEL = models.Warning
    DEFAULT_SORT = (models.Warning.timestamp.asc(),)

    def _get_iterator(self):
        """Return warnings whose test and session ids equal the parsed arguments."""
        args = session_test_user_query_parser.parse_args()
        return self.MODEL.query.filter_by(test_id=args.test_id, session_id=args.session_id)


@_resource('/errors', '/errors/<int:object_id>')
class ErrorResource(ModelResource):
    """Errors of a session (optionally including its children) or of a test."""

    MODEL = Error
    DEFAULT_SORT = (Error.timestamp.asc(),)

    def _get_iterator(self):
        """Return the error query for ``session_id`` or ``test_id``.

        Aborts with 400 when neither argument is given. The ``interruptions``
        argument selects interruption errors; otherwise they are excluded.
        """
        args = errors_query_parser.parse_args()

        if args.session_id is not None:
            session = _get_object_by_id_or_logical_id(Session, args.session_id)
            # We want to query only the session's own errors if it is either a
            # non-parallel session or a child session
            if session.parent_logical_id is not None or not session.is_parent_session:
                query = db.session.query(Error).filter(Error.session_id == session.id)
            else:
                query = db.session.query(Error).join(Session).filter(
                    or_(Session.id == session.id, Session.parent_logical_id == session.logical_id))
        elif args.test_id is not None:
            query = Error.query.filter_by(test_id=parse_test_id(args.test_id))
        else:
            abort(requests.codes.bad_request)

        if args.interruptions:
            query = query.filter_by(is_interruption=True)
        else:
            query = query.filter((self.MODEL.is_interruption == False) | (self.MODEL.is_interruption == None))  # pylint: disable=singleton-comparison
        return query


@blueprint.route('/tracebacks/<uuid>')
def get_traceback(uuid):
    """Stream the stored gzip traceback file for ``uuid``.

    Responds with 404 unless the app's DEBUG or TESTING config is set, and
    also when the traceback file does not exist.
    """
    if not current_app.config['DEBUG'] and not current_app.config['TESTING']:
        abort(requests.codes.not_found)
    path = _get_traceback_path(uuid)
    if not os.path.isfile(path):
        abort(requests.codes.not_found)

    def sender():
        # Stream in 4 KiB chunks until read() returns an empty bytes object.
        with open(path, 'rb') as f:
            for buff in iter(lambda: f.read(4096), b''):
                yield buff

    return Response(sender(), headers={'Content-Encoding': 'gzip', 'Content-Type': 'application/json'})


def _get_traceback_path(uuid):
    """Return ``TRACEBACK_DIR/<first two chars of uuid>/<uuid>.gz``."""
    return os.path.join(current_app.config['TRACEBACK_DIR'], uuid[:2], uuid + '.gz')


@_resource('/users', '/users/<object_id>')
class UserResource(ModelResource):
    """Users, addressable by numeric id or by email."""

    ONLY_FIELDS = ['id', 'email', 'last_activity']
    SORTABLE_FIELDS = ['last_activity', 'email', 'first_name', 'last_name']
    INVERSE_SORTS = ['last_activity']
    MODEL = User

    def _get_iterator(self):
        """Return the user query, honouring ``current_user`` and ``filter`` arguments.

        With ``current_user`` set, an unauthenticated request gets an empty
        list. ``filter`` is matched case-insensitively as a substring of
        first name, last name or email.
        """
        returned = super()._get_iterator()

        if request.args.get('current_user'):
            if not current_user.is_authenticated:
                return []
            returned = returned.filter(self.MODEL.id == current_user.id)

        filter_text = request.args.get('filter')
        if filter_text:
            filter_text = filter_text.lower()
            returned = returned.filter(
                func.lower(User.first_name).contains(filter_text)
                | func.lower(User.last_name).contains(filter_text)
                | func.lower(User.email).contains(filter_text))
        return returned

    def _get_object_by_id(self, object_id):
        """Resolve ``object_id`` (integer id or email) to a user; 404 if not found."""
        try:
            object_id = int(object_id)
        except ValueError:
            try:
                object_id = User.query.filter_by(email=object_id).one().id
            except NoResultFound:
                abort(requests.codes.not_found)
        return User.query.get_or_404(int(object_id))


@_resource('/comments', '/comments/<object_id>', methods=['get', 'delete', 'put'])
class CommentResource(ModelResource):
    """Comments attached to exactly one of a session or a test."""

    MODEL = models.Comment
    DEFAULT_SORT = (models.Comment.timestamp.asc(),)

    def _get_iterator(self):
        """Return comments for the given session or test; exactly one id is required."""
        args = session_test_query_parser.parse_args()
        if not ((args.session_id is not None) ^ (args.test_id is not None)):  # pylint: disable=superfluous-parens
            error_abort('Either test_id or session_id must be passed to the query')
        return models.Comment.query.filter_by(session_id=args.session_id, test_id=args.test_id)

    def delete(self, object_id=None):
        """Delete a comment owned by the current user and decrement the parent's counter."""
        if object_id is None:
            error_abort('Not implemented', code=requests.codes.not_implemented)
        comment = models.Comment.query.get_or_404(object_id)

        # Check ownership before loading the parent object.
        if comment.user_id != current_user.id:
            error_abort('Not allowed to delete comment', code=requests.codes.forbidden)

        if comment.session_id is not None:
            obj = models.Session.query.get(comment.session_id)
        else:
            obj = models.Test.query.get(comment.test_id)

        # Assigns an expression built from the class-level attribute;
        # presumably so the decrement is evaluated by the database -- confirm.
        obj.num_comments = type(obj).num_comments - 1
        models.db.session.add(obj)
        models.db.session.delete(comment)
        models.db.session.commit()

    def put(self, object_id=None):
        """Replace the text of a comment owned by the current user and mark it edited."""
        if object_id is None:
            error_abort('Not implemented', code=requests.codes.not_implemented)
        comment = models.Comment.query.get_or_404(object_id)
        if comment.user_id != current_user.id:
            error_abort('Not allowed to edit comment', code=requests.codes.forbidden)
        # The new text is read from the request JSON at comment -> comment.
        comment.comment = request.get_json().get('comment', {}).get('comment')
        comment.edited = True
        models.db.session.add(comment)
        models.db.session.commit()
        return jsonify({'comment': self._render_single(comment, in_collection=False)})


related_entity_parser = reqparse.RequestParser()
related_entity_parser.add_argument('session_id', default=None, type=int)
related_entity_parser.add_argument('test_id', default=None, type=int)


@_resource('/entities', '/entities/<int:object_id>')
class RelatedEntityResource(ModelResource):
    """Entities related to a test, or to a session and its child sessions."""

    MODEL = models.Entity

    def _get_iterator(self):
        """Return entities for the given session or test; exactly one id is required."""
        args = related_entity_parser.parse_args()

        if not ((args.session_id is None) ^ (args.test_id is None)):
            error_abort('Either test_id or session_id must be provided')

        if args.session_id is not None:
            return self._get_all_children_entities(args.session_id)
        elif args.test_id is not None:
            return models.Entity.query.join(models.test_entity).filter(models.test_entity.c.test_id == args.test_id)
        else:
            raise NotImplementedError()  # pragma: no cover

    def _get_all_children_entities(self, session_id):
        """Return entities of session ``session_id`` and of sessions whose parent it is."""
        query = models.Entity.query.join(models.session_entity).join(models.Session)
        session_alias = aliased(models.Session)
        criterion = models.Session.id == session_id
        # ``!= None`` (not ``is not None``) is required so that SQLAlchemy emits
        # an SQL ``IS NOT NULL`` test instead of Python evaluating a constant.
        criterion |= (
            db.session.query(session_alias)
            .filter(session_alias.id == session_id)
            .filter(session_alias.logical_id != None)  # pylint: disable=singleton-comparison
            .filter(session_alias.logical_id == models.Session.parent_logical_id)
            .exists()
            .correlate(models.Session))
        return query.filter(criterion)


@_resource('/migrations', '/migrations/<object_id>')
class MigrationsResource(ModelResource):
    """Background migrations, newest start time first."""

    MODEL = models.BackgroundMigration
    DEFAULT_SORT = (models.BackgroundMigration.started_time.desc(),)
    ONLY_FIELDS = [
        'id',
        'name',
        'started',
        'started_time',
        'finished',
        'finished_time',
        'total_num_objects',
        'remaining_num_objects'
    ]


@_resource('/cases', '/cases/<object_id>')
class CaseResource(ModelResource):
    """Test cases (test information records), optionally narrowed by a search string."""

    MODEL = models.TestInformation
    DEFAULT_SORT = (models.TestInformation.name, models.TestInformation.file_name, models.TestInformation.class_name)

    def _get_iterator(self):
        """Return cases matching ``search`` (if any), excluding file names starting with ``/``."""
        search = request.args.get('search')
        if search:
            returned = get_orm_query_from_search_string('case', search, abort_on_syntax_error=True)
        else:
            returned = super()._get_iterator()
        returned = returned.filter(~self.MODEL.file_name.like('/%'))
        return returned


@_resource('/replications', '/replications/<object_id>')
class ReplicationsResource(ModelResource):
    """Replications, rendered with a computed ``lag_seconds`` field."""

    MODEL = models.Replication
    ONLY_FIELDS = [
        'id',
        'paused',
        'avg_per_second',
        'backlog_remaining',
        'last_replicated_timestamp',
        'last_error',
        'service_type',
        'username',
        'url',
        'index_name'
    ]

    def _render_many(self, objects, *, in_collection: bool):
        """Render replications and add ``lag_seconds`` to each one.

        The lag is the newest ``Test.updated_at`` (as a timestamp) minus the
        replication's ``last_replicated_timestamp``; it is ``None`` when
        either value is missing.
        """
        returned = super()._render_many(objects, in_collection=in_collection)
        [latest_timestamp] = models.db.session.query(func.max(models.Test.updated_at)).one()
        if latest_timestamp:
            latest_timestamp = latest_timestamp.timestamp()

        collection = returned.get('replications', []) if in_collection else [returned]

        for replication in collection:
            last_replicated = replication['last_replicated_timestamp']
            if not latest_timestamp or not last_replicated:
                lag = None
            else:
                lag = latest_timestamp - last_replicated
            replication['lag_seconds'] = lag
        return returned

    def put(self, object_id=None):
        """Update username/url/password of a replication; requires the ``admin`` role."""
        if object_id is None:
            error_abort('Not implemented', code=requests.codes.not_implemented)
        if not has_role(current_user, 'admin'):
            error_abort('Forbidden', code=requests.codes.forbidden)
        replication = models.Replication.query.get_or_404(object_id)
        request_json = request.get_json().get("replication", {})
        for field_name in ('username', 'url', 'password'):
            value = request_json.get(field_name)
            # Fields that are absent (or null) in the payload are left unchanged.
            if value is not None:
                setattr(replication, field_name, value)
        models.db.session.commit()
        return jsonify({'replication': self._render_single(replication, in_collection=False)})


@blueprint.route('/admin_alerts')
def get_admin_alerts():
    """Return the messages from ``_iter_alerts()`` as JSON, with ids numbered from 1."""
    return jsonify({
        'admin_alerts': [
            {
                'id': index,
                "message": alert,
            }
            for index, alert in enumerate(_iter_alerts(), 1)
        ]
    })


# NOTE: the body of this function is cut off in the listing; it is kept verbatim.
def _iter_alerts():
    if models.Replication.query.filter(models.Replication.last_error != None).count():...
test_query_parser.py
Source:test_query_parser.py
import unittest
from query_parser import QueryParser


class TestQueryParser(unittest.TestCase):
    """Unit tests for ``QueryParser`` argument splitting and query-type detection."""

    # Parser instance created once at class-definition time and shared by the tests.
    test_query_parser = QueryParser()

    def test_split_query_into_arguments(self):
        """A four-word query yields its words, in order, at indices 0 to 3."""
        query_args = QueryParser.split_query_into_arguments("search user field value")
        for position, word in enumerate(("search", "user", "field", "value")):
            self.assertEqual(query_args[position], word)

    def test_type_of_query(self):
        """Search, list and help argument lists map to the matching ``QueryType`` member."""
        cases = (
            (["search", "user", "field", "value"], "SEARCH_QUERY"),
            (["list", "user"], "LIST_QUERY"),
            (["help"], "HELP_QUERY"),
        )
        for query_args, type_name in cases:
            query_type = self.test_query_parser.get_type_of_query(query_args)
            # The member is resolved only when its case is reached.
            self.assertEqual(query_type, getattr(QueryParser.QueryType, type_name))


# NOTE: the body of this guard is cut off in the listing; it is kept verbatim.
if __name__ == '__main__':...
__main__.py
Source:__main__.py
1import unittest2from test_parse_date import *3from test_parsing import *4from test_query_lexer import *5from test_query_parser import *6from test_query import *7from test_textutils import *8from test_todos import *9if __name__ == '__main__':...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!