Best Python code snippet using localstack_python
test_kinesis.py
Source:test_kinesis.py
...75 stream_name = "my_stream"76 conn.create_stream(stream_name, 1)77 response = conn.describe_stream(stream_name)78 shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]79 response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")80 shard_iterator = response["ShardIterator"]81 response = conn.get_records(shard_iterator)82 shard_iterator = response["NextShardIterator"]83 response["Records"].should.equal([])84 response["MillisBehindLatest"].should.equal(0)85@mock_kinesis_deprecated86def test_get_invalid_shard_iterator():87 conn = boto.kinesis.connect_to_region("us-west-2")88 stream_name = "my_stream"89 conn.create_stream(stream_name, 1)90 conn.get_shard_iterator.when.called_with(91 stream_name, "123", "TRIM_HORIZON"92 ).should.throw(ResourceNotFoundException)93@mock_kinesis_deprecated94def test_put_records():95 conn = boto.kinesis.connect_to_region("us-west-2")96 stream_name = "my_stream"97 conn.create_stream(stream_name, 1)98 data = "hello world"99 partition_key = "1234"100 conn.put_record.when.called_with(stream_name, data, 1234).should.throw(101 InvalidArgumentException102 )103 conn.put_record(stream_name, data, partition_key)104 response = conn.describe_stream(stream_name)105 shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]106 response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")107 shard_iterator = response["ShardIterator"]108 response = conn.get_records(shard_iterator)109 shard_iterator = response["NextShardIterator"]110 response["Records"].should.have.length_of(1)111 record = response["Records"][0]112 record["Data"].should.equal("hello world")113 record["PartitionKey"].should.equal("1234")114 record["SequenceNumber"].should.equal("1")115@mock_kinesis_deprecated116def test_get_records_limit():117 conn = boto.kinesis.connect_to_region("us-west-2")118 stream_name = "my_stream"119 conn.create_stream(stream_name, 1)120 # Create some data121 data = "hello world"122 for index in range(5):123 
conn.put_record(stream_name, data, str(index))124 # Get a shard iterator125 response = conn.describe_stream(stream_name)126 shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]127 response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")128 shard_iterator = response["ShardIterator"]129 # Retrieve only 3 records130 response = conn.get_records(shard_iterator, limit=3)131 response["Records"].should.have.length_of(3)132 # Then get the rest of the results133 next_shard_iterator = response["NextShardIterator"]134 response = conn.get_records(next_shard_iterator)135 response["Records"].should.have.length_of(2)136@mock_kinesis_deprecated137def test_get_records_at_sequence_number():138 # AT_SEQUENCE_NUMBER - Start reading exactly from the position denoted by139 # a specific sequence number.140 conn = boto.kinesis.connect_to_region("us-west-2")141 stream_name = "my_stream"142 conn.create_stream(stream_name, 1)143 # Create some data144 for index in range(1, 5):145 conn.put_record(stream_name, str(index), str(index))146 # Get a shard iterator147 response = conn.describe_stream(stream_name)148 shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]149 response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")150 shard_iterator = response["ShardIterator"]151 # Get the second record152 response = conn.get_records(shard_iterator, limit=2)153 second_sequence_id = response["Records"][1]["SequenceNumber"]154 # Then get a new iterator starting at that id155 response = conn.get_shard_iterator(156 stream_name, shard_id, "AT_SEQUENCE_NUMBER", second_sequence_id157 )158 shard_iterator = response["ShardIterator"]159 response = conn.get_records(shard_iterator)160 # And the first result returned should be the second item161 response["Records"][0]["SequenceNumber"].should.equal(second_sequence_id)162 response["Records"][0]["Data"].should.equal("2")163@mock_kinesis_deprecated164def test_get_records_after_sequence_number():165 # 
AFTER_SEQUENCE_NUMBER - Start reading right after the position denoted166 # by a specific sequence number.167 conn = boto.kinesis.connect_to_region("us-west-2")168 stream_name = "my_stream"169 conn.create_stream(stream_name, 1)170 # Create some data171 for index in range(1, 5):172 conn.put_record(stream_name, str(index), str(index))173 # Get a shard iterator174 response = conn.describe_stream(stream_name)175 shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]176 response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")177 shard_iterator = response["ShardIterator"]178 # Get the second record179 response = conn.get_records(shard_iterator, limit=2)180 second_sequence_id = response["Records"][1]["SequenceNumber"]181 # Then get a new iterator starting after that id182 response = conn.get_shard_iterator(183 stream_name, shard_id, "AFTER_SEQUENCE_NUMBER", second_sequence_id184 )185 shard_iterator = response["ShardIterator"]186 response = conn.get_records(shard_iterator)187 # And the first result returned should be the third item188 response["Records"][0]["Data"].should.equal("3")189 response["MillisBehindLatest"].should.equal(0)190@mock_kinesis_deprecated191def test_get_records_latest():192 # LATEST - Start reading just after the most recent record in the shard,193 # so that you always read the most recent data in the shard.194 conn = boto.kinesis.connect_to_region("us-west-2")195 stream_name = "my_stream"196 conn.create_stream(stream_name, 1)197 # Create some data198 for index in range(1, 5):199 conn.put_record(stream_name, str(index), str(index))200 # Get a shard iterator201 response = conn.describe_stream(stream_name)202 shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]203 response = conn.get_shard_iterator(stream_name, shard_id, "TRIM_HORIZON")204 shard_iterator = response["ShardIterator"]205 # Get the second record206 response = conn.get_records(shard_iterator, limit=2)207 second_sequence_id = 
response["Records"][1]["SequenceNumber"]208 # Then get a new iterator starting after that id209 response = conn.get_shard_iterator(210 stream_name, shard_id, "LATEST", second_sequence_id211 )212 shard_iterator = response["ShardIterator"]213 # Write some more data214 conn.put_record(stream_name, "last_record", "last_record")215 response = conn.get_records(shard_iterator)216 # And the only result returned should be the new item217 response["Records"].should.have.length_of(1)218 response["Records"][0]["PartitionKey"].should.equal("last_record")219 response["Records"][0]["Data"].should.equal("last_record")220 response["MillisBehindLatest"].should.equal(0)221@mock_kinesis222def test_get_records_at_timestamp():223 # AT_TIMESTAMP - Read the first record at or after the specified timestamp224 conn = boto3.client("kinesis", region_name="us-west-2")225 stream_name = "my_stream"226 conn.create_stream(StreamName=stream_name, ShardCount=1)227 # Create some data228 for index in range(1, 5):229 conn.put_record(230 StreamName=stream_name, Data=str(index), PartitionKey=str(index)231 )232 # When boto3 floors the timestamp that we pass to get_shard_iterator to233 # second precision even though AWS supports ms precision:234 # http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html235 # To test around this limitation we wait until we well into the next second236 # before capturing the time and storing the records we expect to retrieve.237 time.sleep(1.0)238 timestamp = datetime.datetime.utcnow()239 keys = [str(i) for i in range(5, 10)]240 for k in keys:241 conn.put_record(StreamName=stream_name, Data=k, PartitionKey=k)242 # Get a shard iterator243 response = conn.describe_stream(StreamName=stream_name)244 shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]245 response = conn.get_shard_iterator(246 StreamName=stream_name,247 ShardId=shard_id,248 ShardIteratorType="AT_TIMESTAMP",249 Timestamp=timestamp,250 )251 shard_iterator = 
response["ShardIterator"]252 response = conn.get_records(ShardIterator=shard_iterator)253 response["Records"].should.have.length_of(len(keys))254 partition_keys = [r["PartitionKey"] for r in response["Records"]]255 partition_keys.should.equal(keys)256 response["MillisBehindLatest"].should.equal(0)257@mock_kinesis258def test_get_records_at_very_old_timestamp():259 conn = boto3.client("kinesis", region_name="us-west-2")260 stream_name = "my_stream"261 conn.create_stream(StreamName=stream_name, ShardCount=1)262 # Create some data263 keys = [str(i) for i in range(1, 5)]264 for k in keys:265 conn.put_record(StreamName=stream_name, Data=k, PartitionKey=k)266 # Get a shard iterator267 response = conn.describe_stream(StreamName=stream_name)268 shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]269 response = conn.get_shard_iterator(270 StreamName=stream_name,271 ShardId=shard_id,272 ShardIteratorType="AT_TIMESTAMP",273 Timestamp=1,274 )275 shard_iterator = response["ShardIterator"]276 response = conn.get_records(ShardIterator=shard_iterator)277 response["Records"].should.have.length_of(len(keys))278 partition_keys = [r["PartitionKey"] for r in response["Records"]]279 partition_keys.should.equal(keys)280 response["MillisBehindLatest"].should.equal(0)281@mock_kinesis282def test_get_records_timestamp_filtering():283 conn = boto3.client("kinesis", region_name="us-west-2")284 stream_name = "my_stream"285 conn.create_stream(StreamName=stream_name, ShardCount=1)286 conn.put_record(StreamName=stream_name, Data="0", PartitionKey="0")287 time.sleep(1.0)288 timestamp = datetime.datetime.now(tz=tzlocal())289 conn.put_record(StreamName=stream_name, Data="1", PartitionKey="1")290 response = conn.describe_stream(StreamName=stream_name)291 shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]292 response = conn.get_shard_iterator(293 StreamName=stream_name,294 ShardId=shard_id,295 ShardIteratorType="AT_TIMESTAMP",296 Timestamp=timestamp,297 )298 shard_iterator = 
response["ShardIterator"]299 response = conn.get_records(ShardIterator=shard_iterator)300 response["Records"].should.have.length_of(1)301 response["Records"][0]["PartitionKey"].should.equal("1")302 response["Records"][0]["ApproximateArrivalTimestamp"].should.be.greater_than(303 timestamp304 )305 response["MillisBehindLatest"].should.equal(0)306@mock_kinesis307def test_get_records_millis_behind_latest():308 conn = boto3.client("kinesis", region_name="us-west-2")309 stream_name = "my_stream"310 conn.create_stream(StreamName=stream_name, ShardCount=1)311 conn.put_record(StreamName=stream_name, Data="0", PartitionKey="0")312 time.sleep(1.0)313 conn.put_record(StreamName=stream_name, Data="1", PartitionKey="1")314 response = conn.describe_stream(StreamName=stream_name)315 shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]316 response = conn.get_shard_iterator(317 StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON"318 )319 shard_iterator = response["ShardIterator"]320 response = conn.get_records(ShardIterator=shard_iterator, Limit=1)321 response["Records"].should.have.length_of(1)322 response["MillisBehindLatest"].should.be.greater_than(0)323@mock_kinesis324def test_get_records_at_very_new_timestamp():325 conn = boto3.client("kinesis", region_name="us-west-2")326 stream_name = "my_stream"327 conn.create_stream(StreamName=stream_name, ShardCount=1)328 # Create some data329 keys = [str(i) for i in range(1, 5)]330 for k in keys:331 conn.put_record(StreamName=stream_name, Data=k, PartitionKey=k)332 timestamp = datetime.datetime.utcnow() + datetime.timedelta(seconds=1)333 # Get a shard iterator334 response = conn.describe_stream(StreamName=stream_name)335 shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]336 response = conn.get_shard_iterator(337 StreamName=stream_name,338 ShardId=shard_id,339 ShardIteratorType="AT_TIMESTAMP",340 Timestamp=timestamp,341 )342 shard_iterator = response["ShardIterator"]343 response = 
conn.get_records(ShardIterator=shard_iterator)344 response["Records"].should.have.length_of(0)345 response["MillisBehindLatest"].should.equal(0)346@mock_kinesis347def test_get_records_from_empty_stream_at_timestamp():348 conn = boto3.client("kinesis", region_name="us-west-2")349 stream_name = "my_stream"350 conn.create_stream(StreamName=stream_name, ShardCount=1)351 timestamp = datetime.datetime.utcnow()352 # Get a shard iterator353 response = conn.describe_stream(StreamName=stream_name)354 shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]355 response = conn.get_shard_iterator(356 StreamName=stream_name,357 ShardId=shard_id,358 ShardIteratorType="AT_TIMESTAMP",359 Timestamp=timestamp,360 )361 shard_iterator = response["ShardIterator"]362 response = conn.get_records(ShardIterator=shard_iterator)363 response["Records"].should.have.length_of(0)364 response["MillisBehindLatest"].should.equal(0)365@mock_kinesis_deprecated366def test_invalid_shard_iterator_type():367 conn = boto.kinesis.connect_to_region("us-west-2")368 stream_name = "my_stream"369 conn.create_stream(stream_name, 1)...
test_runner.py
Source:test_runner.py
1import pytest2from datetime import datetime3from unittest.mock import Mock, patch, call4from lambda_kinesis.runner import (5 get_records,6 run_handler_on_stream_records,7 format_kinesis_timestamp,8 IteratorType,9)10def test_get_records_parses_response():11 kinesis_client = Mock()12 arrival_time = datetime(2015, 1, 1)13 kinesis_client.get_records.return_value = {14 "Records": [15 {"ApproximateArrivalTimestamp": arrival_time, "Data": b"data", "PartitionKey": "xxx"}16 ],17 "NextShardIterator": "456",18 }19 records, shard_iterator = get_records(20 kinesis_client, stream_name="stream", shard_iterator="123"21 )22 assert records == (23 {24 "kinesis": {25 "data": "ZGF0YQ==",26 "approximateArrivalTimestamp": arrival_time,27 "partitionKey": "xxx",28 },29 "eventSource": "aws:kinesis",30 "eventName": "aws:kinesis:record",31 },32 )33 assert shard_iterator == "456"34@patch("lambda_kinesis.runner.get_records")35def test_run_handler_polls_stream(get_records):36 kinesis_client = Mock()37 handler = Mock()38 kinesis_client.list_shards.return_value = {"Shards": [{"ShardId": "shard"}]}39 kinesis_client.get_shard_iterator.return_value = {"ShardIterator": "iterator"}40 class PleaseStop(Exception):41 pass42 iterator_type = IteratorType.TrimHorizon43 records = [str(i) for i in range(30)]44 record_batches = [records[i : i + 10] for i in range(0, 30, 10)] # NOQA45 shard_iterators = [str(i) for i in range(3)]46 get_records.side_effect = list(zip(record_batches, shard_iterators)) + [PleaseStop]47 with pytest.raises(PleaseStop):48 run_handler_on_stream_records(49 stream_name="stream",50 kinesis_client=kinesis_client,51 shard_iterator_type=iterator_type,52 handler=handler,53 wait_seconds=0,54 )55 kinesis_client.get_shard_iterator.assert_called_once_with(56 StreamName="stream", ShardId="shard", ShardIteratorType="TRIM_HORIZON"57 )58 assert get_records.call_args_list == [59 call(kinesis_client, "stream", "iterator"),60 call(kinesis_client, "stream", "0"),61 call(kinesis_client, "stream", "1"),62 
call(kinesis_client, "stream", "2"),63 ]64 assert handler.call_args_list == [65 call({"Records": record_batch}, None) for record_batch in record_batches66 ]67@patch("lambda_kinesis.runner.get_records")68def test_run_handler_supports_timestamp_iterator(get_records):69 kinesis_client = Mock()70 handler = Mock()71 kinesis_client.list_shards.return_value = {"Shards": [{"ShardId": "shard"}]}72 kinesis_client.get_shard_iterator.return_value = {"ShardIterator": "iterator"}73 class PleaseStop(Exception):74 pass75 iterator_type = IteratorType.AtTimestamp76 get_records.side_effect = [PleaseStop]77 with pytest.raises(PleaseStop):78 run_handler_on_stream_records(79 stream_name="stream",80 kinesis_client=kinesis_client,81 shard_iterator_type=iterator_type,82 timestamp=datetime(2019, 2, 5),83 handler=handler,84 wait_seconds=0,85 )86 kinesis_client.get_shard_iterator.assert_called_once_with(87 StreamName="stream",88 ShardId="shard",89 ShardIteratorType="AT_TIMESTAMP",90 Timestamp="2019-02-05T00:00:00.000+00:00",91 )92def test_format_kinesis_timestamp_returns_millis_and_tz():93 assert format_kinesis_timestamp(datetime(2019, 2, 5)) == "2019-02-05T00:00:00.000+00:00"94 assert (95 format_kinesis_timestamp(datetime(2019, 2, 5, 10, microsecond=605020))96 == "2019-02-05T10:00:00.605+00:00"...
consumer.py
Source:consumer.py
...8timestamp = datetime(2016, 12, 15)9##############################################10client = boto3.client('kinesis')11pp = pprint.PrettyPrinter(indent=4)12def get_shard_iterator(shard_iterator):13 return client.get_records(14 ShardIterator=shard_iterator,15 Limit=12316)17shard_iterator = client.get_shard_iterator(18 StreamName = stream_name,19 ShardId = shard_id,20 ShardIteratorType = shard_iterator_type,21 Timestamp = timestamp22)23shard_iterator = shard_iterator['ShardIterator']24flag = True25while (flag):26 new_shard_iterator = get_shard_iterator(shard_iterator)27 if len(new_shard_iterator['NextShardIterator']) > 15 and new_shard_iterator['MillisBehindLatest'] > 0:28 if len(new_shard_iterator['Records']) is 0:29 shard_iterator = get_shard_iterator(shard_iterator)['NextShardIterator']30 print pp.pprint(new_shard_iterator)31 else: 32 print "::::: RECEIVED DATA FROM THE STREAM ::::::"33 print pp.pprint(new_shard_iterator['Records'])34 flag = False35 else:36 print "::::::::::::::::::::::::::: NO MORE SHARDS TO LOOK INTO :::::::::::::::::::::::::::::::::"37 print "::::: THE NEXT SHARD ITERATOR IS %s ::::::" % str(new_shard_iterator['NextShardIterator'])38 print "::::: THE NEXT MILLIS-BEHIND-LATEST IS %s ::::::" % str(new_shard_iterator['MillisBehindLatest'])...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!