Best Python code snippet using localstack_python
sqs.py
Source:sqs.py
"""Helper functions for interacting with SQS.

When ``settings.TEST_SQS`` is truthy nothing is sent to AWS: queues are
replaced by ``MockQueue`` objects and sent messages are collected in the
module-level ``outbox`` dict, keyed by queue name.
"""
from __future__ import absolute_import, print_function, unicode_literals

import json
import logging

import boto3
import six
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured

from queue_fetcher.exceptions import (BotoInitFailedException,
                                      QueueNotFoundError,
                                      MessageSendFailed)
from queue_fetcher.utils.mock_sqs import MockQueue

# Messages "sent" while TEST_SQS is enabled: {queue name: [message, ...]}.
outbox = {}

logger = logging.getLogger(__name__)

SQS_NOT_SETUP = (
    "TEST_SQS is not set in your application's settings file. "
    "Please add TEST_SQS = False to your settings."
)

# Cache of MockQueue objects handed out in test mode, keyed by queue name.
_MOCKS = {}


def _test_sqs_enabled():
    """Return the value of ``settings.TEST_SQS``.

    Raises:
        ImproperlyConfigured: if ``TEST_SQS`` is not defined in settings.
    """
    try:
        return settings.TEST_SQS
    except AttributeError:
        raise ImproperlyConfigured(SQS_NOT_SETUP)


def _get_sqs_queue(region_name, queue_name, account=None):
    """Look up a queue by name through a fresh boto3 SQS resource.

    ``account`` is passed as ``QueueOwnerAWSAccountId`` when it is not None.
    """
    sqs = boto3.resource('sqs', region_name=region_name)
    if account is not None:
        return sqs.get_queue_by_name(
            QueueName=queue_name,
            QueueOwnerAWSAccountId=account)
    return sqs.get_queue_by_name(QueueName=queue_name)


def _is_arn(name):
    """Return whether the given name is an SQS ARN."""
    return name.startswith('arn:aws:sqs:')


def get_queue(name, region_name='eu-west-1', account=None,
              raise_exception=True):
    """Return the AWS Queue Object referenced by name.

    You can specify a region_name or account to get a specific queue.
    The name can also be an ARN, overriding the region_name and account set
    here.

    If TEST_SQS is set in settings, this will return a mock object.

    NOTE: TEST_SQS must be set to either True or False for this to work.

    Returns:
        The queue object, or None when the lookup fails and
        ``raise_exception`` is False.

    Raises:
        ImproperlyConfigured: if ``TEST_SQS`` is missing from settings.
        BotoInitFailedException: if boto3 returns no SQS resource.
        QueueNotFoundError: if the lookup fails and ``raise_exception`` is
            True.
    """
    test_sqs = _test_sqs_enabled()

    if _is_arn(name):
        # The three fields after 'arn:aws:sqs' are unpacked in this order.
        region_name, account, queue_name = name.split(':')[3:]
    else:
        # Always bind queue_name for plain names. Previously it was only
        # bound when ``account`` was None, so passing a plain name together
        # with an account raised UnboundLocalError.
        queue_name = name

    sqs = boto3.resource('sqs', region_name=region_name)
    if sqs is None:
        raise BotoInitFailedException('Could not initialise sqs')

    if test_sqs:
        # Look up by the same key we store under, so that a queue requested
        # by ARN reuses its mock instead of getting a new one on every call.
        if queue_name not in _MOCKS:
            _MOCKS[queue_name] = MockQueue(queue_name)
        return _MOCKS[queue_name]

    try:
        if account is None:
            return sqs.get_queue_by_name(QueueName=queue_name)
        return sqs.get_queue_by_name(
            QueueName=queue_name,
            QueueOwnerAWSAccountId=account)
    except Exception as e:
        if raise_exception:
            raise QueueNotFoundError(
                'Error getting queue for {}'.format(queue_name))
        logger.warning('Error getting queue for name: %s - %s',
                       queue_name, e)
        return None


def send_message(queue, message, raise_exception=True):
    """Send message on queue.

    This handles the nitty-gritty of interacting with SQS from your Django app.

    ``message`` may be bytes (decoded as UTF-8), text, or any other object.
    Outside test mode, non-text messages are serialised with ``json.dumps``
    before sending.

    If TEST_SQS is set in settings, nothing is sent: text messages are parsed
    with ``json.loads`` and the message is appended to ``outbox`` under
    ``queue.name`` and logged.

    NOTE: TEST_SQS must be set to either True or False for this to work.

    Raises:
        ImproperlyConfigured: if ``TEST_SQS`` is missing from settings.
        MessageSendFailed: if sending fails and ``raise_exception`` is True;
            otherwise the failure is only logged as a warning.
    """
    test_sqs = _test_sqs_enabled()

    if isinstance(message, six.binary_type):
        message = message.decode('utf-8')
    is_text = isinstance(message, six.string_types)

    if test_sqs:
        # Test Mode: Don't even try and send it!
        if is_text:
            message = json.loads(message)
        outbox.setdefault(queue.name, []).append(message)
        logger.info('New message on queue %s: %s', queue.name, message)
        return

    if not is_text:
        message = json.dumps(message)
    try:
        queue.send_message(MessageBody=message)
    except Exception as exc:
        if raise_exception:
            raise MessageSendFailed(
                'Could not send message {} over queue {}'.format(message,
                                                                 queue))
        logger.warning('Could not send message over queue %s - %s',
                       six.text_type(queue),
                       six.text_type(exc))


def queue_send(queue, message, raise_exception=True):
    """Combined queue retrieval and send.

    ``queue`` is a key into ``settings.QUEUES``; the value stored there is
    passed to ``get_queue`` as the queue name.
    """
    queue = get_queue(settings.QUEUES[queue], raise_exception=raise_exception)
    send_message(queue, message, raise_exception=raise_exception)


def requeue(queue, message, raise_exception=True):
    """Put the message back on the queue.

    The message is wrapped in a single-element list before being sent.
    """
    queue_send(queue, [message], raise_exception=raise_exception)


def clear_outbox():
    """Clear the test outbox.
    """
    # Snapshot the keys first so the dict is not iterated while it changes.
    keys = list(outbox)
    for key in keys:
        # NOTE(review): the loop body is truncated in the available source;
        # the truncation marker is kept rather than guessing the statement.
        ...
test_aws_helpers.py
Source:test_aws_helpers.py
import pytest

# from aws_setting_utils import s3_buckets, prefix, region

# creation functions
from create_aws_resources import generate_sqs_policy
from create_aws_resources import generate_bucket_arn_from_name
from create_aws_resources import create_bucket
from create_aws_resources import create_prefixed_buckets
from create_aws_resources import create_prefixed_swf_domain

# destruction functions
from destroy_aws_resources import get_prefixed_buckets
from destroy_aws_resources import check_prefix_against_protected
from destroy_aws_resources import check_candidates_against_protected
from destroy_aws_resources import delete_bucket
from destroy_aws_resources import delete_prefixed_buckets

# setting config functions
from moto import mock_s3
import boto3


def test_generate_bucket_arn_from_name():
    """A bucket name is expanded into a wildcard-region/account S3 ARN."""
    assert generate_bucket_arn_from_name("hello") == "arn:aws:s3:*:*:hello"


def test_generate_sqs_policy_json():
    """generate_sqs_policy returns the expected SendMessage policy dict."""
    test_json = {
        "Version": "2012-10-17",
        "Id": "test_sqs/SQSDefaultPolicy",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {
                    "AWS": "*"
                },
                "Action": "SQS:SendMessage",
                "Resource": "test_sqs",
                "Condition": {
                    "ArnLike": {
                        "aws:SourceArn": "test_bucket"
                    }
                }
            }
        ]
    }
    result = generate_sqs_policy("test_sqs", "test_bucket")
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print(result)
    # Assert on the value that was printed instead of calling again.
    assert result == test_json


def test_positive_check_prefix_against_protected():
    """A prefix present in the protected list is reported as protected."""
    prefix = 'this'
    protected = ["this", "that", "other"]
    result = check_prefix_against_protected(prefix, protected)
    assert result == True


def test_negative_check_prefix_against_protected():
    """A prefix absent from the protected list is not reported as protected."""
    prefix = 'this'
    protected = ["that", "other"]
    result = check_prefix_against_protected(prefix, protected)
    assert result == False


# Disabled tests, kept as found (the source snippet is truncated at the end).
#
# @mock_s3
# def test_create_bucket():
#     s3 = boto3.resource('s3')
#     bucket_name = "test_moto_bucket"
#     location = "us-east-1"
#     assert create_bucket(s3, bucket_name, location) == True
#
#     location = "eu-central-1"
#     assert create_bucket(s3, bucket_name, location) == True
#
# @mock_s3
# def test_get_prefixed_bucket():
#     # setup via creating some test buckets
#     s3 = boto3.resource('s3')
#     s3.create_bucket(Bucket="pfa-test_moto_bucket1")
#     s3.create_bucket(Bucket="pfa-test_moto_bucket2")
#     s3.create_bucket(Bucket="pfa-test_moto_bucket3")
#     s3.create_bucket(Bucket="pfb-test_moto_bucket4")
#     s3.create_bucket(Bucket="pfb-test_moto_bucket5")
#     for b in s3.buckets.all():
#         print b
#     test_prefix = "pfb"
#     test_bucket_list = ["pfb_test_moto_bucket4", "pfb_test_moto_bucket5"]
#     prefixed_buckets = map(lambda x: x.name, get_prefixed_buckets(test_prefix))
#     print prefixed_buckets...
main.py
Source:main.py
import boto3

__author__ = 'dengjing'


def test_sqs():
    """Send 'hello world!' to the 'river' queue and print the response."""
    # sqs_client = boto3.client('sqs')
    # print(sqs_client.list_queues())
    sqs_resource = boto3.resource('sqs')
    queue = sqs_resource.get_queue_by_name(QueueName='river')
    # print(queue.attributes.get('DelaySeconds'))
    # print(sqs_resource.get_available_subresources())
    response = queue.send_message(MessageBody='hello world!')
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print(response)


def test_sqs_receive_messages():
    """Receive up to 10 messages from 'river'; print the count, the first
    body (if any) and the queue object."""
    sqs_resource = boto3.resource('sqs')
    queue = sqs_resource.get_queue_by_name(QueueName='river')
    message = queue.receive_messages(MaxNumberOfMessages=10)
    print(len(message))
    # Only index when something was received, to avoid IndexError on an
    # empty result.
    if message:
        print(message[0].body)
    print(queue)


def base_test():
    """Open the 'river' queue by URL; print the first received body (if any)
    and the queue object's attribute names."""
    # client = boto3.client('sqs')
    # print(client.list_queues())
    sqs = boto3.resource('sqs')
    queue = sqs.Queue(url='https://us-west-2.queue.amazonaws.com/052792705405/river')
    messages = queue.receive_messages()
    # Same empty-result guard as in test_sqs_receive_messages.
    if messages:
        print(messages[0].body)
    print(dir(queue))


if __name__ == '__main__':
    # test_sqs()
    # test_sqs_receive_messages()
    # NOTE(review): the snippet is truncated here in the available source, so
    # the call that was actually made is not visible; the marker is kept.
    ...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!