Best Python code snippet using localstack_python
test_boto_elasticsearch_domain.py
Source:test_boto_elasticsearch_domain.py
1# -*- coding: utf-8 -*-2# Import Python libs3from __future__ import absolute_import, print_function, unicode_literals4import copy5import logging6import random7import string8# Import Salt Testing libs9from tests.support.mixins import LoaderModuleMockMixin10from tests.support.unit import skipIf, TestCase11from tests.support.mock import (12 NO_MOCK,13 NO_MOCK_REASON,14 MagicMock,15 patch16)17# Import Salt libs18from salt.ext import six19import salt.loader20from salt.utils.versions import LooseVersion21import salt.modules.boto_elasticsearch_domain as boto_elasticsearch_domain22# Import 3rd-party libs23# pylint: disable=import-error,no-name-in-module24try:25 import boto326 from botocore.exceptions import ClientError27 HAS_BOTO = True28except ImportError:29 HAS_BOTO = False30from salt.ext.six.moves import range31# pylint: enable=import-error,no-name-in-module32# the boto_elasticsearch_domain module relies on the connect_to_region() method33# which was added in boto 2.8.034# https://github.com/boto/boto/commit/33ac26b416fbb48a60602542b4ce15dcc7029f1235required_boto3_version = '1.2.1'36def _has_required_boto():37 '''38 Returns True/False boolean depending on if Boto is installed and correct39 version.40 '''41 if not HAS_BOTO:42 return False43 elif LooseVersion(boto3.__version__) < LooseVersion(required_boto3_version):44 return False45 else:46 return True47if _has_required_boto():48 region = 'us-east-1'49 access_key = 'GKTADJGHEIQSXMKKRBJ08H'50 secret_key = 'askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs'51 conn_parameters = {'region': region, 'key': access_key, 'keyid': secret_key, 'profile': {}}52 error_message = 'An error occurred (101) when calling the {0} operation: Test-defined error'53 error_content = {54 'Error': {55 'Code': 101,56 'Message': "Test-defined error"57 }58 }59 not_found_error = ClientError({60 'Error': {61 'Code': 'ResourceNotFoundException',62 'Message': "Test-defined error"63 }64 }, 'msg')65 domain_ret = dict(DomainName='testdomain',66 ElasticsearchClusterConfig={},67 EBSOptions={},68 AccessPolicies={},69 SnapshotOptions={},70 AdvancedOptions={})71log = logging.getLogger(__name__)72class BotoElasticsearchDomainTestCaseBase(TestCase, LoaderModuleMockMixin):73 conn = None74 def setup_loader_modules(self):75 self.opts = salt.config.DEFAULT_MINION_OPTS76 utils = salt.loader.utils(self.opts, whitelist=['boto3'], context={})77 return {boto_elasticsearch_domain: {'__utils__': utils}}78 def setUp(self):79 super(BotoElasticsearchDomainTestCaseBase, self).setUp()80 boto_elasticsearch_domain.__init__(self.opts)81 del self.opts82 # Set up MagicMock to replace the boto3 session83 # connections keep getting cached from prior tests, can't find the84 # correct context object to clear it. So randomize the cache key, to prevent any85 # cache hits86 conn_parameters['key'] = ''.join(random.choice(string.ascii_lowercase + string.digits) for _ in range(50))87 self.patcher = patch('boto3.session.Session')88 self.addCleanup(self.patcher.stop)89 self.addCleanup(delattr, self, 'patcher')90 mock_session = self.patcher.start()91 session_instance = mock_session.return_value92 self.conn = MagicMock()93 self.addCleanup(delattr, self, 'conn')94 session_instance.client.return_value = self.conn95class BotoElasticsearchDomainTestCaseMixin(object):96 pass97@skipIf(True, 'Skip these tests while investigating failures')98@skipIf(HAS_BOTO is False, 'The boto module must be installed.')99@skipIf(_has_required_boto() is False, 'The boto3 module must be greater than'100 ' or equal to version {0}'101 .format(required_boto3_version))102@skipIf(NO_MOCK, NO_MOCK_REASON)103class BotoElasticsearchDomainTestCase(BotoElasticsearchDomainTestCaseBase, BotoElasticsearchDomainTestCaseMixin):104 '''105 TestCase for salt.modules.boto_elasticsearch_domain module106 '''107 def test_that_when_checking_if_a_domain_exists_and_a_domain_exists_the_domain_exists_method_returns_true(self):108 '''109 Tests checking domain existence when the domain already exists110 '''111 result = boto_elasticsearch_domain.exists(DomainName='testdomain', **conn_parameters)112 self.assertTrue(result['exists'])113 def test_that_when_checking_if_a_domain_exists_and_a_domain_does_not_exist_the_domain_exists_method_returns_false(self):114 '''115 Tests checking domain existence when the domain does not exist116 '''117 self.conn.describe_elasticsearch_domain.side_effect = not_found_error118 result = boto_elasticsearch_domain.exists(DomainName='mydomain', **conn_parameters)119 self.assertFalse(result['exists'])120 def test_that_when_checking_if_a_domain_exists_and_boto3_returns_an_error_the_domain_exists_method_returns_error(self):121 '''122 Tests checking domain existence when boto returns an error123 '''124 self.conn.describe_elasticsearch_domain.side_effect = ClientError(error_content, 'list_domains')125 result = boto_elasticsearch_domain.exists(DomainName='mydomain', **conn_parameters)126 self.assertEqual(result.get('error', {}).get('message'), error_message.format('list_domains'))127 def test_that_when_checking_domain_status_and_a_domain_exists_the_domain_status_method_returns_info(self):128 '''129 Tests checking domain existence when the domain already exists130 '''131 self.conn.describe_elasticsearch_domain.return_value = {'DomainStatus': domain_ret}132 result = boto_elasticsearch_domain.status(DomainName='testdomain', **conn_parameters)133 self.assertTrue(result['domain'])134 def test_that_when_checking_domain_status_and_boto3_returns_an_error_the_domain_status_method_returns_error(self):135 '''136 Tests checking domain existence when boto returns an error137 '''138 self.conn.describe_elasticsearch_domain.side_effect = ClientError(error_content, 'list_domains')139 result = boto_elasticsearch_domain.status(DomainName='mydomain', **conn_parameters)140 self.assertEqual(result.get('error', {}).get('message'), error_message.format('list_domains'))141 def test_that_when_describing_domain_it_returns_the_dict_of_properties_returns_true(self):142 '''143 Tests describing parameters if domain exists144 '''145 domainconfig = {}146 for k, v in six.iteritems(domain_ret):147 if k == 'DomainName':148 continue149 domainconfig[k] = {'Options': v}150 self.conn.describe_elasticsearch_domain_config.return_value = {'DomainConfig': domainconfig}151 result = boto_elasticsearch_domain.describe(DomainName=domain_ret['DomainName'], **conn_parameters)152 log.warning(result)153 desired_ret = copy.copy(domain_ret)154 desired_ret.pop('DomainName')155 self.assertEqual(result, {'domain': desired_ret})156 def test_that_when_describing_domain_on_client_error_it_returns_error(self):157 '''158 Tests describing parameters failure159 '''160 self.conn.describe_elasticsearch_domain_config.side_effect = ClientError(error_content, 'list_domains')161 result = boto_elasticsearch_domain.describe(DomainName='testdomain', **conn_parameters)162 self.assertTrue('error' in result)163 def test_that_when_creating_a_domain_succeeds_the_create_domain_method_returns_true(self):164 '''165 tests True domain created.166 '''167 self.conn.create_elasticsearch_domain.return_value = {'DomainStatus': domain_ret}168 args = copy.copy(domain_ret)169 args.update(conn_parameters)170 result = boto_elasticsearch_domain.create(**args)171 self.assertTrue(result['created'])172 def test_that_when_creating_a_domain_fails_the_create_domain_method_returns_error(self):173 '''174 tests False domain not created.175 '''176 self.conn.create_elasticsearch_domain.side_effect = ClientError(error_content, 'create_domain')177 args = copy.copy(domain_ret)178 args.update(conn_parameters)179 result = boto_elasticsearch_domain.create(**args)180 self.assertEqual(result.get('error', {}).get('message'), error_message.format('create_domain'))181 def test_that_when_deleting_a_domain_succeeds_the_delete_domain_method_returns_true(self):182 '''183 tests True domain deleted.184 '''185 result = boto_elasticsearch_domain.delete(DomainName='testdomain',186 **conn_parameters)187 self.assertTrue(result['deleted'])188 def test_that_when_deleting_a_domain_fails_the_delete_domain_method_returns_false(self):189 '''190 tests False domain not deleted.191 '''192 self.conn.delete_elasticsearch_domain.side_effect = ClientError(error_content, 'delete_domain')193 result = boto_elasticsearch_domain.delete(DomainName='testdomain',194 **conn_parameters)195 self.assertFalse(result['deleted'])196 def test_that_when_updating_a_domain_succeeds_the_update_domain_method_returns_true(self):197 '''198 tests True domain updated.199 '''200 self.conn.update_elasticsearch_domain_config.return_value = {'DomainConfig': domain_ret}201 args = copy.copy(domain_ret)202 args.update(conn_parameters)203 result = boto_elasticsearch_domain.update(**args)204 self.assertTrue(result['updated'])205 def test_that_when_updating_a_domain_fails_the_update_domain_method_returns_error(self):206 '''207 tests False domain not updated.208 '''209 self.conn.update_elasticsearch_domain_config.side_effect = ClientError(error_content, 'update_domain')210 args = copy.copy(domain_ret)211 args.update(conn_parameters)212 result = boto_elasticsearch_domain.update(**args)213 self.assertEqual(result.get('error', {}).get('message'), error_message.format('update_domain'))214 def test_that_when_adding_tags_succeeds_the_add_tags_method_returns_true(self):215 '''216 tests True tags added.217 '''218 self.conn.describe_elasticsearch_domain.return_value = {'DomainStatus': domain_ret}219 result = boto_elasticsearch_domain.add_tags(DomainName='testdomain', a='b', **conn_parameters)220 self.assertTrue(result['tagged'])221 def test_that_when_adding_tags_fails_the_add_tags_method_returns_false(self):222 '''223 tests False tags not added.224 '''225 self.conn.add_tags.side_effect = ClientError(error_content, 'add_tags')226 self.conn.describe_elasticsearch_domain.return_value = {'DomainStatus': domain_ret}227 result = boto_elasticsearch_domain.add_tags(DomainName=domain_ret['DomainName'], a='b', **conn_parameters)228 self.assertFalse(result['tagged'])229 def test_that_when_removing_tags_succeeds_the_remove_tags_method_returns_true(self):230 '''231 tests True tags removed.232 '''233 self.conn.describe_elasticsearch_domain.return_value = {'DomainStatus': domain_ret}234 result = boto_elasticsearch_domain.remove_tags(DomainName=domain_ret['DomainName'], TagKeys=['a'], **conn_parameters)235 self.assertTrue(result['tagged'])236 def test_that_when_removing_tags_fails_the_remove_tags_method_returns_false(self):237 '''238 tests False tags not removed.239 '''240 self.conn.remove_tags.side_effect = ClientError(error_content, 'remove_tags')241 self.conn.describe_elasticsearch_domain.return_value = {'DomainStatus': domain_ret}242 result = boto_elasticsearch_domain.remove_tags(DomainName=domain_ret['DomainName'], TagKeys=['b'], **conn_parameters)243 self.assertFalse(result['tagged'])244 def test_that_when_listing_tags_succeeds_the_list_tags_method_returns_true(self):245 '''246 tests True tags listed.247 '''248 self.conn.describe_elasticsearch_domain.return_value = {'DomainStatus': domain_ret}249 result = boto_elasticsearch_domain.list_tags(DomainName=domain_ret['DomainName'], **conn_parameters)250 self.assertEqual(result['tags'], {})251 def test_that_when_listing_tags_fails_the_list_tags_method_returns_false(self):252 '''253 tests False tags not listed.254 '''255 self.conn.list_tags.side_effect = ClientError(error_content, 'list_tags')256 self.conn.describe_elasticsearch_domain.return_value = {'DomainStatus': domain_ret}257 result = boto_elasticsearch_domain.list_tags(DomainName=domain_ret['DomainName'], **conn_parameters)...
aws_es_info.py
Source:aws_es_info.py
...94 return client.list_packages_for_domain(95 DomainName=module.params['name'],96 ), False97 elif module.params['describe_elasticsearch_domain']:98 return client.describe_elasticsearch_domain(99 DomainName=module.params['name'],100 ), False101 elif module.params['describe_elasticsearch_domain_config']:102 return client.describe_elasticsearch_domain_config(103 DomainName=module.params['name'],104 ), False105 else:106 if client.can_paginate('list_domain_names'):107 paginator = client.get_paginator('list_domain_names')108 return paginator.paginate(), True109 else:110 return client.list_domain_names(), False111 except (BotoCoreError, ClientError) as e:112 module.fail_json_aws(e, msg='Failed to fetch Amazon ES details')...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!