Best Python code snippet using localstack_python
test_lambda_integration.py
Source:test_lambda_integration.py
...51 role=lambda_su_role,52 )53 queue_url_1 = sqs_create_queue(QueueName=queue_name_1)54 queue_arn_1 = sqs_queue_arn(queue_url_1)55 rs = lambda_client.create_event_source_mapping(56 EventSourceArn=queue_arn_1, FunctionName=function_name57 )58 assert BATCH_SIZE_RANGES["sqs"][0] == rs["BatchSize"]59 uuid = rs["UUID"]60 def wait_for_event_source_mapping():61 return lambda_client.get_event_source_mapping(UUID=uuid)["State"] == "Enabled"62 assert poll_condition(wait_for_event_source_mapping, timeout=30)63 with pytest.raises(ClientError) as e:64 # Update batch size with invalid value65 lambda_client.update_event_source_mapping(66 UUID=uuid,67 FunctionName=function_name,68 BatchSize=BATCH_SIZE_RANGES["sqs"][1] + 1,69 )70 e.match(INVALID_PARAMETER_VALUE_EXCEPTION)71 queue_url_2 = sqs_create_queue(QueueName=queue_name_2)72 queue_arn_2 = sqs_queue_arn(queue_url_2)73 with pytest.raises(ClientError) as e:74 # Create event source mapping with invalid batch size value75 lambda_client.create_event_source_mapping(76 EventSourceArn=queue_arn_2,77 FunctionName=function_name,78 BatchSize=BATCH_SIZE_RANGES["sqs"][1] + 1,79 )80 e.match(INVALID_PARAMETER_VALUE_EXCEPTION)81 table_description = dynamodb_create_table(82 table_name=ddb_table,83 partition_key="id",84 stream_view_type="NEW_IMAGE",85 )["TableDescription"]86 # table ARNs are not sufficient as event source, needs to be a dynamodb stream arn87 if not is_old_provider():88 with pytest.raises(ClientError) as e:89 lambda_client.create_event_source_mapping(90 EventSourceArn=table_description["TableArn"],91 FunctionName=function_name,92 StartingPosition="LATEST",93 )94 e.match(INVALID_PARAMETER_VALUE_EXCEPTION)95 # check if event source mapping can be created with latest stream ARN96 rs = lambda_client.create_event_source_mapping(97 EventSourceArn=table_description["LatestStreamArn"],98 FunctionName=function_name,99 StartingPosition="LATEST",100 )101 assert BATCH_SIZE_RANGES["dynamodb"][0] == rs["BatchSize"]102 def 
test_disabled_event_source_mapping_with_dynamodb(103 self,104 create_lambda_function,105 lambda_client,106 dynamodb_resource,107 dynamodb_client,108 dynamodb_create_table,109 logs_client,110 dynamodbstreams_client,111 lambda_su_role,112 ):113 function_name = f"lambda_func-{short_uid()}"114 ddb_table = f"ddb_table-{short_uid()}"115 create_lambda_function(116 func_name=function_name,117 handler_file=TEST_LAMBDA_PYTHON_ECHO,118 runtime=LAMBDA_RUNTIME_PYTHON36,119 role=lambda_su_role,120 )121 latest_stream_arn = dynamodb_create_table(122 table_name=ddb_table, partition_key="id", stream_view_type="NEW_IMAGE"123 )["TableDescription"]["LatestStreamArn"]124 rs = lambda_client.create_event_source_mapping(125 FunctionName=function_name,126 EventSourceArn=latest_stream_arn,127 StartingPosition="TRIM_HORIZON",128 MaximumBatchingWindowInSeconds=1,129 )130 uuid = rs["UUID"]131 def wait_for_table_created():132 return (133 dynamodb_client.describe_table(TableName=ddb_table)["Table"]["TableStatus"]134 == "ACTIVE"135 )136 assert poll_condition(wait_for_table_created, timeout=30)137 def wait_for_stream_created():138 return (139 dynamodbstreams_client.describe_stream(StreamArn=latest_stream_arn)[140 "StreamDescription"141 ]["StreamStatus"]142 == "ENABLED"143 )144 assert poll_condition(wait_for_stream_created, timeout=30)145 table = dynamodb_resource.Table(ddb_table)146 items = [147 {"id": short_uid(), "data": "data1"},148 {"id": short_uid(), "data": "data2"},149 ]150 table.put_item(Item=items[0])151 def assert_events():152 events = get_lambda_log_events(function_name, logs_client=logs_client)153 # lambda was invoked 1 time154 assert 1 == len(events[0]["Records"])155 # might take some time against AWS156 retry(assert_events, sleep=3, retries=10)157 # disable event source mapping158 lambda_client.update_event_source_mapping(UUID=uuid, Enabled=False)159 table.put_item(Item=items[1])160 events = get_lambda_log_events(function_name, logs_client=logs_client)161 # lambda no longer invoked, 
still have 1 event162 assert 1 == len(events[0]["Records"])163 # TODO invalid test against AWS, this behavior just is not correct164 def test_deletion_event_source_mapping_with_dynamodb(165 self, create_lambda_function, lambda_client, dynamodb_client, lambda_su_role166 ):167 function_name = f"lambda_func-{short_uid()}"168 ddb_table = f"ddb_table-{short_uid()}"169 create_lambda_function(170 func_name=function_name,171 handler_file=TEST_LAMBDA_PYTHON_ECHO,172 runtime=LAMBDA_RUNTIME_PYTHON36,173 role=lambda_su_role,174 )175 latest_stream_arn = aws_stack.create_dynamodb_table(176 table_name=ddb_table,177 partition_key="id",178 client=dynamodb_client,179 stream_view_type="NEW_IMAGE",180 )["TableDescription"]["LatestStreamArn"]181 lambda_client.create_event_source_mapping(182 FunctionName=function_name,183 EventSourceArn=latest_stream_arn,184 StartingPosition="TRIM_HORIZON",185 )186 def wait_for_table_created():187 return (188 dynamodb_client.describe_table(TableName=ddb_table)["Table"]["TableStatus"]189 == "ACTIVE"190 )191 assert poll_condition(wait_for_table_created, timeout=30)192 dynamodb_client.delete_table(TableName=ddb_table)193 result = lambda_client.list_event_source_mappings(EventSourceArn=latest_stream_arn)194 assert 1 == len(result["EventSourceMappings"])195 def test_event_source_mapping_with_sqs(196 self,197 create_lambda_function,198 lambda_client,199 sqs_client,200 sqs_create_queue,201 sqs_queue_arn,202 logs_client,203 lambda_su_role,204 ):205 function_name = f"lambda_func-{short_uid()}"206 queue_name_1 = f"queue-{short_uid()}-1"207 create_lambda_function(208 func_name=function_name,209 handler_file=TEST_LAMBDA_PYTHON_ECHO,210 runtime=LAMBDA_RUNTIME_PYTHON36,211 role=lambda_su_role,212 )213 queue_url_1 = sqs_create_queue(QueueName=queue_name_1)214 queue_arn_1 = sqs_queue_arn(queue_url_1)215 lambda_client.create_event_source_mapping(216 EventSourceArn=queue_arn_1, FunctionName=function_name, MaximumBatchingWindowInSeconds=1217 )218 
sqs_client.send_message(QueueUrl=queue_url_1, MessageBody=json.dumps({"foo": "bar"}))219 def assert_lambda_log_events():220 events = get_lambda_log_events(function_name=function_name, logs_client=logs_client)221 # lambda was invoked 1 time222 assert 1 == len(events[0]["Records"])223 retry(assert_lambda_log_events, sleep_before=3, retries=30)224 rs = sqs_client.receive_message(QueueUrl=queue_url_1)225 assert rs.get("Messages") is None226 def test_create_kinesis_event_source_mapping(227 self,228 create_lambda_function,229 lambda_client,230 kinesis_client,231 kinesis_create_stream,232 lambda_su_role,233 wait_for_stream_ready,234 logs_client,235 ):236 function_name = f"lambda_func-{short_uid()}"237 stream_name = f"test-foobar-{short_uid()}"238 create_lambda_function(239 func_name=function_name,240 handler_file=TEST_LAMBDA_PYTHON_ECHO,241 runtime=LAMBDA_RUNTIME_PYTHON36,242 role=lambda_su_role,243 )244 kinesis_create_stream(StreamName=stream_name, ShardCount=1)245 stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][246 "StreamARN"247 ]248 # only valid against AWS / new provider (once implemented)249 if not is_old_provider():250 with pytest.raises(ClientError) as e:251 lambda_client.create_event_source_mapping(252 EventSourceArn=stream_arn, FunctionName=function_name253 )254 e.match(INVALID_PARAMETER_VALUE_EXCEPTION)255 wait_for_stream_ready(stream_name=stream_name)256 lambda_client.create_event_source_mapping(257 EventSourceArn=stream_arn, FunctionName=function_name, StartingPosition="TRIM_HORIZON"258 )259 stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)260 assert 1 == stream_summary["StreamDescriptionSummary"]["OpenShardCount"]261 num_events_kinesis = 10262 kinesis_client.put_records(263 Records=[264 {"Data": "{}", "PartitionKey": f"test_{i}"} for i in range(0, num_events_kinesis)265 ],266 StreamName=stream_name,267 )268 def get_lambda_events():269 events = get_lambda_log_events(function_name, 
logs_client=logs_client)270 assert events271 return events272 events = retry(get_lambda_events, retries=30)273 assert 10 == len(events[0]["Records"])274 assert "eventID" in events[0]["Records"][0]275 assert "eventSourceARN" in events[0]["Records"][0]276 assert "eventSource" in events[0]["Records"][0]277 assert "eventVersion" in events[0]["Records"][0]278 assert "eventName" in events[0]["Records"][0]279 assert "invokeIdentityArn" in events[0]["Records"][0]280 assert "awsRegion" in events[0]["Records"][0]281 assert "kinesis" in events[0]["Records"][0]282 def test_python_lambda_subscribe_sns_topic(283 self,284 create_lambda_function,285 sns_client,286 lambda_su_role,287 sns_topic,288 logs_client,289 lambda_client,290 ):291 function_name = f"{TEST_LAMBDA_FUNCTION_PREFIX}-{short_uid()}"292 permission_id = f"test-statement-{short_uid()}"293 lambda_creation_response = create_lambda_function(294 func_name=function_name,295 handler_file=TEST_LAMBDA_PYTHON_ECHO,296 runtime=LAMBDA_RUNTIME_PYTHON36,297 role=lambda_su_role,298 )299 lambda_arn = lambda_creation_response["CreateFunctionResponse"]["FunctionArn"]300 topic_arn = sns_topic["Attributes"]["TopicArn"]301 lambda_client.add_permission(302 FunctionName=function_name,303 StatementId=permission_id,304 Action="lambda:InvokeFunction",305 Principal="sns.amazonaws.com",306 SourceArn=topic_arn,307 )308 sns_client.subscribe(309 TopicArn=topic_arn,310 Protocol="lambda",311 Endpoint=lambda_arn,312 )313 subject = "[Subject] Test subject"314 message = "Hello world."315 sns_client.publish(TopicArn=topic_arn, Subject=subject, Message=message)316 events = retry(317 check_expected_lambda_log_events_length,318 retries=10,319 sleep=1,320 function_name=function_name,321 expected_length=1,322 regex_filter="Records.*Sns",323 logs_client=logs_client,324 )325 notification = events[0]["Records"][0]["Sns"]326 assert "Subject" in notification327 assert subject == notification["Subject"]328class TestLambdaHttpInvocation:329 def 
test_http_invocation_with_apigw_proxy(self, create_lambda_function):330 lambda_name = f"test_lambda_{short_uid()}"331 lambda_resource = "/api/v1/{proxy+}"332 lambda_path = "/api/v1/hello/world"333 lambda_request_context_path = "/" + TEST_STAGE_NAME + lambda_path334 lambda_request_context_resource_path = lambda_resource335 # create lambda function336 create_lambda_function(337 func_name=lambda_name,338 handler_file=TEST_LAMBDA_PYTHON,339 libs=TEST_LAMBDA_LIBS,340 )341 # create API Gateway and connect it to the Lambda proxy backend342 lambda_uri = aws_stack.lambda_function_arn(lambda_name)343 target_uri = f"arn:aws:apigateway:{aws_stack.get_region()}:lambda:path/2015-03-31/functions/{lambda_uri}/invocations"344 result = testutil.connect_api_gateway_to_http_with_lambda_proxy(345 "test_gateway2",346 target_uri,347 path=lambda_resource,348 stage_name=TEST_STAGE_NAME,349 )350 api_id = result["id"]351 url = path_based_url(api_id=api_id, stage_name=TEST_STAGE_NAME, path=lambda_path)352 result = safe_requests.post(353 url, data=b"{}", headers={"User-Agent": "python-requests/testing"}354 )355 content = json.loads(result.content)356 assert lambda_path == content["path"]357 assert lambda_resource == content["resource"]358 assert lambda_request_context_path == content["requestContext"]["path"]359 assert lambda_request_context_resource_path == content["requestContext"]["resourcePath"]360class TestKinesisSource:361 @patch.object(config, "SYNCHRONOUS_KINESIS_EVENTS", False)362 def test_kinesis_lambda_parallelism(363 self,364 lambda_client,365 kinesis_client,366 create_lambda_function,367 kinesis_create_stream,368 wait_for_stream_ready,369 logs_client,370 lambda_su_role,371 ):372 function_name = f"lambda_func-{short_uid()}"373 stream_name = f"test-foobar-{short_uid()}"374 create_lambda_function(375 handler_file=TEST_LAMBDA_PARALLEL_FILE,376 func_name=function_name,377 runtime=LAMBDA_RUNTIME_PYTHON36,378 role=lambda_su_role,379 )380 kinesis_create_stream(StreamName=stream_name, 
ShardCount=1)381 stream_arn = kinesis_client.describe_stream(StreamName=stream_name)["StreamDescription"][382 "StreamARN"383 ]384 wait_for_stream_ready(stream_name=stream_name)385 lambda_client.create_event_source_mapping(386 EventSourceArn=stream_arn,387 FunctionName=function_name,388 StartingPosition="TRIM_HORIZON",389 BatchSize=10,390 )391 stream_summary = kinesis_client.describe_stream_summary(StreamName=stream_name)392 assert 1 == stream_summary["StreamDescriptionSummary"]["OpenShardCount"]393 num_events_kinesis = 10394 # assure async call395 start = time.perf_counter()396 kinesis_client.put_records(397 Records=[398 {"Data": '{"batch": 0}', "PartitionKey": f"test_{i}"}399 for i in range(0, num_events_kinesis)...
test_subscribers.py
Source:test_subscribers.py
1from os import path2from mock import patch, Mock3import nose.tools as nt4import botocore5from lambda_uploader import subscribers, config6EX_CONFIG = path.normpath(path.join(path.dirname(__file__),7 '../tests/configs'))8class TestKinesisSubscriber(object):9 @patch('lambda_uploader.subscribers.boto3.session.Session')10 def test_successfully_adds_kinesis_subscription(self, mocked_session):11 _mocked_lambda = Mock()12 _mocked_session = Mock()13 _mocked_session.client = Mock()14 _mocked_session.client.return_value = _mocked_lambda15 mocked_session.return_value = _mocked_session16 conf = config.Config(path.dirname(__file__),17 config_file=path.join(EX_CONFIG, 'lambda-with-subscription.json'))18 subscribers.create_subscriptions(conf, None)19 nt.assert_equals(True, _mocked_lambda.create_event_source_mapping.called)20 @patch('lambda_uploader.subscribers.boto3.session.Session')21 def test_successfully_updates_kinesis_subscription(self, mocked_session):22 resonse = {"Error": {"Code": "ResourceConflictException", "Message": ""}}23 err = botocore.exceptions.ClientError(resonse, "create_event_source_mapping")24 _mocked_lambda = Mock()25 _mocked_lambda.create_event_source_mapping.side_effect = err26 _mocked_lambda.list_event_source_mappings.return_value = {27 'EventSourceMappings': [{'UUID': 'myuuid'}]28 }29 _mocked_session = Mock()30 _mocked_session.client = Mock()31 _mocked_session.client.return_value = _mocked_lambda32 mocked_session.return_value = _mocked_session33 conf = config.Config(path.dirname(__file__),34 config_file=path.join(EX_CONFIG, 'lambda-with-subscription.json'))35 subscribers.create_subscriptions(conf, None)...
register_sqs.py
Source:register_sqs.py
# client.create_event_source_mapping() works only if this
# code is not run in a private VPC (either on ec2 or lambda).
# Might work in a private VPC if private lambda endpoints are available.
import json

import boto3

# Defaults reproduce the values that were previously hard-coded in register_sqs().
DEFAULT_REGION = 'eu-west-1'
DEFAULT_EVENT_SOURCE_ARN = 'arn:aws:sqs:eu-west-1:094033154904:mysqs'
DEFAULT_FUNCTION_NAME = 'arn:aws:lambda:eu-west-1:094033154904:function:mylambda'
DEFAULT_BATCH_SIZE = 1


def mylambda(event, context):
    """Lambda entry point: log the incoming event and register the SQS mapping.

    Args:
        event: The invocation payload; only printed.
        context: The Lambda context object; unused.

    Returns:
        The response dict produced by ``register_sqs()``. Previously the
        handler dropped this value and implicitly returned ``None``.
    """
    print('Hello World from register_sqs!')
    print(event)
    return register_sqs()


def register_sqs(
    event_source_arn=DEFAULT_EVENT_SOURCE_ARN,
    function_name=DEFAULT_FUNCTION_NAME,
    region_name=DEFAULT_REGION,
    batch_size=DEFAULT_BATCH_SIZE,
):
    """Create an enabled event source mapping from a queue to a Lambda function.

    Progress is reported through ``print`` checkpoints, and the caller identity
    returned by STS is printed before the mapping is created.

    Args:
        event_source_arn: ARN passed as ``EventSourceArn``.
        function_name: Name or ARN passed as ``FunctionName``.
        region_name: Region used for the Lambda client.
        batch_size: Value passed as ``BatchSize``.

    Returns:
        A dict with ``statusCode`` 200 and a JSON-encoded ``body``.
    """
    print('checkpoint 0')
    # Separate names per service so the STS client is not silently rebound.
    sts_client = boto3.client("sts")
    caller = sts_client.get_caller_identity()
    print(caller)
    print('checkpoint 1')
    lambda_client = boto3.client('lambda', region_name=region_name)
    print('checkpoint 2')
    # The API response is not used, so it is intentionally not bound.
    lambda_client.create_event_source_mapping(
        EventSourceArn=event_source_arn,
        FunctionName=function_name,
        Enabled=True,
        BatchSize=batch_size,
    )
    print('checkpoint 3')

    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!'),
    }


if __name__ == '__main__':
    # The body of this guard is elided ("...") in the excerpt; kept as-is.
    ...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!