Best Python code snippet using localstack_python
test_sqs_queue.py
Source:test_sqs_queue.py
from cloudrail.knowledge.context.aws.aws_environment_context import AwsEnvironmentContext

from tests.knowledge.context.aws_context_test import AwsContextTest
from tests.knowledge.context.test_context_annotation import TestOptions, context


class TestSqsQueue(AwsContextTest):
    """Tests for the SQS queues exposed through ``ctx.sqs_queues``."""

    def get_component(self):
        """Return the component name used by this test class."""
        return 'sqs_queues'

    def _find_queue(self, ctx: AwsEnvironmentContext, queue_name: str):
        """Return the queue called *queue_name*, failing the test if it is absent.

        Asserting here turns a missing queue into a readable assertion
        failure instead of an ``AttributeError`` on ``None`` further down.
        """
        sqs_queue = next((queue for queue in ctx.sqs_queues if queue.queue_name == queue_name), None)
        self.assertIsNotNone(sqs_queue)
        return sqs_queue

    def _assert_unencrypted_queue_with_policy(self, sqs_queue, first_action: str, queue_name: str):
        """Run the assertions shared by the resource-policy tests.

        Checks the first action of the first policy statement, that the
        policy carries a queue name, that the queue is not encrypted at
        rest, that it has an ARN, and that its name is *queue_name*.
        """
        self.assertEqual(sqs_queue.resource_based_policy.statements[0].actions[0], first_action)
        self.assertTrue(sqs_queue.resource_based_policy.queue_name)
        self.assertFalse(sqs_queue.encrypted_at_rest)
        self.assertTrue(sqs_queue.arn)
        self.assertEqual(sqs_queue.queue_name, queue_name)

    @context(module_path="encrypted_at_rest")
    def test_encrypted_at_rest(self, ctx: AwsEnvironmentContext):
        """The 'sqs_encrypted' queue is encrypted, has an ARN and no policy."""
        self.assertEqual(len(ctx.sqs_queues), 2)
        sqs_queue = self._find_queue(ctx, 'sqs_encrypted')
        self.assertTrue(sqs_queue.encrypted_at_rest)
        self.assertTrue(sqs_queue.arn)
        self.assertFalse(sqs_queue.resource_based_policy)
        # The console URL is only checked for queues not managed by IaC.
        if not sqs_queue.is_managed_by_iac:
            self.assertEqual(sqs_queue.get_cloud_resource_url(),
                             'https://console.aws.amazon.com/sqs/v2/home?region=us-east-1#'
                             '/queues/https%3A%2F%2Fqueue.amazonaws.com%2F115553109071%2Fsqs_encrypted')

    @context(module_path="encrypted_at_rest_with_customer_managed_cmk")
    def test_encrypted_at_rest_with_customer_managed_cmk(self, ctx: AwsEnvironmentContext):
        """The single queue is encrypted and reports the expected KMS key ARN."""
        self.assertEqual(len(ctx.sqs_queues), 1)
        sqs_queue = ctx.sqs_queues[0]
        self.assertTrue(sqs_queue.encrypted_at_rest)
        self.assertTrue(sqs_queue.arn)
        self.assertFalse(sqs_queue.resource_based_policy)
        self.assertEqual(sqs_queue.queue_name, 'sqs_encrypted')
        self.assertEqual(sqs_queue.kms_key, 'arn:aws:kms:us-east-1:115553109071:key/ed8cfa36-e3fe-4981-a565-ed4ecd238d50')

    @context(module_path="no_encryption")
    def test_no_encryption(self, ctx: AwsEnvironmentContext):
        """The single queue has no encryption, no policy and no tags."""
        self.assertEqual(len(ctx.sqs_queues), 1)
        sqs_queue = ctx.sqs_queues[0]
        self.assertFalse(sqs_queue.encrypted_at_rest)
        self.assertFalse(sqs_queue.resource_based_policy)
        self.assertFalse(sqs_queue.tags)

    @context(module_path="bad_policy")
    def test_bad_policy(self, ctx: AwsEnvironmentContext):
        """The queue's policy starts with the wildcard action 'sqs:*'."""
        self._assert_unencrypted_queue_with_policy(ctx.sqs_queues[0], 'sqs:*', 'cloudrail-not-secure-queue')

    @context(module_path="good_policy")
    def test_good_policy(self, ctx: AwsEnvironmentContext):
        """The queue's policy starts with the action 'sqs:SendMessage'."""
        self._assert_unencrypted_queue_with_policy(ctx.sqs_queues[0], 'sqs:SendMessage', 'cloudrail-secure-queue')

    @context(module_path="secure_policy_existing_queue", base_scanner_data_for_iac='account-data-existing-sqs-queue-secure-policy.zip',
             test_options=TestOptions(run_cloudmapper=False))
    def test_secure_policy_existing_queue(self, ctx: AwsEnvironmentContext):
        """Same policy checks as above, plus the queue's account and region."""
        sqs_queue = ctx.sqs_queues[0]
        self._assert_unencrypted_queue_with_policy(sqs_queue, 'sqs:SendMessage', 'cloudrail-secure-queue_2')
        self.assertEqual(sqs_queue.account, '115553109071')
        self.assertEqual(sqs_queue.region, 'us-east-1')

    @context(module_path="bad_policy_policy_inside_main_resource")
    def test_bad_policy_policy_inside_main_resource(self, ctx: AwsEnvironmentContext):
        """The queue's policy starts with the wildcard action 'sqs:*'."""
        self._assert_unencrypted_queue_with_policy(ctx.sqs_queues[0], 'sqs:*', 'cloudrail-not-secure-queue')

    @context(module_path="with_tags")
    def test_with_tags(self, ctx: AwsEnvironmentContext):
        """The 'sqs_non_encrypted' queue is present in the context."""
        sqs = next((sqs for sqs in ctx.sqs_queues if sqs.queue_name == 'sqs_non_encrypted'), None)
        self.assertIsNotNone(sqs)
        # ... (the listing is truncated here in the source)
test_sqs_client.py
Source:test_sqs_client.py
import json
import os
import pdb

from horey.aws_api.aws_clients.sqs_client import SQSClient
from horey.aws_api.aws_services_entities.sqs_queue import SQSQueue
from horey.h_logger import get_logger
from horey.aws_api.base_entities.aws_account import AWSAccount
from horey.common_utils.common_utils import CommonUtils

# Directory of this test module and the sibling "ignore" directory that holds
# the account and mock-value files loaded below.
_THIS_DIR = os.path.dirname(os.path.abspath(__file__))
_IGNORE_DIR = os.path.abspath(os.path.join(_THIS_DIR, "..", "ignore"))

configuration_values_file_full_path = os.path.join(_THIS_DIR, "h_logger_configuration_values.py")
logger = get_logger(configuration_values_file_full_path=configuration_values_file_full_path)

accounts_file_full_path = os.path.join(_IGNORE_DIR, "aws_api_managed_accounts.py")
accounts = CommonUtils.load_object_from_module(accounts_file_full_path, "main")
AWSAccount.set_aws_account(accounts["1111"])
AWSAccount.set_aws_region(accounts["1111"].regions['us-west-2'])

mock_values_file_path = os.path.join(_IGNORE_DIR, "mock_values.py")
mock_values = CommonUtils.load_object_from_module(mock_values_file_path, "main")


def _build_test_queue(name):
    """Return an ``SQSQueue`` named *name* with the attributes every test here uses.

    The region comes from ``AWSAccount.get_aws_region()`` and the queue is
    tagged with its own name.
    """
    sqs_queue = SQSQueue({})
    sqs_queue.region = AWSAccount.get_aws_region()
    sqs_queue.name = name
    sqs_queue.visibility_timeout = "30"
    sqs_queue.maximum_message_size = "262144"
    sqs_queue.message_retention_period = "604800"
    sqs_queue.delay_seconds = "0"
    sqs_queue.tags = {"name": sqs_queue.name}
    return sqs_queue


def test_init_client():
    """An ``SQSClient`` can be constructed."""
    assert isinstance(SQSClient(), SQSClient)


def test_provision_queue():
    """Provisioning the main test queue sets its ``queue_url``."""
    client = SQSClient()
    sqs_queue = _build_test_queue("sqs_queue_horey_test")
    client.provision_queue(sqs_queue)
    assert sqs_queue.queue_url is not None


def test_provision_queue_dlq():
    """Provisioning the "_dlq" test queue sets its ``queue_url``."""
    client = SQSClient()
    sqs_queue = _build_test_queue("sqs_queue_horey_test_dlq")
    client.provision_queue(sqs_queue)
    assert sqs_queue.queue_url is not None


def test_provision_queue_update():
    """Provisioning the main test queue with a redrive policy sets its ``queue_url``."""
    client = SQSClient()
    sqs_queue = _build_test_queue("sqs_queue_horey_test")
    # Compact separators give the same text as the hand-concatenated JSON this
    # replaces, while json.dumps escapes the ARN value correctly.
    sqs_queue.redrive_policy = json.dumps(
        {"deadLetterTargetArn": mock_values["dlq_arn"], "maxReceiveCount": 1000},
        separators=(",", ":"),
    )
    client.provision_queue(sqs_queue)
    assert sqs_queue.queue_url is not None


if __name__ == "__main__":
    test_init_client()
    test_provision_queue_dlq()
    test_provision_queue()
    # ... (the listing is truncated here in the source)
resource.py
Source:resource.py
...12 self.url = sqs_queue.url13 for method in getmembers(sqs_queue, ismethod):14 if not method[0].startswith('_') and method[0] not in ['receive_messages', 'send_message', 'send_messages']:15 setattr(self, method[0], method[1])16 def __get_sqs_queue(self):17 return self.__sqs_queue18 sqs_queue = property(__get_sqs_queue)19 def receive_jms_messages(self, **kwargs):20 kwargs['MessageAttributeNames'] = _add_required_message_attribute_names(kwargs.get('MessageAttributeNames') or [])21 return [ _create_jms_message(sqs_message) for sqs_message in self.sqs_queue.receive_messages(**kwargs) ]22 def send_bytes_message(self, JMSReplyTo=None, JMSCorrelationId=None, **kwargs):23 return self.sqs_queue.send_message(24 **_encode_jms_message(25 JMSMessageType=JMSMessageType.BYTE, 26 JMSReplyTo=JMSReplyTo, 27 JMSCorrelationId=JMSCorrelationId, 28 **kwargs)29 )30 def send_jms_messages(self, **kwargs):...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!