Best Python code snippet using localstack_python
test_discovery.py
Source:test_discovery.py
"""Unit tests for botocore's endpoint discovery manager and handler."""
import time

from botocore.awsrequest import AWSRequest
from botocore.client import ClientMeta
from botocore.discovery import (
    EndpointDiscoveryHandler,
    EndpointDiscoveryManager,
    EndpointDiscoveryRefreshFailed,
    EndpointDiscoveryRequired,
    block_endpoint_discovery_required_operations,
)
from botocore.exceptions import ConnectionError
from botocore.handlers import inject_api_version_header_if_needed
from botocore.hooks import HierarchicalEmitter
from botocore.model import ServiceModel
from tests import mock, unittest


class BaseEndpointDiscoveryTest(unittest.TestCase):
    """Provide a small service description shared by the discovery tests.

    The description declares one endpoint operation (``DescribeEndpoints``)
    and three operations whose ``endpointdiscovery`` trait is required,
    optional with identifiers, and optional without identifiers.
    """

    def setUp(self):
        self.service_description = {
            'version': '2.0',
            'metadata': {
                'apiVersion': '2018-08-31',
                'endpointPrefix': 'fooendpoint',
                'jsonVersion': '1.1',
                'protocol': 'json',
                'serviceAbbreviation': 'FooService',
                'serviceId': 'FooService',
                'serviceFullName': 'AwsFooService',
                'signatureVersion': 'v4',
                'signingName': 'awsfooservice',
                'targetPrefix': 'awsfooservice',
            },
            'operations': {
                'DescribeEndpoints': {
                    'name': 'DescribeEndpoints',
                    'http': {'method': 'POST', 'requestUri': '/'},
                    'input': {'shape': 'DescribeEndpointsRequest'},
                    'output': {'shape': 'DescribeEndpointsResponse'},
                    'endpointoperation': True,
                },
                'TestDiscoveryRequired': {
                    'name': 'TestDiscoveryRequired',
                    'http': {'method': 'POST', 'requestUri': '/'},
                    'input': {'shape': 'TestDiscoveryIdsRequest'},
                    'output': {'shape': 'EmptyStruct'},
                    'endpointdiscovery': {'required': True},
                },
                'TestDiscoveryOptional': {
                    'name': 'TestDiscoveryOptional',
                    'http': {'method': 'POST', 'requestUri': '/'},
                    'input': {'shape': 'TestDiscoveryIdsRequest'},
                    'output': {'shape': 'EmptyStruct'},
                    'endpointdiscovery': {},
                },
                'TestDiscovery': {
                    'name': 'TestDiscovery',
                    'http': {'method': 'POST', 'requestUri': '/'},
                    'input': {'shape': 'EmptyStruct'},
                    'output': {'shape': 'EmptyStruct'},
                    'endpointdiscovery': {},
                },
            },
            'shapes': {
                'Boolean': {'type': 'boolean'},
                'DescribeEndpointsRequest': {
                    'type': 'structure',
                    'members': {
                        'Operation': {'shape': 'String'},
                        'Identifiers': {'shape': 'Identifiers'},
                    },
                },
                'DescribeEndpointsResponse': {
                    'type': 'structure',
                    'required': ['Endpoints'],
                    'members': {'Endpoints': {'shape': 'Endpoints'}},
                },
                'Endpoint': {
                    'type': 'structure',
                    'required': ['Address', 'CachePeriodInMinutes'],
                    'members': {
                        'Address': {'shape': 'String'},
                        'CachePeriodInMinutes': {'shape': 'Long'},
                    },
                },
                'Endpoints': {'type': 'list', 'member': {'shape': 'Endpoint'}},
                'Identifiers': {
                    'type': 'map',
                    'key': {'shape': 'String'},
                    'value': {'shape': 'String'},
                },
                'Long': {'type': 'long'},
                'String': {'type': 'string'},
                'TestDiscoveryIdsRequest': {
                    'type': 'structure',
                    'required': ['Foo', 'Nested'],
                    'members': {
                        'Foo': {
                            'shape': 'String',
                            'endpointdiscoveryid': True,
                        },
                        'Baz': {'shape': 'String'},
                        'Nested': {'shape': 'Nested'},
                    },
                },
                'EmptyStruct': {'type': 'structure', 'members': {}},
                'Nested': {
                    'type': 'structure',
                    # A list, matching every other 'required' entry in
                    # this description (it was previously a bare string).
                    'required': ['Bar'],
                    'members': {
                        'Bar': {
                            'shape': 'String',
                            'endpointdiscoveryid': True,
                        }
                    },
                },
            },
        }


class TestEndpointDiscoveryManager(BaseEndpointDiscoveryTest):
    """Exercise ``EndpointDiscoveryManager`` against a mocked client."""

    def setUp(self):
        super().setUp()
        self.construct_manager()

    def construct_manager(self, cache=None, time=None, side_effect=None):
        """Build ``self.manager`` around a mock client.

        :param cache: Optional dict handed to the manager as its cache, so
            a test can pre-seed entries and inspect them afterwards.
        :param time: Optional zero-argument callable passed to the manager
            as ``current_time``.
        :param side_effect: Sequence of results/exceptions for successive
            ``describe_endpoints`` calls on the mock client. Defaults to a
            single successful response advertising ``new.com``.
        """
        self.service_model = ServiceModel(self.service_description)
        self.meta = mock.Mock(spec=ClientMeta)
        self.meta.service_model = self.service_model
        self.client = mock.Mock()
        if side_effect is None:
            side_effect = [
                {
                    'Endpoints': [
                        {
                            'Address': 'new.com',
                            'CachePeriodInMinutes': 2,
                        }
                    ]
                }
            ]
        self.client.describe_endpoints.side_effect = side_effect
        self.client.meta = self.meta
        self.manager = EndpointDiscoveryManager(
            self.client, cache=cache, current_time=time
        )

    def test_injects_api_version_if_endpoint_operation(self):
        model = self.service_model.operation_model('DescribeEndpoints')
        params = {'headers': {}}
        inject_api_version_header_if_needed(model, params)
        self.assertEqual(
            params['headers'].get('x-amz-api-version'), '2018-08-31'
        )

    def test_no_inject_api_version_if_not_endpoint_operation(self):
        model = self.service_model.operation_model('TestDiscoveryRequired')
        params = {'headers': {}}
        inject_api_version_header_if_needed(model, params)
        self.assertNotIn('x-amz-api-version', params['headers'])

    def test_gather_identifiers(self):
        params = {'Foo': 'value1', 'Nested': {'Bar': 'value2'}}
        operation = self.service_model.operation_model('TestDiscoveryRequired')
        ids = self.manager.gather_identifiers(operation, params)
        self.assertEqual(ids, {'Foo': 'value1', 'Bar': 'value2'})

    def test_gather_identifiers_none(self):
        operation = self.service_model.operation_model('TestDiscovery')
        ids = self.manager.gather_identifiers(operation, {})
        self.assertEqual(ids, {})

    def test_describe_endpoint(self):
        kwargs = {
            'Operation': 'FooBar',
            'Identifiers': {'Foo': 'value1', 'Bar': 'value2'},
        }
        self.manager.describe_endpoint(**kwargs)
        self.client.describe_endpoints.assert_called_with(**kwargs)

    def test_describe_endpoint_no_input(self):
        describe = self.service_description['operations']['DescribeEndpoints']
        del describe['input']
        # Rebuild the manager so it picks up the modified description.
        self.construct_manager()
        self.manager.describe_endpoint(Operation='FooBar', Identifiers={})
        self.client.describe_endpoints.assert_called_with()

    def test_describe_endpoint_empty_input(self):
        describe = self.service_description['operations']['DescribeEndpoints']
        describe['input'] = {'shape': 'EmptyStruct'}
        # Rebuild the manager so it picks up the modified description.
        self.construct_manager()
        self.manager.describe_endpoint(Operation='FooBar', Identifiers={})
        self.client.describe_endpoints.assert_called_with()

    def test_describe_endpoint_ids_and_operation(self):
        cache = {}
        self.construct_manager(cache=cache)
        ids = {'Foo': 'value1', 'Bar': 'value2'}
        kwargs = {
            'Operation': 'TestDiscoveryRequired',
            'Identifiers': ids,
        }
        self.manager.describe_endpoint(**kwargs)
        self.client.describe_endpoints.assert_called_with(**kwargs)
        key = ((('Bar', 'value2'), ('Foo', 'value1')), 'TestDiscoveryRequired')
        self.assertIn(key, cache)
        self.assertEqual(cache[key][0]['Address'], 'new.com')
        # The repeated lookup must not trigger another client call.
        self.manager.describe_endpoint(**kwargs)
        call_count = self.client.describe_endpoints.call_count
        self.assertEqual(call_count, 1)

    def test_describe_endpoint_no_ids_or_operation(self):
        cache = {}
        describe = self.service_description['operations']['DescribeEndpoints']
        describe['input'] = {'shape': 'EmptyStruct'}
        self.construct_manager(cache=cache)
        self.manager.describe_endpoint(
            Operation='TestDiscoveryRequired', Identifiers={}
        )
        self.client.describe_endpoints.assert_called_with()
        key = ()
        self.assertIn(key, cache)
        self.assertEqual(cache[key][0]['Address'], 'new.com')
        # The repeated lookup must not trigger another client call.
        self.manager.describe_endpoint(
            Operation='TestDiscoveryRequired', Identifiers={}
        )
        call_count = self.client.describe_endpoints.call_count
        self.assertEqual(call_count, 1)

    def test_describe_endpoint_expired_entry(self):
        current_time = time.time()
        key = ()
        cache = {
            key: [{'Address': 'old.com', 'Expiration': current_time - 10}]
        }
        self.construct_manager(cache=cache)
        kwargs = {
            'Identifiers': {},
            'Operation': 'TestDiscoveryRequired',
        }
        self.manager.describe_endpoint(**kwargs)
        self.client.describe_endpoints.assert_called_with()
        self.assertIn(key, cache)
        self.assertEqual(cache[key][0]['Address'], 'new.com')
        # The repeated lookup must not trigger another client call.
        self.manager.describe_endpoint(**kwargs)
        call_count = self.client.describe_endpoints.call_count
        self.assertEqual(call_count, 1)

    def test_describe_endpoint_cache_expiration(self):
        def _time():
            return float(0)

        cache = {}
        self.construct_manager(cache=cache, time=_time)
        self.manager.describe_endpoint(
            Operation='TestDiscoveryRequired', Identifiers={}
        )
        key = ()
        self.assertIn(key, cache)
        # CachePeriodInMinutes is 2 and the clock is pinned at 0.
        self.assertEqual(cache[key][0]['Expiration'], float(120))

    def test_delete_endpoints_present(self):
        key = ()
        cache = {key: [{'Address': 'old.com', 'Expiration': 0}]}
        self.construct_manager(cache=cache)
        kwargs = {
            'Identifiers': {},
            'Operation': 'TestDiscoveryRequired',
        }
        self.manager.delete_endpoints(**kwargs)
        self.assertEqual(cache, {})

    def test_delete_endpoints_absent(self):
        cache = {}
        self.construct_manager(cache=cache)
        kwargs = {
            'Identifiers': {},
            'Operation': 'TestDiscoveryRequired',
        }
        self.manager.delete_endpoints(**kwargs)
        self.assertEqual(cache, {})

    def test_describe_endpoint_optional_fails_no_cache(self):
        side_effect = [ConnectionError(error=None)]
        self.construct_manager(side_effect=side_effect)
        kwargs = {'Operation': 'TestDiscoveryOptional'}
        endpoint = self.manager.describe_endpoint(**kwargs)
        self.assertIsNone(endpoint)
        # This second call should be blocked as we just failed
        endpoint = self.manager.describe_endpoint(**kwargs)
        self.assertIsNone(endpoint)
        # Asserted explicitly: a bare `==` expression here would be
        # evaluated and discarded without checking anything.
        self.assertEqual(
            self.client.describe_endpoints.call_args_list, [mock.call()]
        )

    def test_describe_endpoint_optional_fails_stale_cache(self):
        key = ()
        cache = {key: [{'Address': 'old.com', 'Expiration': 0}]}
        side_effect = [ConnectionError(error=None)] * 2
        self.construct_manager(cache=cache, side_effect=side_effect)
        kwargs = {'Operation': 'TestDiscoveryOptional'}
        endpoint = self.manager.describe_endpoint(**kwargs)
        self.assertEqual(endpoint, 'old.com')
        # This second call shouldn't go through as we just failed
        endpoint = self.manager.describe_endpoint(**kwargs)
        self.assertEqual(endpoint, 'old.com')
        self.assertEqual(
            self.client.describe_endpoints.call_args_list, [mock.call()]
        )

    def test_describe_endpoint_required_fails_no_cache(self):
        side_effect = [ConnectionError(error=None)] * 2
        self.construct_manager(side_effect=side_effect)
        kwargs = {'Operation': 'TestDiscoveryRequired'}
        with self.assertRaises(EndpointDiscoveryRefreshFailed):
            self.manager.describe_endpoint(**kwargs)
        # This second call should go through, as we have no cache
        with self.assertRaises(EndpointDiscoveryRefreshFailed):
            self.manager.describe_endpoint(**kwargs)
        describe_count = self.client.describe_endpoints.call_count
        self.assertEqual(describe_count, 2)

    def test_describe_endpoint_required_fails_stale_cache(self):
        key = ()
        cache = {key: [{'Address': 'old.com', 'Expiration': 0}]}
        side_effect = [ConnectionError(error=None)] * 2
        self.construct_manager(cache=cache, side_effect=side_effect)
        kwargs = {'Operation': 'TestDiscoveryRequired'}
        endpoint = self.manager.describe_endpoint(**kwargs)
        self.assertEqual(endpoint, 'old.com')
        # We have a stale endpoint, so this shouldn't fail or force a refresh
        endpoint = self.manager.describe_endpoint(**kwargs)
        self.assertEqual(endpoint, 'old.com')
        self.assertEqual(
            self.client.describe_endpoints.call_args_list, [mock.call()]
        )

    def test_describe_endpoint_required_force_refresh_success(self):
        side_effect = [
            ConnectionError(error=None),
            {
                'Endpoints': [
                    {
                        'Address': 'new.com',
                        'CachePeriodInMinutes': 2,
                    }
                ]
            },
        ]
        self.construct_manager(side_effect=side_effect)
        kwargs = {'Operation': 'TestDiscoveryRequired'}
        # First call will fail
        with self.assertRaises(EndpointDiscoveryRefreshFailed):
            self.manager.describe_endpoint(**kwargs)
        self.assertEqual(
            self.client.describe_endpoints.call_args_list, [mock.call()]
        )
        # Force a refresh if the cache is empty but discovery is required
        endpoint = self.manager.describe_endpoint(**kwargs)
        self.assertEqual(endpoint, 'new.com')

    def test_describe_endpoint_retries_after_failing(self):
        fake_time = mock.Mock()
        fake_time.side_effect = [0, 100, 200]
        side_effect = [
            ConnectionError(error=None),
            {
                'Endpoints': [
                    {
                        'Address': 'new.com',
                        'CachePeriodInMinutes': 2,
                    }
                ]
            },
        ]
        self.construct_manager(side_effect=side_effect, time=fake_time)
        kwargs = {'Operation': 'TestDiscoveryOptional'}
        endpoint = self.manager.describe_endpoint(**kwargs)
        self.assertIsNone(endpoint)
        self.assertEqual(
            self.client.describe_endpoints.call_args_list, [mock.call()]
        )
        # Second time should try again as enough time has elapsed
        endpoint = self.manager.describe_endpoint(**kwargs)
        self.assertEqual(endpoint, 'new.com')


class TestEndpointDiscoveryHandler(BaseEndpointDiscoveryTest):
    """Exercise ``EndpointDiscoveryHandler`` with a mocked manager."""

    def setUp(self):
        super().setUp()
        self.manager = mock.Mock(spec=EndpointDiscoveryManager)
        self.handler = EndpointDiscoveryHandler(self.manager)
        self.service_model = ServiceModel(self.service_description)

    def test_register_handler(self):
        events = mock.Mock(spec=HierarchicalEmitter)
        self.handler.register(events, 'foo-bar')
        events.register.assert_any_call(
            'before-parameter-build.foo-bar', self.handler.gather_identifiers
        )
        events.register.assert_any_call(
            'needs-retry.foo-bar', self.handler.handle_retries
        )
        events.register_first.assert_called_with(
            'request-created.foo-bar', self.handler.discover_endpoint
        )

    def test_discover_endpoint(self):
        request = AWSRequest()
        request.context = {'discovery': {'identifiers': {}}}
        self.manager.describe_endpoint.return_value = 'https://new.foo'
        self.handler.discover_endpoint(request, 'TestOperation')
        self.assertEqual(request.url, 'https://new.foo')
        self.manager.describe_endpoint.assert_called_with(
            Operation='TestOperation', Identifiers={}
        )

    def test_discover_endpoint_fails(self):
        request = AWSRequest()
        request.context = {'discovery': {'identifiers': {}}}
        request.url = 'old.com'
        self.manager.describe_endpoint.return_value = None
        self.handler.discover_endpoint(request, 'TestOperation')
        # With no discovered endpoint the request URL is left untouched.
        self.assertEqual(request.url, 'old.com')
        self.manager.describe_endpoint.assert_called_with(
            Operation='TestOperation', Identifiers={}
        )

    def test_discover_endpoint_no_protocol(self):
        request = AWSRequest()
        request.context = {'discovery': {'identifiers': {}}}
        self.manager.describe_endpoint.return_value = 'new.foo'
        self.handler.discover_endpoint(request, 'TestOperation')
        self.assertEqual(request.url, 'https://new.foo')
        self.manager.describe_endpoint.assert_called_with(
            Operation='TestOperation', Identifiers={}
        )

    def test_inject_no_context(self):
        request = AWSRequest(url='https://original.foo')
        self.handler.discover_endpoint(request, 'TestOperation')
        self.assertEqual(request.url, 'https://original.foo')
        self.manager.describe_endpoint.assert_not_called()

    def test_gather_identifiers(self):
        context = {}
        params = {'Foo': 'value1', 'Nested': {'Bar': 'value2'}}
        ids = {'Foo': 'value1', 'Bar': 'value2'}
        model = self.service_model.operation_model('TestDiscoveryRequired')
        self.manager.gather_identifiers.return_value = ids
        self.handler.gather_identifiers(params, model, context)
        self.assertEqual(context['discovery']['identifiers'], ids)

    def test_gather_identifiers_not_discoverable(self):
        context = {}
        model = self.service_model.operation_model('DescribeEndpoints')
        self.handler.gather_identifiers({}, model, context)
        self.assertEqual(context, {})

    def test_discovery_disabled_but_required(self):
        model = self.service_model.operation_model('TestDiscoveryRequired')
        with self.assertRaises(EndpointDiscoveryRequired):
            block_endpoint_discovery_required_operations(model)

    def test_discovery_disabled_but_optional(self):
        context = {}
        model = self.service_model.operation_model('TestDiscoveryOptional')
        block_endpoint_discovery_required_operations(model, context=context)
        self.assertEqual(context, {})

    def test_does_not_retry_no_response(self):
        retry = self.handler.handle_retries(None, None, None)
        self.assertIsNone(retry)

    def test_does_not_retry_other_errors(self):
        parsed_response = {'ResponseMetadata': {'HTTPStatusCode': 200}}
        response = (None, parsed_response)
        retry = self.handler.handle_retries(None, response, None)
        self.assertIsNone(retry)

    def test_does_not_retry_if_no_context(self):
        request_dict = {'context': {}}
        parsed_response = {'ResponseMetadata': {'HTTPStatusCode': 421}}
        response = (None, parsed_response)
        retry = self.handler.handle_retries(request_dict, response, None)
        self.assertIsNone(retry)

    def _assert_retries(self, parsed_response):
        """Assert the handler retries and evicts the cached endpoints."""
        request_dict = {'context': {'discovery': {'identifiers': {}}}}
        response = (None, parsed_response)
        model = self.service_model.operation_model('TestDiscoveryOptional')
        retry = self.handler.handle_retries(request_dict, response, model)
        self.assertEqual(retry, 0)
        self.manager.delete_endpoints.assert_called_with(
            Operation='TestDiscoveryOptional', Identifiers={}
        )

    def test_retries_421_status_code(self):
        parsed_response = {'ResponseMetadata': {'HTTPStatusCode': 421}}
        self._assert_retries(parsed_response)

    def test_retries_invalid_endpoint_exception(self):
        parsed_response = {'Error': {'Code': 'InvalidEndpointException'}}
        ...  # The source listing is truncated at this point.
mediaconvert_utilities.py
Source:mediaconvert_utilities.py
...36 "mediaconvert", profile_name=profile_name, endpoint_url=endpoint_url37 )38 self.sm_client = None39 self.env_name = utils.get_environment_name()40 def describe_endpoints(self, **kwargs):41 """42 This is an one-off use method to get your account MediaConvert API endpoint and store it in43 SecretsManager44 https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mediaconvert.html#MediaConvert.Client.describe_endpoints45 """46 response = self.client.describe_endpoints(**kwargs)47 assert (48 response["ResponseMetadata"]["HTTPStatusCode"] == HTTPStatus.OK49 ), f"Call to describe_endpoints failed with response: {response}"50 first_endpoint = response["Endpoints"][0]51 if self.sm_client is None:52 self.sm_client = utils.SecretsManager()53 self.sm_client.create_or_update_secret(ENDPOINT_SECRET_NAME, first_endpoint)54 def create_audio_extraction_job(55 self, input_bucket_name, input_file_s3_key, **kwargs56 ):57 folders = os.path.split(input_file_s3_key)[0]58 aws_account_number = utils.get_secret("aws-account")["number"]59 response = self.client.create_job(60 Role=f"arn:aws:iam::{aws_account_number}:role/MediaConvert_Default_Role",61 Settings={62 "OutputGroups": [63 {64 "Name": "File Group",65 "Outputs": [66 {67 "ContainerSettings": {"Container": "RAW"},68 "AudioDescriptions": [69 {70 "AudioTypeControl": "FOLLOW_INPUT",71 "AudioSourceName": "Audio Selector 1",72 "CodecSettings": {73 "Codec": "MP3",74 "Mp3Settings": {75 "Bitrate": 64000,76 "Channels": 1,77 "RateControlMode": "CBR",78 "SampleRate": 44100,79 },80 },81 "LanguageCodeControl": "FOLLOW_INPUT",82 }83 ],84 }85 ],86 "OutputGroupSettings": {87 "Type": "FILE_GROUP_SETTINGS",88 "FileGroupSettings": {89 "Destination": f"s3://{STACK_NAME}-{self.env_name}-interview-audio/{folders}/$fn$"90 },91 },92 }93 ],94 "AdAvailOffset": 0,95 "Inputs": [96 {97 "AudioSelectors": {98 "Audio Selector 1": {99 "Offset": 0,100 "DefaultSelection": "DEFAULT",101 "ProgramSelection": 1,102 }103 },104 "FilterEnable": 
"AUTO",105 "PsiControl": "USE_PSI",106 "FilterStrength": 0,107 "DeblockFilter": "DISABLED",108 "DenoiseFilter": "DISABLED",109 "TimecodeSource": "EMBEDDED",110 "FileInput": f"s3://{input_bucket_name}/{input_file_s3_key}",111 }112 ],113 },114 StatusUpdateInterval="SECONDS_60",115 **kwargs,116 )117 assert (118 response["ResponseMetadata"]["HTTPStatusCode"] == HTTPStatus.CREATED119 ), f"MediaConvert create_job call failed with response: {response}"120 return response121 def list_jobs(self, **kwargs):122 response = self.client.list_jobs(**kwargs)123 assert (124 response["ResponseMetadata"]["HTTPStatusCode"] == HTTPStatus.OK125 ), f"MediaConvert list_jobs call failed with response: {response}"126 return response127if __name__ == "__main__":128 mediaconvert_client = MediaConvertClient(endpoint_url=None)...
aws_timestream_query_info.py
Source:aws_timestream_query_info.py
...49 if client.can_paginate('describe_endpoints'):50 paginator = client.get_paginator('describe_endpoints')51 return paginator.paginate(), True52 else:53 return client.describe_endpoints(), False54 else:55 return None, False56 except (BotoCoreError, ClientError) as e:57 module.fail_json_aws(e, msg='Failed to fetch Amazon Timestream Query details')58def main():59 argument_spec = dict(60 describe_endpoints=dict(required=False, type=bool),61 )62 module = AnsibleAWSModule(63 argument_spec=argument_spec,64 required_if=(),65 mutually_exclusive=[],66 )67 client = module.client('timestream-query', retry_decorator=AWSRetry.exponential_backoff())...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!