Best Python code snippet using localstack_python
horizontal_shard.py
Source:horizontal_shard.py
1# ext/horizontal_shard.py2# Copyright (C) 2005-2017 the SQLAlchemy authors and contributors3# <see AUTHORS file>4#5# This module is part of SQLAlchemy and is released under6# the MIT License: http://www.opensource.org/licenses/mit-license.php7"""Horizontal sharding support.8Defines a rudimental 'horizontal sharding' system which allows a Session to9distribute queries and persistence operations across multiple databases.10For a usage example, see the :ref:`examples_sharding` example included in11the source distribution.12"""13from .. import util14from ..orm.session import Session15from ..orm.query import Query16__all__ = ['ShardedSession', 'ShardedQuery']17class ShardedQuery(Query):18 def __init__(self, *args, **kwargs):19 super(ShardedQuery, self).__init__(*args, **kwargs)20 self.id_chooser = self.session.id_chooser21 self.query_chooser = self.session.query_chooser22 self._shard_id = None23 def set_shard(self, shard_id):24 """return a new query, limited to a single shard ID.25 all subsequent operations with the returned query will26 be against the single shard regardless of other state.27 """28 q = self._clone()29 q._shard_id = shard_id30 return q31 def _execute_and_instances(self, context):32 def iter_for_shard(shard_id):33 context.attributes['shard_id'] = shard_id34 result = self._connection_from_session(35 mapper=self._mapper_zero(),36 shard_id=shard_id).execute(37 context.statement,38 self._params)39 return self.instances(result, context)40 if self._shard_id is not None:41 return iter_for_shard(self._shard_id)42 else:43 partial = []44 for shard_id in self.query_chooser(self):45 partial.extend(iter_for_shard(shard_id))46 # if some kind of in memory 'sorting'47 # were done, this is where it would happen48 return iter(partial)49 def _get_impl(self, ident, fallback_fn):50 def _fallback(query, ident):51 if self._shard_id is not None:52 return fallback_fn(self, ident)53 else:54 ident = util.to_list(ident)55 for shard_id in self.id_chooser(self, ident):56 q = self.set_shard(shard_id)57 o = fallback_fn(q, ident)58 if o is not None:59 return o60 else:61 return None62 return super(ShardedQuery, self)._get_impl(ident, _fallback)63class ShardedSession(Session):64 def __init__(self, shard_chooser, id_chooser, query_chooser, shards=None,65 query_cls=ShardedQuery, **kwargs):66 """Construct a ShardedSession.67 :param shard_chooser: A callable which, passed a Mapper, a mapped68 instance, and possibly a SQL clause, returns a shard ID. This id69 may be based off of the attributes present within the object, or on70 some round-robin scheme. If the scheme is based on a selection, it71 should set whatever state on the instance to mark it in the future as72 participating in that shard.73 :param id_chooser: A callable, passed a query and a tuple of identity74 values, which should return a list of shard ids where the ID might75 reside. The databases will be queried in the order of this listing.76 :param query_chooser: For a given Query, returns the list of shard_ids77 where the query should be issued. Results from all shards returned78 will be combined together into a single listing.79 :param shards: A dictionary of string shard names80 to :class:`~sqlalchemy.engine.Engine` objects.81 """82 super(ShardedSession, self).__init__(query_cls=query_cls, **kwargs)83 self.shard_chooser = shard_chooser84 self.id_chooser = id_chooser85 self.query_chooser = query_chooser86 self.__binds = {}87 self.connection_callable = self.connection88 if shards is not None:89 for k in shards:90 self.bind_shard(k, shards[k])91 def connection(self, mapper=None, instance=None, shard_id=None, **kwargs):92 if shard_id is None:93 shard_id = self.shard_chooser(mapper, instance)94 if self.transaction is not None:95 return self.transaction.connection(mapper, shard_id=shard_id)96 else:97 return self.get_bind(98 mapper,99 shard_id=shard_id,100 instance=instance101 ).contextual_connect(**kwargs)102 def get_bind(self, mapper, shard_id=None,103 instance=None, clause=None, **kw):104 if shard_id is None:105 shard_id = self.shard_chooser(mapper, instance, clause=clause)106 return self.__binds[shard_id]107 def bind_shard(self, shard_id, bind):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!