Best Python code snippet using localstack_python
test_updatableRunbookProvider.py
Source:test_updatableRunbookProvider.py
1#!/usr/bin/python2###############################################################################3# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. #4# #5# Licensed under the Apache License Version 2.0 (the "License"). You may not #6# use this file except in compliance with the License. A copy of the License #7# is located at #8# #9# http://www.apache.org/licenses/LICENSE-2.0/ #10# #11# or in the "license" file accompanying this file. This file is distributed #12# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express #13# or implied. See the License for the specific language governing permis- #14# sions and limitations under the License. #15###############################################################################16from updatableRunbookProvider import lambda_handler, UpdatableRunbook, get_ssm_client17import cfnresponse18import boto319from botocore.stub import Stubber, ANY20from botocore.exceptions import ClientError21from botocore.config import Config22import botocore.session23from pytest_mock import mocker24import pytest25from datetime import datetime26import random27import os28import copy29os.environ['AWS_REGION'] = 'us-east-1'30os.environ['AWS_PARTITION'] = 'aws'31@pytest.fixture()32def context():33 name = 'SO0111-SHARR-updatableRunbookProvider'34 version = 'v1.0.0'35 yield {36 'function_name': name,37 'function_version': version,38 'invoked_function_arn': f"arn:aws:lambda:us-east-1:123456789012:function:{name}:{version}",39 'memory_limit_in_mb': float('inf'),40 'log_group_name': 'test-group',41 'log_stream_name': 'test-stream',42 'client_context': None,43 'aws_request_id': '-'.join([''.join([random.choice('0123456789abcdef') for _ in range(0, n)]) for n in [8, 4, 4, 4, 12]])44 }45@pytest.fixture()46def mock_provider(mocker):47 mock = mocker.patch('updatableRunbookProvider.UpdatableRunbook')48 name = 'test-resource-name'49 mock.return_value.create.return_value = name50 mock.return_value.physical_resource_id.return_value = name51 mock.return_value.update.return_value = 152 yield mock53@pytest.fixture()54def mock_provider_exception(mocker, mock_provider):55 mock_provider.return_value.create.side_effect = Exception()56 mock_provider.return_value.update.side_effect = Exception()57 mock_provider.return_value.delete.side_effect = Exception()58 yield mock_provider59@pytest.fixture()60def mock_cfnresponse(mocker):61 yield mocker.patch('cfnresponse.send')62def event_from_action(action):63 return {64 'RequestType': action,65 'RequestId': 'request_id',66 'ResponseURL': 'https://bogus',67 'ResourceType': 'Custom::UpdatableRunbook',68 'LogicalResourceId': 'resource_id',69 'StackId': 'stack_id',70 'ResourceProperties': {71 'Content': 'runbook_content',72 'Name': 'runbook_name',73 'VersionName': 'version_name',74 'DocumentType': 'Automation',75 'DocumentFormat': 'YAML'76 }77 }78@pytest.fixture()79def event_create():80 yield event_from_action('Create')81@pytest.fixture()82def event_update():83 yield event_from_action('Update')84@pytest.fixture()85def event_delete():86 yield event_from_action('Delete')87@pytest.fixture()88def event_invalid():89 yield event_from_action('Invalid')90def test_lambda_handler_create(mock_provider, mock_cfnresponse, event_create, context):91 lambda_handler(event_create, context)92 mock_provider.assert_called_once()93 mock_provider.return_value.create.assert_called_once()94 resource_id = mock_provider.return_value.physical_resource_id.return_value95 response_data = {'Name': resource_id}96 mock_cfnresponse.assert_called_once_with(event_create, context, cfnresponse.SUCCESS, response_data, resource_id)97def test_lambda_handler_update(mock_provider, mock_cfnresponse, event_update, context):98 lambda_handler(event_update, context)99 mock_provider.assert_called_once()100 mock_provider.return_value.update.assert_called_once()101 resource_id = mock_provider.return_value.physical_resource_id.return_value102 version = mock_provider.return_value.update.return_value103 response_data = {'DocumentVersion': version}104 mock_cfnresponse.assert_called_once_with(event_update, context, cfnresponse.SUCCESS, response_data, resource_id)105def test_lambda_handler_delete(mock_provider, mock_cfnresponse, event_delete, context):106 lambda_handler(event_delete, context)107 mock_provider.assert_called_once()108 mock_provider.return_value.delete.assert_called_once()109 resource_id = mock_provider.return_value.physical_resource_id.return_value110 mock_cfnresponse.assert_called_once_with(event_delete, context, cfnresponse.SUCCESS, {}, resource_id)111def test_lambda_handler_invalid(mock_provider, mock_cfnresponse, event_invalid, context):112 lambda_handler(event_invalid, context)113 mock_cfnresponse.assert_called_once_with(event_invalid, context, cfnresponse.FAILED, {}, ANY)114def test_lambda_handler_exception(mock_provider_exception, mock_cfnresponse, event_create, context):115 lambda_handler(event_create, context)116 mock_cfnresponse.assert_called_once_with(event_create, context, cfnresponse.FAILED, {}, ANY, reason = ANY)117@pytest.fixture()118def ssm_stub(mocker):119 ssm_client = boto3.client('ssm')120 mocker.patch('updatableRunbookProvider.get_ssm_client', return_value = ssm_client)121 stub = Stubber(ssm_client)122 stub.activate()123 yield stub124 stub.deactivate()125@pytest.fixture()126def resource_properties():127 yield {128 'Content': 'runbook_content',129 'Name': 'runbook_name',130 'VersionName': 'version_name',131 'DocumentType': 'Automation',132 'DocumentFormat': 'YAML',133 'DisplayName': 'display_name',134 'TargetType': '/'135 }136def response_create_update(properties, version = '1'):137 return {138 'DocumentDescription': {139 'Sha1': 'string',140 'Hash': 'string',141 'HashType': 'Sha256',142 'Name': properties['Name'],143 'DisplayName': properties.get('DisplayName', ''),144 'VersionName': properties.get('VersionName', ''),145 'Owner': 'string',146 'CreatedDate': datetime.now(),147 'Status': 'Active',148 'StatusInformation': 'string',149 'DocumentVersion': version,150 'Description': 'string',151 'Parameters': [],152 'PlatformTypes': [153 'Linux',154 ],155 'DocumentType': properties['DocumentType'],156 'SchemaVersion': 'string',157 'LatestVersion': 'string',158 'DefaultVersion': 'string',159 'DocumentFormat': properties.get('DocumentFormat', 'YAML'),160 'TargetType': properties.get('TargetType', '/'),161 'Tags': [],162 'AttachmentsInformation': [],163 'Author': 'string',164 'ApprovedVersion': 'string',165 'PendingReviewVersion': 'string',166 'ReviewStatus': 'NOT_REVIEWED',167 }168 }169def response_update_version(properties, version):170 return {171 'Description': {172 'Name': properties['Name'],173 'DefaultVersion': version,174 'DefaultVersionName': 'string'175 }176 }177@pytest.fixture()178def valid_keys_create():179 yield {'Content', 'Name', 'DocumentType', 'Attachments', 'DisplayName', 'VersionName', 'DocumentFormat', 'TargetType'}180@pytest.fixture()181def valid_keys_update():182 yield {'Content', 'Name', 'Attachments', 'DisplayName', 'VersionName', 'DocumentFormat', 'TargetType'}183@pytest.fixture()184def valid_keys_update_version():185 yield {'Name'}186@pytest.fixture()187def valid_keys_delete():188 yield {'Name'}189def strip_properties(properties, valid_keys):190 result = {}191 for key in properties:192 if key in valid_keys:193 result[key] = properties[key]194 return result195def test_provider_create(ssm_stub, resource_properties, valid_keys_create):196 ssm_stub.add_response(197 'create_document',198 response_create_update(resource_properties),199 strip_properties(resource_properties, valid_keys_create))200 provider = UpdatableRunbook(resource_properties)201 result = provider.create()202 assert result == resource_properties['Name']203def test_provider_update(ssm_stub, resource_properties, valid_keys_update, valid_keys_update_version):204 update_properties = strip_properties(resource_properties, valid_keys_update)205 update_properties['DocumentVersion'] = '$LATEST'206 updated_version = '6'207 ssm_stub.add_response(208 'update_document',209 response_create_update(resource_properties, updated_version),210 update_properties)211 update_version_properties = strip_properties(resource_properties, valid_keys_update_version)212 update_version_properties['DocumentVersion'] = updated_version213 ssm_stub.add_response(214 'update_document_default_version',215 response_update_version(resource_properties, updated_version),216 update_version_properties)217 provider = UpdatableRunbook(resource_properties)218 result = provider.update()219 assert result == updated_version220def test_provider_delete(ssm_stub, resource_properties, valid_keys_delete):221 ssm_stub.add_response('delete_document', {}, strip_properties(resource_properties, valid_keys_delete))222 provider = UpdatableRunbook(resource_properties)223 provider.delete()224def test_provider_create_already_exists(ssm_stub, resource_properties, valid_keys_update, valid_keys_update_version):225 ssm_stub.add_client_error('create_document', 'DocumentAlreadyExists')226 update_properties = strip_properties(resource_properties, valid_keys_update)227 update_properties['DocumentVersion'] = '$LATEST'228 updated_version = '2'229 ssm_stub.add_response(230 'update_document',231 response_create_update(resource_properties, updated_version),232 update_properties)233 update_version_properties = strip_properties(resource_properties, valid_keys_update_version)234 update_version_properties['DocumentVersion'] = updated_version235 ssm_stub.add_response(236 'update_document_default_version',237 response_update_version(resource_properties, updated_version),238 update_version_properties)239 provider = UpdatableRunbook(resource_properties)240 result = provider.create()241 assert result == resource_properties['Name']242def test_provider_delete_nonexistent(ssm_stub, resource_properties):243 ssm_stub.add_client_error('delete_document', 'InvalidDocument')244 provider = UpdatableRunbook(resource_properties)245 provider.delete()246def test_provider_create_exception(ssm_stub, resource_properties):247 ssm_stub.add_client_error('create_document')248 provider = UpdatableRunbook(resource_properties)249 with pytest.raises(botocore.exceptions.ClientError):250 provider.create()251def test_provider_update_exception(ssm_stub, resource_properties):252 ssm_stub.add_client_error('update_document')253 provider = UpdatableRunbook(resource_properties)254 with pytest.raises(botocore.exceptions.ClientError):255 provider.update()256def test_provider_update_version_exception(ssm_stub, resource_properties, valid_keys_update):257 update_properties = strip_properties(resource_properties, valid_keys_update)258 update_properties['DocumentVersion'] = '$LATEST'259 updated_version = '14'260 ssm_stub.add_response(261 'update_document',262 response_create_update(resource_properties, updated_version),263 update_properties)264 ssm_stub.add_client_error('update_document_default_version')265 provider = UpdatableRunbook(resource_properties)266 with pytest.raises(botocore.exceptions.ClientError):267 provider.update()268def test_provider_delete_exception(ssm_stub, resource_properties):269 ssm_stub.add_client_error('delete_document')270 provider = UpdatableRunbook(resource_properties)271 with pytest.raises(botocore.exceptions.ClientError):272 provider.delete()273def test_provider_update_duplicate_content(ssm_stub, resource_properties):274 ssm_stub.add_client_error('update_document', 'DuplicateDocumentContent')275 provider = UpdatableRunbook(resource_properties)276 result = provider.update()277 assert result == '$LATEST'278def document_version(name, version_name = None):279 response = {280 'Name': name,281 'DisplayName': 'display_name',282 'DocumentVersion': 'document_version',283 'CreatedDate': datetime.now(),284 'IsDefaultVersion': False,285 'DocumentFormat': 'YAML',286 'Status': 'Active',287 'StatusInformation': 'string',288 'ReviewStatus': 'NOT_REVIEWED'289 }290 if version_name is not None:291 response['VersionName'] = version_name292 return response293def test_provider_update_duplicate_version_name(ssm_stub, resource_properties, valid_keys_update, valid_keys_update_version):294 name = resource_properties['Name']295 version_name = 'v1.5.0beta'296 resource_properties['VersionName'] = version_name297 ssm_stub.add_client_error('update_document', 'DuplicateDocumentVersionName')298 ssm_stub.add_response(299 'list_document_versions',300 {301 'DocumentVersions': [302 document_version(name, version_name)303 ],304 'NextToken': ''305 },306 {307 'Name': name308 }309 )310 expected_resource_properties = copy.deepcopy(resource_properties)311 expected_version_name = version_name + '_Rev_2'312 expected_resource_properties['VersionName'] = expected_version_name313 update_properties = strip_properties(expected_resource_properties, valid_keys_update)314 update_properties['DocumentVersion'] = '$LATEST'315 updated_version = '2'316 ssm_stub.add_response(317 'update_document',318 response_create_update(expected_resource_properties, updated_version),319 update_properties)320 update_version_properties = strip_properties(expected_resource_properties, valid_keys_update_version)321 update_version_properties['DocumentVersion'] = updated_version322 ssm_stub.add_response(323 'update_document_default_version',324 response_update_version(resource_properties, updated_version),325 update_version_properties)326 provider = UpdatableRunbook(resource_properties)327 result = provider.update()328 assert result == updated_version329def test_provider_update_duplicate_version_name_multiple(ssm_stub, resource_properties, valid_keys_update, valid_keys_update_version):330 name = resource_properties['Name']331 version_name = r'an arbitrary _\=.:/ version name'332 resource_properties['VersionName'] = version_name333 ssm_stub.add_client_error('update_document', 'DuplicateDocumentVersionName')334 token = 'a_token_of_some_sort'335 ssm_stub.add_response(336 'list_document_versions',337 {338 'DocumentVersions': [339 document_version(name, version_name),340 document_version(name, f'prefixed_{version_name}_Rev_10'),341 document_version(name, f'{version_name}_Rev_20.suffix'),342 document_version(name),343 document_version(name, f'{version_name}_Rev_5')344 ],345 'NextToken': token346 },347 {348 'Name': name349 }350 )351 ssm_stub.add_response(352 'list_document_versions',353 {354 'DocumentVersions': [355 document_version(name, f'{version_name}_something'),356 document_version(name, f'{version_name}_Rev_3')357 ]358 },359 {360 'Name': name,361 'NextToken': token362 }363 )364 expected_resource_properties = copy.deepcopy(resource_properties)365 expected_version_name = version_name + '_Rev_6'366 expected_resource_properties['VersionName'] = expected_version_name367 update_properties = strip_properties(expected_resource_properties, valid_keys_update)368 update_properties['DocumentVersion'] = '$LATEST'369 updated_version = '7'370 ssm_stub.add_response(371 'update_document',372 response_create_update(expected_resource_properties, updated_version),373 update_properties)374 update_version_properties = strip_properties(expected_resource_properties, valid_keys_update_version)375 update_version_properties['DocumentVersion'] = updated_version376 ssm_stub.add_response(377 'update_document_default_version',378 response_update_version(resource_properties, updated_version),379 update_version_properties)380 provider = UpdatableRunbook(resource_properties)381 result = provider.update()...
document.py
Source:document.py
...56 else:57 # document is already identical58 return59 document_version = response['DocumentDescription']['DocumentVersion']60 response = self.ssm_client.update_document_default_version(61 Name=ssm_doc.name,62 DocumentVersion=document_version,...
upload.py
Source:upload.py
...23 document_hash = document['Hash']24 if (content_hash != document_hash):25 try:26 response = ssm.update_document(Name=document_name, Content=content, DocumentVersion='$LATEST')['DocumentDescription']['DocumentVersion']27 ssm.update_document_default_version(Name=document_name, DocumentVersion=response)28 except ClientError as e:29 print(f'Err: {d} {e}')30 else:31 print(f'Inf: {d} updated.')32 else:33 print(f'Inf: {d} No Action.')34 else:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!