Best Python code snippet using localstack_python
awslambda.py
Source:awslambda.py
1# Copyright 2016-2017 Capital One Services, LLC2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14from __future__ import absolute_import, division, print_function, unicode_literals15import functools16import jmespath17import json18import six19from botocore.exceptions import ClientError20from botocore.paginate import Paginator21from concurrent.futures import as_completed22from c7n.actions import BaseAction, RemovePolicyBase23from c7n.filters import CrossAccountAccessFilter, ValueFilter24import c7n.filters.vpc as net_filters25from c7n.manager import resources26from c7n import query27from c7n.resources.iam import CheckPermissions28from c7n.tags import universal_augment29from c7n.utils import local_session, type_schema, generate_arn30ErrAccessDenied = "AccessDeniedException"31@resources.register('lambda')32class AWSLambda(query.QueryResourceManager):33 class resource_type(object):34 service = 'lambda'35 type = 'function'36 enum_spec = ('list_functions', 'Functions', None)37 name = id = 'FunctionName'38 filter_name = None39 date = 'LastModified'40 dimension = 'FunctionName'41 config_type = "AWS::Lambda::Function"42 universal_taggable = object()43 @property44 def generate_arn(self):45 """ Generates generic arn if ID is not already arn format.46 """47 if self._generate_arn is None:48 self._generate_arn = functools.partial(49 generate_arn,50 self.get_model().service,51 region=self.config.region,52 account_id=self.account_id,53 resource_type=self.get_model().type,54 separator=':')55 
return self._generate_arn56 def get_source(self, source_type):57 if source_type == 'describe':58 return DescribeLambda(self)59 elif source_type == 'config':60 return ConfigLambda(self)61 raise ValueError("Unsupported source: %s for %s" % (62 source_type, self.resource_type.config_type))63class DescribeLambda(query.DescribeSource):64 def augment(self, resources):65 return universal_augment(66 self.manager, super(DescribeLambda, self).augment(resources))67class ConfigLambda(query.ConfigSource):68 def load_resource(self, item):69 resource = super(ConfigLambda, self).load_resource(item)70 resource['Tags'] = [71 {u'Key': k, u'Value': v} for k, v in item[72 'supplementaryConfiguration'].get('Tags', {}).items()]73 resource['c7n:Policy'] = item[74 'supplementaryConfiguration'].get('Policy')75 return resource76@AWSLambda.filter_registry.register('security-group')77class SecurityGroupFilter(net_filters.SecurityGroupFilter):78 RelatedIdsExpression = "VpcConfig.SecurityGroupIds[]"79@AWSLambda.filter_registry.register('subnet')80class SubnetFilter(net_filters.SubnetFilter):81 RelatedIdsExpression = "VpcConfig.SubnetIds[]"82@AWSLambda.filter_registry.register('vpc')83class VpcFilter(net_filters.VpcFilter):84 RelatedIdsExpression = "VpcConfig.VpcId"85AWSLambda.filter_registry.register('network-location', net_filters.NetworkLocation)86@AWSLambda.filter_registry.register('check-permissions')87class LambdaPermissions(CheckPermissions):88 def get_iam_arns(self, resources):89 return [r['Role'] for r in resources]90@AWSLambda.filter_registry.register('reserved-concurrency')91class ReservedConcurrency(ValueFilter):92 annotation_key = "c7n:FunctionInfo"93 value_key = '"c7n:FunctionInfo".Concurrency.ReservedConcurrentExecutions'94 schema = type_schema('reserved-concurrency', rinherit=ValueFilter.schema)95 permissions = ('lambda:GetFunction',)96 def validate(self):97 self.data['key'] = self.value_key98 return super(ReservedConcurrency, self).validate()99 def process(self, resources, 
event=None):100 self.data['key'] = self.value_key101 client = local_session(self.manager.session_factory).client('lambda')102 def _augment(r):103 try:104 r[self.annotation_key] = self.manager.retry(105 client.get_function, FunctionName=r['FunctionArn'])106 r[self.annotation_key].pop('ResponseMetadata')107 except ClientError as e:108 if e.response['Error']['Code'] == ErrAccessDenied:109 self.log.warning(110 "Access denied getting lambda:%s",111 r['FunctionName'])112 raise113 return r114 with self.executor_factory(max_workers=3) as w:115 resources = list(filter(None, w.map(_augment, resources)))116 return super(ReservedConcurrency, self).process(resources, event)117def get_lambda_policies(client, executor_factory, resources, log):118 def _augment(r):119 try:120 r['c7n:Policy'] = client.get_policy(121 FunctionName=r['FunctionName'])['Policy']122 except client.exceptions.ResourceNotFoundException:123 return None124 except ClientError as e:125 if e.response['Error']['Code'] == 'AccessDeniedException':126 log.warning(127 "Access denied getting policy lambda:%s",128 r['FunctionName'])129 return r130 results = []131 futures = {}132 with executor_factory(max_workers=3) as w:133 for r in resources:134 if 'c7n:Policy' in r:135 results.append(r)136 continue137 futures[w.submit(_augment, r)] = r138 for f in as_completed(futures):139 if f.exception():140 log.warning("Error getting policy for:%s err:%s",141 r['FunctionName'], f.exception())142 r = futures[f]143 continue144 results.append(f.result())145 return filter(None, results)146@AWSLambda.filter_registry.register('event-source')147class LambdaEventSource(ValueFilter):148 # this uses iam policy, it should probably use149 # event source mapping api150 annotation_key = "c7n:EventSources"151 schema = type_schema('event-source', rinherit=ValueFilter.schema)152 permissions = ('lambda:GetPolicy',)153 def process(self, resources, event=None):154 client = local_session(self.manager.session_factory).client('lambda')155 
self.log.debug("fetching policy for %d lambdas" % len(resources))156 resources = get_lambda_policies(157 client, self.executor_factory, resources, self.log)158 self.data['key'] = self.annotation_key159 return super(LambdaEventSource, self).process(resources, event)160 def __call__(self, r):161 if 'c7n:Policy' not in r:162 return False163 sources = set()164 data = json.loads(r['c7n:Policy'])165 for s in data.get('Statement', ()):166 if s['Effect'] != 'Allow':167 continue168 if 'Service' in s['Principal']:169 sources.add(s['Principal']['Service'])170 if sources:171 r[self.annotation_key] = list(sources)172 return self.match(r)173@AWSLambda.filter_registry.register('cross-account')174class LambdaCrossAccountAccessFilter(CrossAccountAccessFilter):175 """Filters lambda functions with cross-account permissions176 The whitelist parameter can be used to prevent certain accounts177 from being included in the results (essentially stating that these178 accounts permissions are allowed to exist)179 This can be useful when combining this filter with the delete action.180 :example:181 .. code-block:: yaml182 policies:183 - name: lambda-cross-account184 resource: lambda185 filters:186 - type: cross-account187 whitelist:188 - 'IAM-Policy-Cross-Account-Access'189 """190 permissions = ('lambda:GetPolicy',)191 policy_attribute = 'c7n:Policy'192 def process(self, resources, event=None):193 client = local_session(self.manager.session_factory).client('lambda')194 self.log.debug("fetching policy for %d lambdas" % len(resources))195 resources = get_lambda_policies(196 client, self.executor_factory, resources, self.log)197 return super(LambdaCrossAccountAccessFilter, self).process(198 resources, event)199@AWSLambda.action_registry.register('remove-statements')200class RemovePolicyStatement(RemovePolicyBase):201 """Action to remove policy/permission statements from lambda functions.202 :example:203 .. 
code-block:: yaml204 policies:205 - name: lambda-remove-cross-accounts206 resource: lambda207 filters:208 - type: cross-account209 actions:210 - type: remove-statements211 statement_ids: matched212 """213 schema = type_schema(214 'remove-statements',215 required=['statement_ids'],216 statement_ids={'oneOf': [217 {'enum': ['matched']},218 {'type': 'array', 'items': {'type': 'string'}}]})219 permissions = ("lambda:GetPolicy", "lambda:RemovePermission")220 def process(self, resources):221 results = []222 client = local_session(self.manager.session_factory).client('lambda')223 for r in resources:224 try:225 if self.process_resource(client, r):226 results.append(r)227 except Exception:228 self.log.exception(229 "Error processing lambda %s", r['FunctionArn'])230 return results231 def process_resource(self, client, resource):232 if 'c7n:Policy' not in resource:233 try:234 resource['c7n:Policy'] = client.get_policy(235 FunctionName=resource['FunctionName']).get('Policy')236 except ClientError as e:237 if e.response['Error']['Code'] != ErrAccessDenied:238 raise239 resource['c7n:Policy'] = None240 if not resource['c7n:Policy']:241 return242 p = json.loads(resource['c7n:Policy'])243 statements, found = self.process_policy(244 p, resource, CrossAccountAccessFilter.annotation_key)245 if not found:246 return247 for f in found:248 client.remove_permission(249 FunctionName=resource['FunctionName'],250 StatementId=f['Sid'])251@AWSLambda.action_registry.register('set-concurrency')252class SetConcurrency(BaseAction):253 """Set lambda function concurrency to the desired level.254 Can be used to set the reserved function concurrency to an exact value,255 to delete reserved concurrency, or to set the value to an attribute of256 the resource.257 """258 schema = type_schema(259 'set-concurrency',260 required=('value',),261 **{'expr': {'type': 'boolean'},262 'value': {'oneOf': [263 {'type': 'string'},264 {'type': 'integer'},265 {'type': 'null'}]}})266 permissions = 
('lambda:DeleteFunctionConcurrency',267 'lambda:PutFunctionConcurrency')268 def validate(self):269 if self.data.get('expr', False) and not isinstance(self.data['value'], six.text_type):270 raise ValueError("invalid value expression %s" % self.data['value'])271 return self272 def process(self, functions):273 client = local_session(self.manager.session_factory).client('lambda')274 is_expr = self.data.get('expr', False)275 value = self.data['value']276 if is_expr:277 value = jmespath.compile(value)278 none_type = type(None)279 for function in functions:280 fvalue = value281 if is_expr:282 fvalue = value.search(function)283 if isinstance(fvalue, float):284 fvalue = int(fvalue)285 if isinstance(value, int) or isinstance(value, none_type):286 self.policy.log.warning(287 "Function: %s Invalid expression value for concurrency: %s",288 function['FunctionName'], fvalue)289 continue290 if fvalue is None:291 client.delete_function_concurrency(292 FunctionName=function['FunctionName'])293 else:294 client.put_function_concurrency(295 FunctionName=function['FunctionName'],296 ReservedConcurrentExecutions=fvalue)297@AWSLambda.action_registry.register('delete')298class Delete(BaseAction):299 """Delete a lambda function (including aliases and older versions).300 :example:301 .. 
code-block:: yaml302 policies:303 - name: lambda-delete-dotnet-functions304 resource: lambda305 filters:306 - Runtime: dotnetcore1.0307 actions:308 - delete309 """310 schema = type_schema('delete')311 permissions = ("lambda:DeleteFunction",)312 def process(self, functions):313 client = local_session(self.manager.session_factory).client('lambda')314 for function in functions:315 try:316 client.delete_function(FunctionName=function['FunctionName'])317 except ClientError as e:318 if e.response['Error']['Code'] == "ResourceNotFoundException":319 continue320 raise321 self.log.debug("Deleted %d functions", len(functions))322@resources.register('lambda-layer')323class LambdaLayerVersion(query.QueryResourceManager):324 """Note custodian models the lambda layer version.325 Layers end up being a logical asset, the physical asset for use326 and management is the layer verison.327 To ease that distinction, we support querying just the latest328 layer version or having a policy against all layer versions.329 By default we query all versions, the following is an example330 to query just the latest.331 .. 
code-block:: yaml332 policies:333 - name: lambda-layer334 resource: lambda335 query:336 - version: latest337 """338 class resource_type(object):339 service = 'lambda'340 type = 'function'341 enum_spec = ('list_layers', 'Layers', None)342 name = id = 'LayerName'343 filter_name = None344 date = 'CreatedDate'345 dimension = None346 config_type = None347 def augment(self, resources):348 versions = {}349 for r in resources:350 versions[r['LayerName']] = v = r['LatestMatchingVersion']351 v['LayerName'] = r['LayerName']352 if {'version': 'latest'} in self.data.get('query', []):353 return list(versions.values())354 layer_names = list(versions)355 client = local_session(self.session_factory).client('lambda')356 versions = []357 for layer_name in layer_names:358 pager = get_layer_version_paginator(client)359 for v in pager.paginate(360 LayerName=layer_name).build_full_result().get('LayerVersions'):361 v['LayerName'] = layer_name362 versions.append(v)363 return versions364def get_layer_version_paginator(client):365 pager = Paginator(366 client.list_layer_versions,367 {'input_token': 'NextToken',368 'output_token': 'NextToken',369 'result_key': 'LayerVersions'},370 client.meta.service_model.operation_model('ListLayerVersions'))371 pager.PAGE_ITERATOR_CLS = query.RetryPageIterator372 return pager373@LambdaLayerVersion.filter_registry.register('cross-account')374class LayerCrossAccount(CrossAccountAccessFilter):375 permissions = ('lambda:GetLayerVersionPolicy',)376 def process(self, resources, event=None):377 client = local_session(self.manager.session_factory).client('lambda')378 for r in resources:379 r['c7n:Policy'] = self.manager.retry(380 client.get_layer_version_policy,381 LayerName=r['LayerName'],382 VersionNumber=r['Version']).get('Policy')383 return super(LayerCrossAccount, self).process(resources)384 def get_resource_policy(self, r):385 return r['c7n:Policy']386@LambdaLayerVersion.action_registry.register('remove-statements')387class 
LayerRemovePermissions(RemovePolicyBase):388 schema = type_schema(389 'remove-statements',390 required=['statement_ids'],391 statement_ids={'oneOf': [392 {'enum': ['matched']},393 {'type': 'array', 'items': {'type': 'string'}}]})394 permissions = (395 "lambda:GetLayerVersionPolicy",396 "lambda:RemoveLayerVersionPermission")397 def process(self, resources):398 client = local_session(self.manager.session_factory).client('lambda')399 for r in resources:400 self.process_resource(client, r)401 def process_resource(self, client, r):402 if 'c7n:Policy' not in r:403 try:404 r['c7n:Policy'] = self.manager.retry(405 client.get_layer_version_policy,406 LayerName=r['LayerName'],407 VersionNumber=r['Version'])408 except client.exceptions.ResourceNotFound:409 return410 p = json.loads(r['c7n:Policy'])411 statements, found = self.process_policy(412 p, r, CrossAccountAccessFilter.annotation_key)413 if not found:414 return415 for f in found:416 self.manager.retry(417 client.remove_layer_version_permission,418 LayerName=r['LayerName'],419 StatementId=f['Sid'],420 VersionNumber=r['Version'])421@LambdaLayerVersion.action_registry.register('delete')422class DeleteLayerVersion(BaseAction):423 schema = type_schema('delete')424 permissions = ('lambda:DeleteLayerVersion',)425 def process(self, resources):426 client = local_session(427 self.manager.session_factory).client('lambda')428 for r in resources:429 try:430 self.manager.retry(431 client.delete_layer_version,432 LayerName=r['LayerName'],433 VersionNumber=r['Version'])434 except client.exceptions.ResourceNotFound:...
inventory-lambdas.py
Source:inventory-lambdas.py
...118 try:119 resource_item['supplementaryConfiguration']['LayerVersions'] = []120 response = client.list_layer_versions(LayerName=layer['LayerName'], MaxItems=50)121 for version in response['LayerVersions']:122 version['Policy'] = client.get_layer_version_policy(LayerName=layer['LayerName'], VersionNumber=version['Version'])123 resource_item['supplementaryConfiguration']['LayerVersions'].append(version)124 except ClientError as e:125 message = f"Error getting the Policy for layer {layer['LayerName']} in {region} for {target_account.account_name}: {e}"126 resource_item['errors']['LayerVersions'] = message127 logger.warning(message)...
lambdax.py
Source:lambdax.py
import json
import logging
from typing import Any, Dict, Generator, Optional, Tuple

from botocore.exceptions import ClientError

from introspector.aws.fetch import ServiceProxy
from introspector.aws.svc import RegionalService, ServiceSpec, resource_gate

_log = logging.getLogger(__name__)

# Maps a list operation to the response field holding the ARN used for tags.
HAS_TAGS = {'list_functions': 'FunctionArn'}


def _get_policy(proxy: ServiceProxy, arn: str) -> Optional[Dict]:
    """Fetch and decode the resource policy for ``arn``.

    Returns None when the proxy yields no response or the service reports
    ``ResourceNotFoundException``; any other ClientError propagates.
    """
    try:
        response = proxy.get('get_policy', FunctionName=arn)
    except ClientError as err:
        error_code = err.response.get('Error', {}).get('Code')
        if error_code == 'ResourceNotFoundException':
            return None
        raise
    if response is None:
        return None
    return json.loads(response['Policy'])


def _import_function(proxy: ServiceProxy, function: Dict, spec: ServiceSpec):
    """Yield the versions and aliases of ``function`` that ``spec`` allows.

    Each item is a ``(resource_name, payload)`` tuple; every payload is
    annotated with the parent function ARN and its own decoded policy.
    """
    function_name = function['FunctionName']
    function_arn = function['FunctionArn']

    if resource_gate(spec, 'FunctionVersion'):
        version_listing = proxy.list('list_versions_by_function',
                                     FunctionName=function_name)
        found_versions = (
            [] if version_listing is None else version_listing[1]['Versions'])
        for item in found_versions:
            item['ParentFunctionArn'] = function_arn
            item['Policy'] = _get_policy(proxy, item['FunctionArn'])
            yield 'FunctionVersion', item

    if resource_gate(spec, 'Alias'):
        alias_listing = proxy.list('list_aliases', FunctionName=function_name)
        found_aliases = (
            [] if alias_listing is None else alias_listing[1]['Aliases'])
        for item in found_aliases:
            item['FunctionArn'] = function_arn
            item['Policy'] = _get_policy(proxy, item['AliasArn'])
            yield 'Alias', item


def _import_functions(proxy: ServiceProxy, spec: ServiceSpec):
    """Yield every function, followed by its versions and aliases.

    Each function is annotated with ``Tags`` (when the tag listing returns
    a response) and with its decoded ``Policy``.
    """
    listing = proxy.list('list_functions')
    if listing is None:
        return

    tag_arn_field = HAS_TAGS.get('list_functions')
    for fn in listing[1]['Functions']:
        if tag_arn_field is not None:
            tag_listing = proxy.list('list_tags', Resource=fn[tag_arn_field])
            if tag_listing is not None:
                fn['Tags'] = tag_listing[1].get('Tags', [])
        fn['Policy'] = _get_policy(proxy, fn['FunctionArn'])
        yield 'Function', fn
        yield from _import_function(proxy, fn, spec)


def _import_layer(proxy: ServiceProxy, layer_arn: str, layer_name: str):
    """Yield each version of the layer as ``('LayerVersion', payload)``.

    When a policy response is available, the payload gets ``Policy`` (the
    decoded document, or None if absent) and ``PolicyRevisionId``. The
    payload always gets ``Name`` set to ``layer_name``.
    """
    listing = proxy.list('list_layer_versions', LayerName=layer_arn)
    if listing is None:
        return

    for entry in listing[1]['LayerVersions']:
        policy_resp = proxy.get('get_layer_version_policy',
                                LayerName=layer_arn,
                                VersionNumber=entry['Version'])
        if policy_resp is not None:
            raw_policy = policy_resp.get('Policy')
            entry['Policy'] = (
                None if raw_policy is None else json.loads(raw_policy))
            entry['PolicyRevisionId'] = policy_resp.get('RevisionId')
        entry['Name'] = layer_name
        yield 'LayerVersion', entry


def _import_layers(proxy: ServiceProxy):
    """Yield the versions of every layer returned by ``list_layers``."""
    listing = proxy.list('list_layers')
    if listing is None:
        return

    for layer in listing[1]['Layers']:
        yield from _import_layer(proxy, layer['LayerArn'], layer['LayerName'])


def _import_lambda_region(
        proxy: ServiceProxy, region: str,
        spec: ServiceSpec) -> Generator[Tuple[str, Any], None, None]:
    """Yield all gated lambda resources for one region."""
    if resource_gate(spec, 'Function'):
        _log.info(f'Importing functions in {region}')
        yield from _import_functions(proxy, spec)
    if resource_gate(spec, 'LayerVersion'):
        yield from _import_layers(proxy)
    # TODO: event sources
    # ... (the snippet listing is truncated at this point)
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!