How to use the list_tags_for_delivery_stream method in localstack

Best Python code snippet using localstack_python

test_firehose_tags.py

Source:test_firehose_tags.py Github

copy

Full Screen

# ... (excerpt begins part-way through the module's import section; boto3,
# pytest, ClientError, mock_firehose, ACCOUNT_ID and get_random_hex are
# expected to be imported in the elided lines above)
from moto.firehose.models import MAX_TAGS_PER_DELIVERY_STREAM
from tests.test_firehose.test_firehose import TEST_REGION
from tests.test_firehose.test_firehose import sample_s3_dest_config


@mock_firehose
def test_list_tags_for_delivery_stream():
    """Test invocations of list_tags_for_delivery_stream().

    Creates a stream carrying 50 tags and checks the ``Limit`` and
    ``ExclusiveStartTagKey`` parameters as well as the no-parameter call,
    asserting on both the returned ``Tags`` and the ``HasMoreTags`` flag.
    """
    client = boto3.client("firehose", region_name=TEST_REGION)
    stream_name = f"test_list_tags_{get_random_hex(6)}"
    number_of_tags = 50
    tags = [{"Key": f"{x}_k", "Value": f"{x}_v"} for x in range(1, number_of_tags + 1)]

    # Create a delivery stream to work with.
    client.create_delivery_stream(
        DeliveryStreamName=stream_name,
        S3DestinationConfiguration=sample_s3_dest_config(),
        Tags=tags,
    )

    # Verify limit works.
    result = client.list_tags_for_delivery_stream(
        DeliveryStreamName=stream_name, Limit=1
    )
    assert len(result["Tags"]) == 1
    assert result["Tags"] == [{"Key": "1_k", "Value": "1_v"}]
    assert result["HasMoreTags"] is True

    result = client.list_tags_for_delivery_stream(
        DeliveryStreamName=stream_name, Limit=number_of_tags
    )
    assert len(result["Tags"]) == number_of_tags
    assert result["HasMoreTags"] is False

    # Verify exclusive_start_tag_key returns truncated list.
    result = client.list_tags_for_delivery_stream(
        DeliveryStreamName=stream_name, ExclusiveStartTagKey="30_k"
    )
    assert len(result["Tags"]) == number_of_tags - 30
    expected_tags = [
        {"Key": f"{x}_k", "Value": f"{x}_v"} for x in range(31, number_of_tags + 1)
    ]
    assert result["Tags"] == expected_tags
    assert result["HasMoreTags"] is False

    # Starting after the very last key leaves nothing to return.
    result = client.list_tags_for_delivery_stream(
        DeliveryStreamName=stream_name, ExclusiveStartTagKey=f"{number_of_tags}_k"
    )
    assert len(result["Tags"]) == 0
    assert result["HasMoreTags"] is False

    # An ExclusiveStartTagKey that matches no tag is ignored: the whole
    # list comes back (the stream name itself is valid here).
    result = client.list_tags_for_delivery_stream(
        DeliveryStreamName=stream_name, ExclusiveStartTagKey="foo"
    )
    assert len(result["Tags"]) == number_of_tags
    assert result["Tags"] == tags
    assert result["HasMoreTags"] is False

    # Verify no parameters returns entire list.  The response must be
    # captured; otherwise the assertions below would silently re-check the
    # result of the previous call.
    result = client.list_tags_for_delivery_stream(DeliveryStreamName=stream_name)
    assert len(result["Tags"]) == number_of_tags
    assert result["Tags"] == tags
    assert result["HasMoreTags"] is False


@mock_firehose
def test_tag_delivery_stream():
    """Test successful, failed invocations of tag_delivery_stream().

    Failure cases covered: unknown stream name, more tags than
    MAX_TAGS_PER_DELIVERY_STREAM, and a tag key rejected by validation.
    The success case adds exactly MAX_TAGS_PER_DELIVERY_STREAM tags and
    reads them back.
    """
    client = boto3.client("firehose", region_name=TEST_REGION)

    # Create a delivery stream for testing purposes.
    stream_name = f"test_tags_{get_random_hex(6)}"
    client.create_delivery_stream(
        DeliveryStreamName=stream_name,
        ExtendedS3DestinationConfiguration=sample_s3_dest_config(),
    )

    # Unknown stream name.
    unknown_name = "foo"
    with pytest.raises(ClientError) as exc:
        client.tag_delivery_stream(
            DeliveryStreamName=unknown_name, Tags=[{"Key": "foo", "Value": "bar"}]
        )
    err = exc.value.response["Error"]
    assert err["Code"] == "ResourceNotFoundException"
    assert (
        f"Firehose {unknown_name} under account {ACCOUNT_ID} not found"
        in err["Message"]
    )

    # Too many tags: one more than the limit, derived from the same constant
    # the expected error message uses.
    with pytest.raises(ClientError) as exc:
        client.tag_delivery_stream(
            DeliveryStreamName=stream_name,
            Tags=[
                {"Key": f"{x}", "Value": f"{x}"}
                for x in range(MAX_TAGS_PER_DELIVERY_STREAM + 1)
            ],
        )
    err = exc.value.response["Error"]
    assert err["Code"] == "ValidationException"
    # NOTE: the misspellings below are matched against the message produced
    # by the backend at runtime; do not "correct" them here.
    assert (
        f"failed to satisify contstraint: Member must have length "
        f"less than or equal to {MAX_TAGS_PER_DELIVERY_STREAM}"
    ) in err["Message"]

    # Bad tags.
    with pytest.raises(ClientError) as exc:
        client.tag_delivery_stream(
            DeliveryStreamName=stream_name, Tags=[{"Key": "foo!", "Value": "bar"}]
        )
    err = exc.value.response["Error"]
    assert err["Code"] == "ValidationException"
    assert (
        "1 validation error detected: Value 'foo!' at 'tags.1.member.key' "
        "failed to satisfy constraint: Member must satisfy regular "
        "expression pattern"
    ) in err["Message"]

    # Successful addition of tags.
    added_tags = [
        {"Key": f"{x}", "Value": f"{x}"} for x in range(MAX_TAGS_PER_DELIVERY_STREAM)
    ]
    client.tag_delivery_stream(DeliveryStreamName=stream_name, Tags=added_tags)
    results = client.list_tags_for_delivery_stream(DeliveryStreamName=stream_name)
    assert len(results["Tags"]) == MAX_TAGS_PER_DELIVERY_STREAM
    assert results["Tags"] == added_tags


@mock_firehose
def test_untag_delivery_stream():
    """Test successful, failed invocations of untag_delivery_stream().

    The visible part of the test creates a stream with three tags, removes
    all of them by key and checks that no tags remain.
    """
    client = boto3.client("firehose", region_name=TEST_REGION)

    # Create a delivery stream for testing purposes.
    stream_name = f"test_untag_{get_random_hex(6)}"
    tag_list = [
        {"Key": "one", "Value": "1"},
        {"Key": "two", "Value": "2"},
        {"Key": "three", "Value": "3"},
    ]
    client.create_delivery_stream(
        DeliveryStreamName=stream_name,
        ExtendedS3DestinationConfiguration=sample_s3_dest_config(),
        Tags=tag_list,
    )

    # Untag all of the tags.  Verify there are no more tags.
    tag_keys = [x["Key"] for x in tag_list]
    client.untag_delivery_stream(DeliveryStreamName=stream_name, TagKeys=tag_keys)
    results = client.list_tags_for_delivery_stream(DeliveryStreamName=stream_name)
    assert not results["Tags"]
# ... (excerpt ends here)

Full Screen

Full Screen

responses.py

Source:responses.py Github

copy

Full Screen

...46 self._get_param("DeliveryStreamType"),47 self._get_param("ExclusiveStartDeliveryStreamName"),48 )49 return json.dumps(stream_list)50 def list_tags_for_delivery_stream(self):51 """Prepare arguments and respond to ListTagsForDeliveryStream()."""52 result = self.firehose_backend.list_tags_for_delivery_stream(53 self._get_param("DeliveryStreamName"),54 self._get_param("ExclusiveStartTagKey"),55 self._get_param("Limit"),56 )57 return json.dumps(result)58 def put_record(self):59 """Prepare arguments and response to PutRecord()."""60 result = self.firehose_backend.put_record(61 self._get_param("DeliveryStreamName"), self._get_param("Record")62 )63 return json.dumps(result)64 def put_record_batch(self):65 """Prepare arguments and response to PutRecordBatch()."""66 result = self.firehose_backend.put_record_batch(...

Full Screen

Full Screen

firehose.py

Source:firehose.py Github

copy

Full Screen

1# Licensed under the Apache License, Version 2.0 (the "License"). You2# may not use this file except in compliance with the License. A copy of3# the License is located at4#5# http://aws.amazon.com/apache2.0/6#7# or in the "license" file accompanying this file. This file is8# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF9# ANY KIND, either express or implied. See the License for the specific10# language governing permissions and limitations under the License.11from skew.resources.aws import AWSResource12import jmespath13class DeliveryStream(AWSResource):14 class Meta(object):15 service = 'firehose'16 type = 'deliverystream'17 enum_spec = ('list_delivery_streams', 'DeliveryStreamNames', None)18 detail_spec = ('describe_delivery_stream', 'DeliveryStreamName', 'DeliveryStreamDescription')19 id = 'DeliveryStreamName'20 filter_name = None21 filter_type = None22 name = 'DeliveryStreamName'23 date = 'CreateTimestamp'24 dimension = 'DeliveryStreamName'25 tags_spec = ('list_tags_for_delivery_stream', 'Tags[]', 'DeliveryStreamName', 'id')26 def __init__(self, client, data, query=None):27 super(DeliveryStream, self).__init__(client, data, query)28 self._id = data29 detail_op, param_name, detail_path = self.Meta.detail_spec30 params = {param_name: self.id}31 data = client.call(detail_op, **params)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful