Best Python code snippet using ATX
rabbitmq.py
Source:rabbitmq.py
# Encoding: utf-8
# --
# Copyright (c) 2008-2022 Net-ng.
# All rights reserved.
#
# This software is licensed under the BSD License, as described in
# the file LICENSE.txt, which you should have received as part of
# this distribution.
# --
"""Provides the classes to interact with RabbitMQ"""
from functools import partial
from concurrent.futures import ThreadPoolExecutor
try:
    import urlparse
except ImportError:
    import urllib.parse as urlparse
import amqpstorm
import transaction
from nagare.services import plugin, proxy


class Message(amqpstorm.message.Message):
    """AMQPStorm message exposing its delivery information as one dict."""

    @property
    def delivery_info(self):
        """Return the ``method`` fields merged with the owning ``channel``."""
        return dict(self.method, channel=self.channel)

    def __str__(self):
        return 'Message[body: {}, delivery_info: {}, properties: {}]'.format(
            self.body,
            self.delivery_info,
            self.properties
        )


class RabbitMQ(plugin.Plugin):
    """The RabbitMQ client service
    """
    LOAD_PRIORITY = 10
    CONFIG_SPEC = dict(
        plugin.Plugin.CONFIG_SPEC,
        uri='string(default=None)',
        host='string(default="127.0.0.1")',
        port='integer(default=5672)',
        user='string(default="guest")',
        password='string(default="guest")',
        vhost='string(default="/")',
        connect_timeout='float(default=0)',
        heartbeat='integer(default=60)',
        lazy='boolean(default=False)',
        ssl='boolean(default=False)'
    )
    # Configuration key -> keyword name used when building the connection
    CONFIG_TRANSLATIONS = {
        'host': 'hostname',
        'user': 'username',
        'vhost': 'virtual_host',
        'connect_timeout': 'timeout'
    }

    def __init__(self, name, dist, **config):
        super(RabbitMQ, self).__init__(name, dist, **config)
        # Created by ``handle_start``; ``None`` until then
        self.connection = None

    @property
    def is_open(self):
        """``True`` when a connection exists and reports itself open."""
        return (self.connection is not None) and self.connection.is_open

    @property
    def is_closed(self):
        """``True`` when there is no connection or it reports itself closed."""
        return (self.connection is None) or self.connection.is_closed

    def open(self):
        """Open the connection created by ``handle_start``."""
        self.connection.open()

    def close(self):
        """Close the connection created by ``handle_start``."""
        self.connection.close()

    def handle_start(self, app):
        """Build the AMQPStorm connection from the plugin configuration.

        Configuration keys are renamed through ``CONFIG_TRANSLATIONS``. When
        a ``uri`` with the ``amqp`` scheme is configured, its components
        override the host / port / credentials / virtual host settings.
        """
        config = {self.CONFIG_TRANSLATIONS.get(k, k): v for k, v in self.plugin_config.items()}

        uri = config.pop('uri')
        if uri:
            uri = urlparse.urlparse(uri)
            if uri.scheme == 'amqp':
                config.update({
                    'hostname': uri.hostname or '127.0.0.1',
                    'port': uri.port or 5672,
                    'username': uri.username or 'guest',
                    'password': uri.password or 'guest',
                    # NOTE(review): the path keeps its leading '/', so
                    # 'amqp://host/name' yields the vhost '/name' -- confirm
                    # this is the intended naming.
                    'virtual_host': uri.path or '/'
                })

        self.connection = amqpstorm.Connection(**config)

    def create_channel(self, prefetch=None):
        """Create a channel on the connection

        A truthy ``prefetch`` is applied to the new channel with ``basic.qos``.
        """
        channel = self.connection.channel()
        if prefetch:
            channel.basic.qos(prefetch)

        return channel

    def send_heartbeat(self):
        """Send a heartbeat frame through the connection's channel 0."""
        # ``_channel0`` is an attribute of the AMQPStorm connection; this
        # plugin object never defines one itself.
        self.connection._channel0.send_heartbeat()


class _Channel(object):
    """A pair of channels (one to publish, one to consume) bound to an
    optional exchange and queue, usable as a ``transaction`` data manager.
    """

    def __init__(
        self,
        rabbitmq_service, exchange=None, queue=None,
        route='', durable=False, prefetch=None,
        auto_decode=False, pool=1, transaction=True,
        **config
    ):
        self.rabbitmq = rabbitmq_service
        self.exchange = exchange
        self.queue = queue
        self.route = route
        self.durable = durable
        self.prefetch = prefetch
        self.auto_decode = auto_decode
        self.pool_size = pool
        self.transaction = transaction

        # Thread pool running the consumers; created by ``start_consuming``
        self.pool = None
        # Both channels are created by ``handle_start``
        self.out_channel = self.in_channel = None

    def close(self, reply_code=0, reply_text='Normal shutdown'):
        """Close both channels, skipping any that was never created.

        The references are cleared first and the input channel is closed
        even if closing the output channel raises.
        """
        out_channel, self.out_channel = self.out_channel, None
        in_channel, self.in_channel = self.in_channel, None

        try:
            if out_channel is not None:
                out_channel.close(reply_code, reply_text)
        finally:
            if in_channel is not None:
                in_channel.close(reply_code, reply_text)

    def create_out_channel(self, **params):
        """Create the channel used to publish messages."""
        return self.rabbitmq.create_channel(**params)

    def create_in_channel(self, **params):
        """Create the channel used to consume messages."""
        return self.rabbitmq.create_channel(**params)

    @staticmethod
    def declare_in_exchange(channel, exchange, mode, **params):
        """Declare ``exchange`` with the type ``mode`` on ``channel``."""
        channel.exchange.declare(exchange, mode, **params)

    @staticmethod
    def declare_in_queue(channel, queue, **params):
        """Declare ``queue`` on ``channel``."""
        channel.queue.declare(queue, **params)

    def handle_start(
        self,
        queue, auto_delete, durable,
        route,
        exchange, mode,
        prefetch, **config
    ):
        """Create both channels, then declare and bind the exchange / queue.

        The exchange and the queue are each declared only when not ``None``;
        the binding is made only when both are given.
        """
        self.out_channel = self.create_out_channel()
        in_channel = self.create_in_channel(prefetch=prefetch)

        if exchange is not None:
            self.declare_in_exchange(in_channel, exchange, mode)

        if queue is not None:
            self.declare_in_queue(in_channel, queue, auto_delete=auto_delete, durable=durable)

        if (queue is not None) and (exchange is not None):
            in_channel.queue.bind(queue, exchange, route)

        self.in_channel = in_channel

    def handle_request(self, chain, **params):
        """Join the current transaction (when enabled), then run the chain."""
        if self.transaction:
            self.out_channel.tx.select()
            transaction.get().join(self)

        return chain.next(**params)

    # ``transaction`` data manager interface

    def sortKey(self):
        return '~sqlz'  # Commit after the SQLAlchemy transaction

    def tpc_finish(self, transaction):
        self.out_channel.tx.commit()

    def abort(self, transaction):
        self.out_channel.tx.rollback()

    tpc_abort = abort
    tpc_begin = tpc_commit = tpc_vote = commit = lambda self, transaction: None

    def send_raw_message(self, message, mandatory=False, immediate=False):
        """Publish ``message`` on the configured exchange.

        Nothing is sent when no exchange is configured. The delivery mode
        is 2 for a durable channel, 1 otherwise.
        """
        if self.exchange is not None:
            message.delivery_mode = 2 if self.durable else 1
            message.publish(self.route, self.exchange, mandatory=mandatory, immediate=immediate)

    def send_raw(self, body, mandatory=False, immediate=False, **properties):
        """Wrap ``body`` and ``properties`` into a message and publish it."""
        message = Message(self.out_channel, body=body, properties=properties)
        self.send_raw_message(message, mandatory, immediate)

    def send(self, correlation_id, app_id, content_type, body, mandatory=False, immediate=False, **properties):
        """Publish ``body`` with the given identification properties."""
        properties.update({
            'correlation_id': correlation_id,
            'app_id': app_id,
            'content_type': content_type
        })
        message = Message.create(self.out_channel, body, properties)
        self.send_raw_message(message, mandatory, immediate)

    def _on_receive(self, consumer, body, channel, method, properties):
        """Wrap a delivery into a ``Message`` and run ``consumer`` in the pool."""
        message = Message(channel, body=body, method=method, properties=properties, auto_decode=self.auto_decode)
        # Calling ``result()`` in the callback re-raises any exception of the
        # consumer instead of leaving it silently stored in the future
        self.pool.submit(consumer, message).add_done_callback(lambda future: future.result())

    def on_receive(self, consumer, exclusive=False, consumer_tag=''):
        """Register ``consumer`` on the queue.

        Deliveries are unacknowledged (``no_ack``) unless a truthy
        ``prefetch`` is configured.
        """
        self.in_channel.basic.consume(
            partial(self._on_receive, consumer),
            self.queue,
            no_ack=not self.prefetch,
            exclusive=exclusive,
            consumer_tag=consumer_tag
        )

    def start_consuming(self):
        """Create the consumers thread pool if needed and start consuming."""
        if self.pool is None:
            self.pool = ThreadPoolExecutor(self.pool_size)

        self.in_channel.start_consuming(to_tuple=True)

    def stop_consuming(self):
        """Stop consuming and shut the consumers thread pool down."""
        self.in_channel.stop_consuming()

        # The pool only exists once ``start_consuming`` was called
        pool, self.pool = self.pool, None
        if pool is not None:
            pool.shutdown()

    # The acknowledgement methods use the same truthiness test on
    # ``prefetch`` as ``on_receive``: when the consumer is registered with
    # ``no_ack=True`` (``prefetch`` is ``None`` or ``0``) nothing is sent.

    def ack(self, message, multiple=False):
        """Acknowledge ``message`` (no-op for a ``no_ack`` consumer)."""
        if self.prefetch:
            delivery_tag = message.delivery_info['delivery_tag']
            self.in_channel.basic.ack(delivery_tag, multiple)

    def nack(self, message, multiple=False, requeue=False):
        """Negatively acknowledge ``message`` (no-op for a ``no_ack`` consumer)."""
        if self.prefetch:
            delivery_tag = message.delivery_info['delivery_tag']
            self.in_channel.basic.nack(delivery_tag, multiple=multiple, requeue=requeue)

    def reject(self, message, multiple=False, requeue=False):
        """Reject ``message`` (no-op for a ``no_ack`` consumer).

        ``multiple`` is accepted for signature compatibility only: it is not
        forwarded because ``basic.reject`` handles a single delivery and
        takes no such argument. Use ``nack`` to refuse several deliveries.
        """
        if self.prefetch:
            delivery_tag = message.delivery_info['delivery_tag']
            self.in_channel.basic.reject(delivery_tag, requeue=requeue)


@proxy.proxy_to(_Channel, lambda self: self.channels[self.name], {'handle_start'})
class Channel(plugin.Plugin):
    # LOAD_PRIORITY = 15
    CONFIG_SPEC = dict(
        plugin.Plugin.CONFIG_SPEC,
        exchange='string(default=None)',
        mode='string(default="direct")',
        queue='string(default=None)',
        route='string(default="")',
        auto_delete='boolean(default=True)',
        durable='boolean(default=False)',
        prefetch='integer(default=None)',
        auto_decode='boolean(default=False)',
        pool='integer(default=1)',
        transaction='boolean(default=True)'
    )
    # Registry of the ``_Channel`` objects, keyed by plugin name and shared
    # through the class
    channels = {}

    def __init__(
        self,
        name, dist,
        rabbitmq_service, exchange=None, queue=None,
        mode='direct', route='', auto_delete=True, durable=False, prefetch=None,
        auto_decode=False, pool=1, transaction=True,
        services_service=None,
        **config
    ):
        services_service(
            super(Channel, self).__init__, name, dist,
            exchange=exchange, queue=queue,
            mode=mode, route=route, auto_delete=auto_delete, durable=durable, prefetch=prefetch,
            auto_decode=auto_decode, pool=pool, transaction=transaction,
            **config
        )
        self.queue = queue
        self.exchange = exchange
        self.route = route

        self.__class__.channels[name] = _Channel(
            rabbitmq_service, exchange=exchange, queue=queue,
            mode=mode, route=route, auto_delete=auto_delete, durable=durable, prefetch=prefetch,
            auto_decode=auto_decode, pool=pool, transaction=transaction,
            **config
        )

    def handle_start(self, app):
        # NOTE(review): the body of this method is elided ("...") in the
        # excerpt this file was taken from -- restore it from the original.
        ...
mydbstatic.py
Source:mydbstatic.py
...3from ctypes import *4import platform5if sys.version_info < (3,):6 auto_encode = c_char_p7 def auto_decode(result, func, args):8 return result9else:10 class auto_encode(c_char_p):11 @classmethod12 def from_param(cls, value):13 if value is None:14 return value15 else:16 return value.encode()17 def auto_decode(result, func, args):18 if result is None:19 return result20 else:21 return result.decode()22_FunctionList = (23# ('dbFreeBase', None, (c_void_p,)),24 ('dbReadDatabase', c_int, None,25 (c_void_p, auto_encode, auto_encode, auto_encode)),26 ('dbAllocEntry', c_void_p, None, (c_void_p,)),27 ('dbFirstRecordType', c_int, None, (c_void_p,)),28 ('dbGetRecordTypeName', c_char_p, auto_decode, (c_void_p,)),29 ('dbNextRecordType', c_int, None, (c_void_p,)),30 ('dbFreeEntry', None, None, (c_void_p,)),31 ('dbCopyEntry', c_void_p, None, (c_void_p,)),...
test_decode.py
Source:test_decode.py
1import pytest2from hbutils.encoding import auto_decode3@pytest.mark.unittest4class TestEncodingDecode:5 def test_auto_decode(self):6 assert auto_decode(b'kdsfjldsjflkdsmgds') == 'kdsfjldsjflkdsmgds'7 assert auto_decode(b'kdsfjldsjflkdsmgds', 'utf-8') == 'kdsfjldsjflkdsmgds'8 assert auto_decode(b'\xd0\x94\xd0\xbe\xd0\xb1\xd1\x80\xd1\x8b\xd0\xb9 \xd0'9 b'\xb2\xd0\xb5\xd1\x87\xd0\xb5\xd1\x80') == "ÐобÑÑй веÑеÑ"10 assert auto_decode(b'\xa4\xb3\xa4\xf3\xa4\xd0\xa4\xf3\xa4\xcf') == "ããã°ãã¯"11 assert auto_decode(b'\xcd\xed\xc9\xcf\xba\xc3') == "æä¸å¥½"12 with pytest.raises(UnicodeDecodeError):13 auto_decode(b'\xa4\xb3\xa4\xf3\xa4\xd0\xa4\xf3\xa4\xcf', 'utf-8')14 with pytest.raises(UnicodeDecodeError):...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!