How to use the check_stream_state method in localstack

Best Python code snippets using localstack_python

Note that the two snippets below use the name check_stream_state for unrelated things: in streams.py it is a method on a Zendesk Support source connector that turns saved stream state into a UNIX start_time, while in test_firehose.py it is a local predicate that LocalStack's Firehose integration tests poll until a delivery stream becomes ACTIVE.

streams.py

Source: streams.py (GitHub)



...
        start_time = dict(parse_qsl(urlparse(response.json().get(self.next_page_field, "")).query)).get("start_time")
        if start_time != self.prev_start_time:
            self.prev_start_time = start_time
            return {self.cursor_field: int(start_time)}

    def check_stream_state(self, stream_state: Mapping[str, Any] = None):
        """
        Returns the state value, if exists. Otherwise, returns user defined `Start Date`.
        """
        state = stream_state.get(self.cursor_field) or self._start_date if stream_state else self._start_date
        return calendar.timegm(pendulum.parse(state).utctimetuple())

    def request_params(
        self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs
    ) -> MutableMapping[str, Any]:
        next_page_token = next_page_token or {}
        parsed_state = self.check_stream_state(stream_state)
        if self.cursor_field:
            params = {"start_time": next_page_token.get(self.cursor_field, parsed_state)}
        else:
            params = {"start_time": calendar.timegm(pendulum.parse(self._start_date).utctimetuple())}
        return params


class SourceZendeskIncrementalExportStream(SourceZendeskSupportCursorPaginationStream):
    """Incremental Export from Tickets stream:
    https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-export-time-based

    @ param response_list_name: the main nested entity to look at inside of response, default = response_list_name
    @ param sideload_param : parameter variable to include various information to response
    more info: https://developer.zendesk.com/documentation/ticketing/using-the-zendesk-api/side_loading/#supported-endpoints
    """

    response_list_name: str = None
    sideload_param: str = None

    @staticmethod
    def check_start_time_param(requested_start_time: int, value: int = 1):
        """
        Requesting tickets in the future is not allowed, hits 400 - bad request.
        We get current UNIX timestamp minus `value` from now(), default = 1 (minute).
        Returns: either close to now UNIX timestamp or previously requested UNIX timestamp.
        """
        now = calendar.timegm(pendulum.now().subtract(minutes=value).utctimetuple())
        return now if requested_start_time > now else requested_start_time

    def path(self, **kwargs) -> str:
        return f"incremental/{self.response_list_name}.json"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        Returns next_page_token based on `end_of_stream` parameter inside of response
        """
        next_page_token = super().next_page_token(response)
        return None if response.json().get(END_OF_STREAM_KEY, False) else next_page_token

    def request_params(
        self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs
    ) -> MutableMapping[str, Any]:
        params = super().request_params(stream_state, next_page_token, **kwargs)
        # check "start_time" is not in the future
        params["start_time"] = self.check_start_time_param(params["start_time"])
        if self.sideload_param:
            params["include"] = self.sideload_param
        return params

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        for record in response.json().get(self.response_list_name, []):
            yield record


class SourceZendeskSupportTicketEventsExportStream(SourceZendeskIncrementalExportStream):
    """Incremental Export from TicketEvents stream:
    https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-event-export

    @ param response_list_name: the main nested entity to look at inside of response, default = "ticket_events"
    @ param response_target_entity: nested property inside of `response_list_name`, default = "child_events"
    @ param list_entities_from_event : the list of nested child_events entities to include from parent record
    @ param event_type : specific event_type to check ["Audit", "Change", "Comment", etc]
    """

    cursor_field = "created_at"
    response_list_name: str = "ticket_events"
    response_target_entity: str = "child_events"
    list_entities_from_event: List[str] = None
    event_type: str = None

    @property
    def update_event_from_record(self) -> bool:
        """Returns True/False based on list_entities_from_event property"""
        return True if len(self.list_entities_from_event) > 0 else False

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        for record in super().parse_response(response, **kwargs):
            for event in record.get(self.response_target_entity, []):
                if event.get("event_type") == self.event_type:
                    if self.update_event_from_record:
                        for prop in self.list_entities_from_event:
                            event[prop] = record.get(prop)
                    yield event


class Users(SourceZendeskIncrementalExportStream):
    """Users stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-user-export"""

    response_list_name: str = "users"


class Organizations(SourceZendeskSupportStream):
    """Organizations stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/"""


class Tickets(SourceZendeskIncrementalExportStream):
    """Tickets stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-export-time-based"""

    response_list_name: str = "tickets"
    transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)


class TicketComments(SourceZendeskSupportTicketEventsExportStream):
    """
    Fetch the TicketComments incrementally from TicketEvents Export stream
    """

    list_entities_from_event = ["via_reference_id", "ticket_id", "timestamp"]
    sideload_param = "comment_events"
    event_type = "Comment"


class Groups(SourceZendeskSupportStream):
    """Groups stream: https://developer.zendesk.com/api-reference/ticketing/groups/groups/"""


class GroupMemberships(SourceZendeskSupportCursorPaginationStream):
    """GroupMemberships stream: https://developer.zendesk.com/api-reference/ticketing/groups/group_memberships/"""

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        next_page = self._parse_next_page_number(response)
        return next_page if next_page else None

    def request_params(
        self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs
    ) -> MutableMapping[str, Any]:
        params = {"page": 1, "per_page": self.page_size, "sort_by": "asc"}
        start_time = self.str2unixtime((stream_state or {}).get(self.cursor_field))
        params["start_time"] = start_time if start_time else self.str2unixtime(self._start_date)
        if next_page_token:
            params["page"] = next_page_token
        return params


class SatisfactionRatings(SourceZendeskSupportCursorPaginationStream):
    """
    SatisfactionRatings stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/satisfaction_ratings/
    """

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        next_page = self._parse_next_page_number(response)
        return next_page if next_page else None

    def request_params(
        self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs
    ) -> MutableMapping[str, Any]:
        params = {"page": 1, "per_page": self.page_size, "sort_by": "asc"}
        start_time = self.str2unixtime((stream_state or {}).get(self.cursor_field))
        params["start_time"] = start_time if start_time else self.str2unixtime(self._start_date)
        if next_page_token:
            params["page"] = next_page_token
        return params


class TicketFields(SourceZendeskSupportStream):
    """TicketFields stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_fields/"""


class TicketForms(SourceZendeskSupportCursorPaginationStream):
    """TicketForms stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_forms/"""


class TicketMetrics(SourceZendeskSupportCursorPaginationStream):
    """TicketMetric stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_metrics/"""

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        next_page = self._parse_next_page_number(response)
        return next_page if next_page else None

    def request_params(
        self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs
    ) -> MutableMapping[str, Any]:
        params = {
            "start_time": self.check_stream_state(stream_state),
            "page": 1,
            "per_page": self.page_size,
        }
        if next_page_token:
            params["page"] = next_page_token
        return params


class TicketMetricEvents(SourceZendeskSupportCursorPaginationStream):
    """
    TicketMetricEvents stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_metric_events/
    """

    cursor_field = "time"

    def path(self, **kwargs):
        return "incremental/ticket_metric_events"


class Macros(SourceZendeskSupportStream):
...
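For reference, the cursor logic of check_stream_state reduces to a standalone function. The sketch below is a minimal illustration, not part of the connector: the cursor_field and start_date defaults are hypothetical stand-ins for the stream's self.cursor_field and the user-configured self._start_date.

import calendar
from typing import Any, Mapping, Optional

import pendulum


def check_stream_state(
    stream_state: Optional[Mapping[str, Any]],
    cursor_field: str = "updated_at",  # hypothetical cursor field
    start_date: str = "2021-01-01T00:00:00Z",  # hypothetical user-configured Start Date
) -> int:
    """Return the saved cursor value as a UNIX timestamp, falling back to start_date."""
    state = (stream_state or {}).get(cursor_field) or start_date
    return calendar.timegm(pendulum.parse(state).utctimetuple())


# With saved state, the stored cursor value wins:
assert check_stream_state({"updated_at": "2022-03-17T12:00:00Z"}) == 1647518400

# Without state (first sync), the configured start date is used:
assert check_stream_state(None) == 1609459200

In the connector itself, request_params feeds this value straight into the start_time query parameter, as TicketMetrics.request_params above shows.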


test_firehose.py

Source: test_firehose.py (GitHub)



...
                KinesisStreamSourceConfiguration=kinesis_stream_source_def,
                ElasticsearchDestinationConfiguration=elasticsearch_destination_configuration,
            )

            # wait for delivery stream to be ready
            def check_stream_state():
                stream = firehose_client.describe_delivery_stream(
                    DeliveryStreamName=delivery_stream_name
                )
                return stream["DeliveryStreamDescription"]["DeliveryStreamStatus"] == "ACTIVE"

            assert poll_condition(check_stream_state, 45, 1)

            # wait for ES cluster to be ready
            def check_domain_state():
                result = es_client.describe_elasticsearch_domain(DomainName=domain_name)
                return not result["DomainStatus"]["Processing"]

            assert poll_condition(check_domain_state, 30, 1)

            # put kinesis stream record
            kinesis_record = {"target": "hello"}
            kinesis_client.put_record(
                StreamName=stream_name, Data=to_bytes(json.dumps(kinesis_record)), PartitionKey="1"
            )
            firehose_record = {"target": "world"}
            firehose_client.put_record(
                DeliveryStreamName=delivery_stream_name,
                Record={"Data": to_bytes(json.dumps(firehose_record))},
            )

            def assert_elasticsearch_contents():
                response = requests.get(f"{es_url}/activity/_search")
                response_body = response.json()
                assert "hits" in response_body
                response_body_hits = response_body["hits"]
                assert "hits" in response_body_hits
                result = response_body_hits["hits"]
                assert len(result) == 2
                sources = [item["_source"] for item in result]
                assert firehose_record in sources
                assert kinesis_record in sources

            retry(assert_elasticsearch_contents)

            def assert_s3_contents():
                result = s3_client.list_objects(Bucket=s3_bucket)
                contents = []
                for o in result.get("Contents"):
                    data = s3_client.get_object(Bucket=s3_bucket, Key=o.get("Key"))
                    content = data["Body"].read()
                    contents.append(content)
                assert len(contents) == 2
                assert to_bytes(json.dumps(firehose_record)) in contents
                assert to_bytes(json.dumps(kinesis_record)) in contents

            retry(assert_s3_contents)
        finally:
            firehose_client.delete_delivery_stream(DeliveryStreamName=delivery_stream_name)
            es_client.delete_elasticsearch_domain(DomainName=domain_name)

    @pytest.mark.skip_offline
    @pytest.mark.parametrize("opensearch_endpoint_strategy", ["domain", "path"])
    def test_kinesis_firehose_opensearch_s3_backup(
        self,
        firehose_client,
        kinesis_client,
        opensearch_client,
        s3_client,
        s3_bucket,
        kinesis_create_stream,
        monkeypatch,
        opensearch_endpoint_strategy,
    ):
        domain_name = f"test-domain-{short_uid()}"
        stream_name = f"test-stream-{short_uid()}"
        role_arn = "arn:aws:iam::000000000000:role/Firehose-Role"
        delivery_stream_name = f"test-delivery-stream-{short_uid()}"
        monkeypatch.setattr(config, "OPENSEARCH_ENDPOINT_STRATEGY", opensearch_endpoint_strategy)
        try:
            opensearch_create_response = opensearch_client.create_domain(DomainName=domain_name)
            opensearch_url = f"http://{opensearch_create_response['DomainStatus']['Endpoint']}"
            opensearch_arn = opensearch_create_response["DomainStatus"]["ARN"]
            # create s3 backup bucket arn
            bucket_arn = aws_stack.s3_bucket_arn(s3_bucket)
            # create kinesis stream
            kinesis_create_stream(StreamName=stream_name, ShardCount=2)
            stream_arn = kinesis_client.describe_stream(StreamName=stream_name)[
                "StreamDescription"
            ]["StreamARN"]
            kinesis_stream_source_def = {
                "KinesisStreamARN": stream_arn,
                "RoleARN": role_arn,
            }
            opensearch_destination_configuration = {
                "RoleARN": role_arn,
                "DomainARN": opensearch_arn,
                "IndexName": "activity",
                "TypeName": "activity",
                "S3BackupMode": "AllDocuments",
                "S3Configuration": {
                    "RoleARN": role_arn,
                    "BucketARN": bucket_arn,
                },
            }
            firehose_client.create_delivery_stream(
                DeliveryStreamName=delivery_stream_name,
                DeliveryStreamType="KinesisStreamAsSource",
                KinesisStreamSourceConfiguration=kinesis_stream_source_def,
                AmazonopensearchserviceDestinationConfiguration=opensearch_destination_configuration,
            )

            # wait for delivery stream to be ready
            def check_stream_state():
                stream = firehose_client.describe_delivery_stream(
                    DeliveryStreamName=delivery_stream_name
                )
                return stream["DeliveryStreamDescription"]["DeliveryStreamStatus"] == "ACTIVE"

            assert poll_condition(check_stream_state, 30, 1)

            # wait for opensearch cluster to be ready
            def check_domain_state():
                result = opensearch_client.describe_domain(DomainName=domain_name)["DomainStatus"][
                    "Processing"
                ]
                return not result

            assert poll_condition(check_domain_state, 30, 1)

            # put kinesis stream record
            kinesis_record = {"target": "hello"}
            kinesis_client.put_record(
                StreamName=stream_name, Data=to_bytes(json.dumps(kinesis_record)), PartitionKey="1"
            )
            firehose_record = {"target": "world"}
            firehose_client.put_record(
                DeliveryStreamName=delivery_stream_name,
                Record={"Data": to_bytes(json.dumps(firehose_record))},
            )

            def assert_opensearch_contents():
                response = requests.get(f"{opensearch_url}/activity/_search")
                response_body = response.json()
                assert "hits" in response_body
                response_body_hits = response_body["hits"]
                assert "hits" in response_body_hits
                result = response_body_hits["hits"]
                assert len(result) == 2
                sources = [item["_source"] for item in result]
                assert firehose_record in sources
                assert kinesis_record in sources

            retry(assert_opensearch_contents)

            def assert_s3_contents():
                result = s3_client.list_objects(Bucket=s3_bucket)
                contents = []
                for o in result.get("Contents"):
                    data = s3_client.get_object(Bucket=s3_bucket, Key=o.get("Key"))
                    content = data["Body"].read()
                    contents.append(content)
                assert len(contents) == 2
                assert to_bytes(json.dumps(firehose_record)) in contents
                assert to_bytes(json.dumps(kinesis_record)) in contents

            retry(assert_s3_contents)
        finally:
            firehose_client.delete_delivery_stream(DeliveryStreamName=delivery_stream_name)
            opensearch_client.delete_domain(DomainName=domain_name)

    def test_delivery_stream_with_kinesis_as_source(
        self,
        firehose_client,
        kinesis_client,
        s3_client,
        s3_bucket,
        kinesis_create_stream,
        cleanups,
    ):
        bucket_arn = aws_stack.s3_bucket_arn(s3_bucket)
        stream_name = f"test-stream-{short_uid()}"
        log_group_name = f"group{short_uid()}"
        role_arn = "arn:aws:iam::000000000000:role/Firehose-Role"
        delivery_stream_name = f"test-delivery-stream-{short_uid()}"
        kinesis_create_stream(StreamName=stream_name, ShardCount=2)
        stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][
            "StreamARN"
        ]
        response = firehose_client.create_delivery_stream(
            DeliveryStreamName=delivery_stream_name,
            DeliveryStreamType="KinesisStreamAsSource",
            KinesisStreamSourceConfiguration={
                "KinesisStreamARN": stream_arn,
                "RoleARN": role_arn,
            },
            ExtendedS3DestinationConfiguration={
                "BucketARN": bucket_arn,
                "RoleARN": role_arn,
                "BufferingHints": {"IntervalInSeconds": 60, "SizeInMBs": 64},
                "DynamicPartitioningConfiguration": {"Enabled": True},
                "ProcessingConfiguration": {
                    "Enabled": True,
                    "Processors": [
                        {
                            "Type": "MetadataExtraction",
                            "Parameters": [
                                {
                                    "ParameterName": "MetadataExtractionQuery",
                                    "ParameterValue": "{s3Prefix: .tableName}",
                                },
                                {"ParameterName": "JsonParsingEngine", "ParameterValue": "JQ-1.6"},
                            ],
                        },
                    ],
                },
                "DataFormatConversionConfiguration": {"Enabled": True},
                "CompressionFormat": "GZIP",
                "Prefix": "firehoseTest/!{partitionKeyFromQuery:s3Prefix}/!{partitionKeyFromLambda:companyId}/!{partitionKeyFromLambda:year}/!{partitionKeyFromLambda:month}/",
                "ErrorOutputPrefix": "firehoseTest-errors/!{firehose:error-output-type}/",
                "CloudWatchLoggingOptions": {
                    "Enabled": True,
                    "LogGroupName": log_group_name,
                },
            },
        )
        cleanups.append(
            lambda: firehose_client.delete_delivery_stream(DeliveryStreamName=delivery_stream_name)
        )
        assert response["ResponseMetadata"]["HTTPStatusCode"] == 200

        # make sure the stream will come up at some point, for cleaner cleanup
        def check_stream_state():
            stream = firehose_client.describe_delivery_stream(
                DeliveryStreamName=delivery_stream_name
            )
            return stream["DeliveryStreamDescription"]["DeliveryStreamStatus"] == "ACTIVE"
...




