Best Python code snippet using localstack_python
lambda_event.py
Source:lambda_event.py
1#!/usr/bin/python2# (c) 2016, Pierre Jodouin <pjodouin@virtualcomputing.solutions>3# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)4from __future__ import absolute_import, division, print_function5__metaclass__ = type6ANSIBLE_METADATA = {'metadata_version': '1.1',7 'status': ['preview'],8 'supported_by': 'community'}9DOCUMENTATION = '''10---11module: lambda_event12short_description: Creates, updates or deletes AWS Lambda function event mappings.13description:14 - This module allows the management of AWS Lambda function event source mappings such as DynamoDB and Kinesis stream15 events via the Ansible framework. These event source mappings are relevant only in the AWS Lambda pull model, where16 AWS Lambda invokes the function.17 It is idempotent and supports "Check" mode. Use module M(lambda) to manage the lambda18 function itself and M(lambda_alias) to manage function aliases.19version_added: "2.2"20author: Pierre Jodouin (@pjodouin), Ryan Brown (@ryansb)21options:22 lambda_function_arn:23 description:24 - The name or ARN of the lambda function.25 required: true26 aliases: ['function_name', 'function_arn']27 state:28 description:29 - Describes the desired state.30 required: true31 default: "present"32 choices: ["present", "absent"]33 alias:34 description:35 - Name of the function alias. Mutually exclusive with C(version).36 required: true37 version:38 description:39 - Version of the Lambda function. Mutually exclusive with C(alias).40 required: false41 event_source:42 description:43 - Source of the event that triggers the lambda function.44 required: false45 default: stream46 choices: ['stream']47 source_params:48 description:49 - Sub-parameters required for event source.50 - I(== stream event source ==)51 - C(source_arn) The Amazon Resource Name (ARN) of the Kinesis or DynamoDB stream that is the event source.52 - C(enabled) Indicates whether AWS Lambda should begin polling the event source. 
Default is True.53 - C(batch_size) The largest number of records that AWS Lambda will retrieve from your event source at the54 time of invoking your function. Default is 100.55 - C(starting_position) The position in the stream where AWS Lambda should start reading.56 Choices are TRIM_HORIZON or LATEST.57 required: true58requirements:59 - boto360extends_documentation_fragment:61 - aws62'''63EXAMPLES = '''64---65# Example that creates a lambda event notification for a DynamoDB stream66- hosts: localhost67 gather_facts: no68 vars:69 state: present70 tasks:71 - name: DynamoDB stream event mapping72 lambda_event:73 state: "{{ state | default('present') }}"74 event_source: stream75 function_name: "{{ function_name }}"76 alias: Dev77 source_params:78 source_arn: arn:aws:dynamodb:us-east-1:123456789012:table/tableName/stream/2016-03-19T19:51:37.45779 enabled: True80 batch_size: 10081 starting_position: TRIM_HORIZON82 - name: Show source event83 debug:84 var: lambda_stream_events85'''86RETURN = '''87---88lambda_stream_events:89 description: list of dictionaries returned by the API describing stream event mappings90 returned: success91 type: list92'''93import re94import sys95try:96 import boto397 from botocore.exceptions import ClientError, ParamValidationError, MissingParametersError98 HAS_BOTO3 = True99except ImportError:100 HAS_BOTO3 = False101from ansible.module_utils.basic import AnsibleModule102from ansible.module_utils.ec2 import (HAS_BOTO3, boto3_conn, camel_dict_to_snake_dict, ec2_argument_spec,103 get_aws_connection_info)104# ---------------------------------------------------------------------------------------------------105#106# Helper Functions & classes107#108# ---------------------------------------------------------------------------------------------------109class AWSConnection:110 """111 Create the connection object and client objects as required.112 """113 def __init__(self, ansible_obj, resources, use_boto3=True):114 try:115 self.region, self.endpoint, 
aws_connect_kwargs = get_aws_connection_info(ansible_obj, boto3=use_boto3)116 self.resource_client = dict()117 if not resources:118 resources = ['lambda']119 resources.append('iam')120 for resource in resources:121 aws_connect_kwargs.update(dict(region=self.region,122 endpoint=self.endpoint,123 conn_type='client',124 resource=resource125 ))126 self.resource_client[resource] = boto3_conn(ansible_obj, **aws_connect_kwargs)127 # if region is not provided, then get default profile/session region128 if not self.region:129 self.region = self.resource_client['lambda'].meta.region_name130 except (ClientError, ParamValidationError, MissingParametersError) as e:131 ansible_obj.fail_json(msg="Unable to connect, authorize or access resource: {0}".format(e))132 # set account ID133 try:134 self.account_id = self.resource_client['iam'].get_user()['User']['Arn'].split(':')[4]135 except (ClientError, ValueError, KeyError, IndexError):136 self.account_id = ''137 def client(self, resource='lambda'):138 return self.resource_client[resource]139def pc(key):140 """141 Changes python key into Pascale case equivalent. 
For example, 'this_function_name' becomes 'ThisFunctionName'.142 :param key:143 :return:144 """145 return "".join([token.capitalize() for token in key.split('_')])146def ordered_obj(obj):147 """148 Order object for comparison purposes149 :param obj:150 :return:151 """152 if isinstance(obj, dict):153 return sorted((k, ordered_obj(v)) for k, v in obj.items())154 if isinstance(obj, list):155 return sorted(ordered_obj(x) for x in obj)156 else:157 return obj158def set_api_sub_params(params):159 """160 Sets module sub-parameters to those expected by the boto3 API.161 :param params:162 :return:163 """164 api_params = dict()165 for param in params.keys():166 param_value = params.get(param, None)167 if param_value:168 api_params[pc(param)] = param_value169 return api_params170def validate_params(module, aws):171 """172 Performs basic parameter validation.173 :param module:174 :param aws:175 :return:176 """177 function_name = module.params['lambda_function_arn']178 # validate function name179 if not re.search('^[\w\-:]+$', function_name):180 module.fail_json(181 msg='Function name {0} is invalid. 
Names must contain only alphanumeric characters and hyphens.'.format(function_name)182 )183 if len(function_name) > 64 and not function_name.startswith('arn:aws:lambda:'):184 module.fail_json(msg='Function name "{0}" exceeds 64 character limit'.format(function_name))185 elif len(function_name) > 140 and function_name.startswith('arn:aws:lambda:'):186 module.fail_json(msg='ARN "{0}" exceeds 140 character limit'.format(function_name))187 # check if 'function_name' needs to be expanded in full ARN format188 if not module.params['lambda_function_arn'].startswith('arn:aws:lambda:'):189 function_name = module.params['lambda_function_arn']190 module.params['lambda_function_arn'] = 'arn:aws:lambda:{0}:{1}:function:{2}'.format(aws.region, aws.account_id, function_name)191 qualifier = get_qualifier(module)192 if qualifier:193 function_arn = module.params['lambda_function_arn']194 module.params['lambda_function_arn'] = '{0}:{1}'.format(function_arn, qualifier)195 return196def get_qualifier(module):197 """198 Returns the function qualifier as a version or alias or None.199 :param module:200 :return:201 """202 qualifier = None203 if module.params['version'] > 0:204 qualifier = str(module.params['version'])205 elif module.params['alias']:206 qualifier = str(module.params['alias'])207 return qualifier208# ---------------------------------------------------------------------------------------------------209#210# Lambda Event Handlers211#212# This section defines a lambda_event_X function where X is an AWS service capable of initiating213# the execution of a Lambda function (pull only).214#215# ---------------------------------------------------------------------------------------------------216def lambda_event_stream(module, aws):217 """218 Adds, updates or deletes lambda stream (DynamoDb, Kinesis) event notifications.219 :param module:220 :param aws:221 :return:222 """223 client = aws.client('lambda')224 facts = dict()225 changed = False226 current_state = 'absent'227 state = 
module.params['state']228 api_params = dict(FunctionName=module.params['lambda_function_arn'])229 # check if required sub-parameters are present and valid230 source_params = module.params['source_params']231 source_arn = source_params.get('source_arn')232 if source_arn:233 api_params.update(EventSourceArn=source_arn)234 else:235 module.fail_json(msg="Source parameter 'source_arn' is required for stream event notification.")236 # check if optional sub-parameters are valid, if present237 batch_size = source_params.get('batch_size')238 if batch_size:239 try:240 source_params['batch_size'] = int(batch_size)241 except ValueError:242 module.fail_json(msg="Source parameter 'batch_size' must be an integer, found: {0}".format(source_params['batch_size']))243 # optional boolean value needs special treatment as not present does not imply False244 source_param_enabled = module.boolean(source_params.get('enabled', 'True'))245 # check if event mapping exist246 try:247 facts = client.list_event_source_mappings(**api_params)['EventSourceMappings']248 if facts:249 current_state = 'present'250 except ClientError as e:251 module.fail_json(msg='Error retrieving stream event notification configuration: {0}'.format(e))252 if state == 'present':253 if current_state == 'absent':254 starting_position = source_params.get('starting_position')255 if starting_position:256 api_params.update(StartingPosition=starting_position)257 else:258 module.fail_json(msg="Source parameter 'starting_position' is required for stream event notification.")259 if source_arn:260 api_params.update(Enabled=source_param_enabled)261 if source_params.get('batch_size'):262 api_params.update(BatchSize=source_params.get('batch_size'))263 try:264 if not module.check_mode:265 facts = client.create_event_source_mapping(**api_params)266 changed = True267 except (ClientError, ParamValidationError, MissingParametersError) as e:268 module.fail_json(msg='Error creating stream source event mapping: {0}'.format(e))269 else:270 # 
current_state is 'present'271 api_params = dict(FunctionName=module.params['lambda_function_arn'])272 current_mapping = facts[0]273 api_params.update(UUID=current_mapping['UUID'])274 mapping_changed = False275 # check if anything changed276 if source_params.get('batch_size') and source_params['batch_size'] != current_mapping['BatchSize']:277 api_params.update(BatchSize=source_params['batch_size'])278 mapping_changed = True279 if source_param_enabled is not None:280 if source_param_enabled:281 if current_mapping['State'] not in ('Enabled', 'Enabling'):282 api_params.update(Enabled=True)283 mapping_changed = True284 else:285 if current_mapping['State'] not in ('Disabled', 'Disabling'):286 api_params.update(Enabled=False)287 mapping_changed = True288 if mapping_changed:289 try:290 if not module.check_mode:291 facts = client.update_event_source_mapping(**api_params)292 changed = True293 except (ClientError, ParamValidationError, MissingParametersError) as e:294 module.fail_json(msg='Error updating stream source event mapping: {0}'.format(e))295 else:296 if current_state == 'present':297 # remove the stream event mapping298 api_params = dict(UUID=facts[0]['UUID'])299 try:300 if not module.check_mode:301 facts = client.delete_event_source_mapping(**api_params)302 changed = True303 except (ClientError, ParamValidationError, MissingParametersError) as e:304 module.fail_json(msg='Error removing stream source event mapping: {0}'.format(e))305 return camel_dict_to_snake_dict(dict(changed=changed, events=facts))306def main():307 """Produce a list of function suffixes which handle lambda events."""308 this_module = sys.modules[__name__]309 source_choices = ["stream"]310 argument_spec = ec2_argument_spec()311 argument_spec.update(312 dict(313 state=dict(required=False, default='present', choices=['present', 'absent']),314 lambda_function_arn=dict(required=True, default=None, aliases=['function_name', 'function_arn']),315 event_source=dict(required=False, default="stream", 
choices=source_choices),316 source_params=dict(type='dict', required=True, default=None),317 alias=dict(required=False, default=None),318 version=dict(type='int', required=False, default=0),319 )320 )321 module = AnsibleModule(322 argument_spec=argument_spec,323 supports_check_mode=True,324 mutually_exclusive=[['alias', 'version']],325 required_together=[]326 )327 # validate dependencies328 if not HAS_BOTO3:329 module.fail_json(msg='boto3 is required for this module.')330 aws = AWSConnection(module, ['lambda'])331 validate_params(module, aws)332 this_module_function = getattr(this_module, 'lambda_event_{0}'.format(module.params['event_source'].lower()))333 results = this_module_function(module, aws)334 module.exit_json(**results)335if __name__ == '__main__':...
lambda_last_used.py
Source:lambda_last_used.py
"""
Identify when a lambda function was last modified or invoked. This script
helps identify lambda functions which can be removed from the AWS account
because nobody is using them.

This script is different from the others because it has a lot of external
dependencies and manual steps that need to be done before running it.

Requirements

    1. Enable CloudTrail logging to an S3 bucket
    2. Enable Lambda detailed logging in CloudTrail to get the Invoke calls
    3. Run cloudtrail-partitioner [0], no need to install it, just run the tool
       to get 90 day visibility.
    4. Use Athena to query the events and download the result as CSV. Use
       this Athena query [1] to get all the data from the previously created
       partitions. Make sure you adjust the dates.
    5. Run this tool

[0] https://github.com/duo-labs/cloudtrail-partitioner/
[1] https://gist.github.com/andresriancho/512bfbae1ad8b175a36d6fdc32b8ccef
"""
import os
import sys
import csv
import json
import argparse

import boto3

from dateutil.parser import parse
from datetime import datetime, timezone

from utils.regions import get_all_regions
from lambda_dump import get_lambda_functions_for_region
from utils.boto_error_handling import yield_handling_errors

# Sentinel timestamp given to functions that never appear in the CSV.
DEFAULT_DATE = datetime(1970, 1, 1, tzinfo=timezone.utc)


def _as_utc(moment):
    """Return *moment* as a timezone-aware datetime.

    Naive values are assumed to already be in UTC (NOTE(review): CloudTrail
    event times are expected to be UTC; confirm for other inputs).
    """
    if moment.tzinfo is None:
        return moment.replace(tzinfo=timezone.utc)
    return moment


def parse_arguments():
    """Parse the command line.

    Exits with status 1 when the AWS profile cannot be loaded or the input
    path is not an existing file.

    :return: tuple of (csv file path, boto3 session)
    """
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--input',
        help='Athena-generated CSV file',
        required=True
    )

    parser.add_argument(
        '--profile',
        help='AWS profile from ~/.aws/credentials',
        required=True
    )

    args = parser.parse_args()

    try:
        session = boto3.Session(profile_name=args.profile)
    except Exception as e:
        print('%s' % e)
        sys.exit(1)

    csv_file = args.input

    # isfile (not exists) so that a directory is rejected, matching the message
    if not os.path.isfile(csv_file):
        print('%s is not a file' % csv_file)
        sys.exit(1)

    return csv_file, session


class LambdaData(object):
    """Most recent event seen for one lambda function."""

    def __init__(self, event_time, request_parameters, aws_region, event_source):
        self.event_time = event_time
        self.request_parameters = request_parameters
        self.aws_region = aws_region
        self.event_source = event_source


def parse_csv(csv_file):
    """Read the Athena CSV and keep the latest event per lambda function.

    Each data row must hold six columns: event time, event name, request
    parameters (JSON), region, event source and resources. The first row is
    treated as a header and skipped.

    :param csv_file: path of the CSV file
    :return: dict mapping request_parameters['functionName'] to LambdaData
    """
    lambda_last_used_data = dict()

    with open(csv_file, newline='') as handle:
        reader = csv.reader(handle)
        next(reader, None)  # skip the header row

        for row in reader:
            if not row:
                # csv.reader yields [] for blank lines
                continue

            (event_time, _event_name, request_parameters, aws_region, event_source, _resources) = row

            request_parameters = json.loads(request_parameters)
            # normalise so comparisons never mix naive and aware datetimes
            event_time = _as_utc(parse(event_time))

            lambda_function_arn = request_parameters['functionName']

            previous = lambda_last_used_data.get(lambda_function_arn)

            # store new functions, and replace older entries with newer events
            if previous is None or event_time > previous.event_time:
                lambda_last_used_data[lambda_function_arn] = LambdaData(event_time,
                                                                        request_parameters,
                                                                        aws_region,
                                                                        event_source)

    return lambda_last_used_data


def sort_key(item):
    """Sort (function arn, event time) tuples by event time."""
    return item[1]


def print_output(lambda_last_used_data):
    """Print one line per function, most recently used first."""
    data = [
        (lambda_function_arn, _as_utc(lambda_data.event_time))
        for lambda_function_arn, lambda_data in lambda_last_used_data.items()
    ]

    data.sort(key=sort_key, reverse=True)

    # aware "now" so the difference is not skewed by the local UTC offset
    now = datetime.now(timezone.utc)

    for lambda_function_arn, event_time in data:
        # equality, not identity: an equal datetime need not be the same object
        if event_time == DEFAULT_DATE:
            msg = '%s NOT used during the tracking period'
            args = (lambda_function_arn,)
            print(msg % args)
        else:
            days_ago = (now - event_time).days

            msg = '%s was last used %s days ago'
            args = (lambda_function_arn, days_ago)
            print(msg % args)


def dump_lambda_functions(session):
    """Return the FunctionArn of every lambda function in every region."""
    all_lambda_functions = []

    for region in get_all_regions(session):
        client = session.client('lambda', region_name=region)

        iterator = yield_handling_errors(get_lambda_functions_for_region, client)

        for lambda_function in iterator:
            all_lambda_functions.append(lambda_function['FunctionArn'])

    return all_lambda_functions


def merge_all_functions(lambda_last_used_data, all_lambda_functions):
    """Add every function missing from the CSV data with DEFAULT_DATE.

    The dict is updated in place and also returned.
    """
    for lambda_function_arn in all_lambda_functions:
        if lambda_function_arn not in lambda_last_used_data:
            lambda_last_used_data[lambda_function_arn] = LambdaData(DEFAULT_DATE,
                                                                    None,
                                                                    None,
                                                                    None)

    return lambda_last_used_data


def main():
    """Parse the CSV, list existing functions and print the usage report."""
    csv_file, session = parse_arguments()

    print('Parsing CSV file...')
    lambda_last_used_data = parse_csv(csv_file)

    print('Getting all existing AWS Lambda functions...')
    all_lambda_functions = dump_lambda_functions(session)

    lambda_last_used_data = merge_all_functions(lambda_last_used_data, all_lambda_functions)

    print('')
    print('Result:')
    print('')

    print_output(lambda_last_used_data)


if __name__ == '__main__':
    main()
test_tiingo_scheduler.py
Source:test_tiingo_scheduler.py
import json
from datetime import datetime
from typing import List

import boto3
import pytest

from .aws_helpers import get_matching_s3_keys
from .share import lambda_function_check_setup, lambda_function_setup_contain_env_var


def test_tiingo_scheduler_lambda_python_setup(version, environment, terraform_output):
    """The scheduler lambda passes the shared setup check with a 300 s timeout."""
    lambda_function_name = terraform_output["tiingo_scheduler_name"]["value"]
    region = terraform_output["region"]["value"]
    expected_timeout = 300
    lambda_function_check_setup(lambda_function_name, region, expected_timeout)


@pytest.mark.parametrize(
    "env_var",
    [
        "AWS_S3_BUCKET",
        "TIINGO_FETCHER_FUNCTION_NAME",
        "LAMBDA_INVOCATION_TYPE",
        "TIINGO_TICKERS_FILE",
    ],
)
def test_tiingo_scheduler_env_var_exist(
    version, environment, terraform_output, env_var
):
    """Each expected environment variable is checked on the scheduler lambda."""
    lambda_function_name = terraform_output["tiingo_scheduler_name"]["value"]
    region = terraform_output["region"]["value"]
    lambda_function_setup_contain_env_var(lambda_function_name, region, env_var)


@pytest.mark.parametrize(
    "event_rule, json_param",
    [
        (
            "Monday_to_Friday_8pm_HKT",
            '{"filters": [{"exchange": "SHE"}, {"exchange": "SHG"}]}',
        )
    ],
)
def test_target_has_right_param_for_rule(
    terraform_output, environment, event_rule: str, json_param: str
):
    """Five event rules target the scheduler lambda.

    NOTE(review): event_rule and json_param are only used by the disabled
    assertions below.
    """
    lambda_function_arn = terraform_output["tiingo_scheduler_arn"]["value"]
    region = terraform_output["region"]["value"]
    events_client = boto3.client("events", region_name=region)
    targets = events_client.list_rule_names_by_target(TargetArn=lambda_function_arn)
    assert len(targets["RuleNames"]) == 5
    #
    # assert targets["Targets"][0]["Arn"] == lambda_function_arn
    # assert targets["Targets"][0]["Input"] == json_param


@pytest.mark.parametrize(
    "json_filter,expect_items",
    [
        (
            '[{"exchange": "NASDAQ", "asset_type": "Mutual Fund"}]',
            [
                "BVNSC",
                "CGVIC",
                "CIVEC",
                "CRUSC",
                "EMFN",
                "EMMT",
                "EVGBC",
                "EVLMC",
                "EVSTC",
                "EWJE",
                "FEFN",
                "FOANC",
                "IVENC",
                "IVFGC",
                "MOGLC",
                "NUCL",
                "PETZC",
                "RPIBC",
                "SRRIX",
                "XBGIOX",
                "XJEMDX",
            ],
        )
    ],
)
def test_invoke_tiingo_scheduler(
    terraform_output, json_filter: str, expect_items: List[str]
):
    """Invoke the scheduler and wait for one data.csv per expected ticker in S3."""
    region = terraform_output["region"]["value"]
    lambda_function_arn = terraform_output["tiingo_scheduler_name"]["value"]
    s3_bucket_name = terraform_output["s3_bucket_name"]["value"]
    base_path = "market_data_tiingo_scheduler"
    event = {"base_path": base_path, "filters": json.loads(json_filter)}
    payload = json.dumps(event)
    client_lambda = boto3.client("lambda", region_name=region)
    invoke_result = client_lambda.invoke(
        FunctionName=lambda_function_arn,
        InvocationType="RequestResponse",
        Payload=payload,
    )
    assert 200 == invoke_result["StatusCode"]
    assert "Handled" == invoke_result.get("FunctionError", "Handled")
    start_time = datetime.now()
    max_wait_time_s = 300
    result_file_keys = {
        f"{base_path}/{ticker}/1d/data.csv" for ticker in expect_items
    }
    while True:
        file_keys = set(
            get_matching_s3_keys(s3_bucket_name, f"{base_path}/", "data.csv")
        )
        if len(file_keys) == len(result_file_keys):
            break
        # total_seconds(): timedelta.seconds drops the days component
        elapsed_s = (datetime.now() - start_time).total_seconds()
        if elapsed_s > max_wait_time_s:
            pytest.fail(f"Max wait time greater than {max_wait_time_s}s")
    # NOTE(review): the source listing is truncated at this point ("..."), so
    # any statements that followed in the original test are not reproduced here.
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!