Best Python code snippet using localstack_python
provider.py
Source:provider.py
...282 context: RequestContext,283 delivery_stream_name: DeliveryStreamName,284 records: PutRecordBatchRequestEntryList,285 ) -> PutRecordBatchOutput:286 records = self._reencode_records(records)287 return PutRecordBatchOutput(288 FailedPutCount=0, RequestResponses=self._put_records(delivery_stream_name, records)289 )290 def tag_delivery_stream(291 self,292 context: RequestContext,293 delivery_stream_name: DeliveryStreamName,294 tags: TagDeliveryStreamInputTagList,295 ) -> TagDeliveryStreamOutput:296 delivery_stream_description = _get_description_or_raise_not_found(297 context, delivery_stream_name298 )299 FirehoseBackend.TAGS.tag_resource(delivery_stream_description["DeliveryStreamARN"], tags)300 return ListTagsForDeliveryStreamOutput()301 def list_tags_for_delivery_stream(302 self,303 context: RequestContext,304 delivery_stream_name: DeliveryStreamName,305 exclusive_start_tag_key: TagKey = None,306 limit: ListTagsForDeliveryStreamInputLimit = None,307 ) -> ListTagsForDeliveryStreamOutput:308 delivery_stream_description = _get_description_or_raise_not_found(309 context, delivery_stream_name310 )311 # The tagging service returns a dictionary with the given root name312 tags = FirehoseBackend.TAGS.list_tags_for_resource(313 arn=delivery_stream_description["DeliveryStreamARN"], root_name="root"314 )315 # Extract the actual list of tags for the typed response316 tag_list: ListTagsForDeliveryStreamOutputTagList = tags["root"]317 return ListTagsForDeliveryStreamOutput(Tags=tag_list, HasMoreTags=False)318 def untag_delivery_stream(319 self,320 context: RequestContext,321 delivery_stream_name: DeliveryStreamName,322 tag_keys: TagKeyList,323 ) -> UntagDeliveryStreamOutput:324 delivery_stream_description = _get_description_or_raise_not_found(325 context, delivery_stream_name326 )327 # The tagging service returns a dictionary with the given root name328 FirehoseBackend.TAGS.untag_resource(329 arn=delivery_stream_description["DeliveryStreamARN"], tag_names=tag_keys330 )331 
return UntagDeliveryStreamOutput()332 def update_destination(333 self,334 context: RequestContext,335 delivery_stream_name: DeliveryStreamName,336 current_delivery_stream_version_id: DeliveryStreamVersionId,337 destination_id: DestinationId,338 s3_destination_update: S3DestinationUpdate = None,339 extended_s3_destination_update: ExtendedS3DestinationUpdate = None,340 redshift_destination_update: RedshiftDestinationUpdate = None,341 elasticsearch_destination_update: ElasticsearchDestinationUpdate = None,342 amazonopensearchservice_destination_update: AmazonopensearchserviceDestinationUpdate = None,343 splunk_destination_update: SplunkDestinationUpdate = None,344 http_endpoint_destination_update: HttpEndpointDestinationUpdate = None,345 ) -> UpdateDestinationOutput:346 delivery_stream_description = _get_description_or_raise_not_found(347 context, delivery_stream_name348 )349 destinations = delivery_stream_description["Destinations"]350 try:351 destination = next(filter(lambda d: d["DestinationId"] == destination_id, destinations))352 except StopIteration:353 destination = DestinationDescription(DestinationId=destination_id)354 delivery_stream_description["Destinations"].append(destination)355 if elasticsearch_destination_update:356 destination["ElasticsearchDestinationDescription"] = convert_es_update_to_desc(357 elasticsearch_destination_update358 )359 if amazonopensearchservice_destination_update:360 destination[361 "AmazonopensearchserviceDestinationDescription"362 ] = convert_opensearch_update_to_desc(amazonopensearchservice_destination_update)363 if s3_destination_update:364 destination["S3DestinationDescription"] = convert_s3_update_to_desc(365 s3_destination_update366 )367 if extended_s3_destination_update:368 destination["ExtendedS3DestinationDescription"] = convert_extended_s3_update_to_desc(369 extended_s3_destination_update370 )371 if http_endpoint_destination_update:372 destination["HttpEndpointDestinationDescription"] = convert_http_update_to_desc(373 
http_endpoint_destination_update374 )375 return UpdateDestinationOutput()376 def _reencode_record(self, record: Record) -> Record:377 """378 The ASF decodes the record's data automatically. But most of the service integrations (kinesis, lambda, http)379 are working with the base64 encoded data.380 """381 if "Data" in record:382 record["Data"] = base64.b64encode(record["Data"])383 return record384 def _reencode_records(self, records: List[Record]) -> List[Record]:385 return [self._reencode_record(r) for r in records]386 def _process_records(self, records: List[Record], shard_id: str, fh_d_stream: str):387 """Process the given records from the underlying Kinesis stream"""388 return self._put_records(fh_d_stream, records)389 def _put_record(self, delivery_stream_name: str, record: Record) -> PutRecordOutput:390 """Put a record to the firehose stream from a PutRecord API call"""391 result = self._put_records(delivery_stream_name, [record])392 return PutRecordOutput(RecordId=result[0]["RecordId"])393 def _put_records(394 self, delivery_stream_name: str, unprocessed_records: List[Record]395 ) -> List[PutRecordBatchResponseEntry]:396 """Put a list of records to the firehose stream - either directly from a PutRecord API call, or397 received from an underlying Kinesis stream (if 'KinesisStreamAsSource' is configured)"""398 region = FirehoseBackend.get()...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub — from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!