Best Python code snippet using localstack_python
provider.py
Source:provider.py
...67 self.lambda_service = LambdaService()68 self.lock = threading.RLock()69 def on_before_stop(self) -> None:70 self.lambda_service.stop()71 def _map_config_out(self, version: FunctionVersion) -> FunctionConfiguration:72 return FunctionConfiguration(73 RevisionId=version.config_meta.revision_id,74 FunctionName=version.id.function_name,75 FunctionArn=version.id.qualified_arn(),76 LastModified=version.config_meta.last_modified,77 LastUpdateStatus=LastUpdateStatus.Successful,78 State=State.Active,79 Version=version.id.qualifier,80 Description=version.config.description,81 Role=version.config.role,82 Timeout=version.config.timeout,83 Runtime=version.config.runtime,84 Handler=version.config.handler,85 Environment=EnvironmentResponse(Variables=version.config.environment, Error={}),86 CodeSize=version.config_meta.code_size,87 CodeSha256=version.config_meta.coda_sha256,88 MemorySize=version.config.memory_size,89 PackageType=version.config.package_type,90 TracingConfig=TracingConfig(Mode=version.config.tracing_config_mode),91 Architectures=version.config.architectures,92 )93 def create_function(94 self,95 context: RequestContext,96 function_name: FunctionName,97 role: RoleArn,98 code: FunctionCode,99 runtime: Runtime = None,100 handler: Handler = None,101 description: Description = None,102 timeout: Timeout = None,103 memory_size: MemorySize = None,104 publish: Boolean = None,105 vpc_config: VpcConfig = None, # TODO: ignored106 package_type: PackageType = None,107 dead_letter_config: DeadLetterConfig = None, # TODO: ignored108 environment: Environment = None,109 kms_key_arn: KMSKeyArn = None, # TODO: ignored110 tracing_config: TracingConfig = None, # TODO: ignored111 tags: Tags = None,112 layers: LayerList = None, # TODO: ignored113 file_system_configs: FileSystemConfigList = None, # TODO: ignored114 image_config: ImageConfig = None, # TODO: ignored115 code_signing_config_arn: CodeSigningConfigArn = None, # TODO: ignored116 architectures: ArchitecturesList = None,117 ) -> 
FunctionConfiguration:118 # TODO: initial validations119 if architectures and Architecture.arm64 in architectures:120 raise ServiceException("ARM64 is currently not supported by this provider")121 version = self.lambda_service.create_function(122 context.account_id,123 context.region,124 function_name=function_name,125 function_config=VersionFunctionConfiguration(126 description=description or "",127 role=role,128 timeout=timeout or 3,129 runtime=runtime,130 memory_size=memory_size or 128,131 handler=handler,132 package_type=PackageType.Zip, # TODO133 reserved_concurrent_executions=0,134 environment={}, # TODO135 # environment={k: v for k,v in environment['Variables'].items()},136 architectures=[Architecture.x86_64],137 tracing_config_mode=TracingMode.PassThrough,138 image_config=None,139 layers=[],140 ),141 code=Code(zip_file=code["ZipFile"]), # TODO: s3142 )143 return self._map_config_out(version)144 def _map_to_list_response(self, config: FunctionConfiguration) -> FunctionConfiguration:145 shallow_copy = config.copy()146 for k in [147 "State",148 "StateReason",149 "StateReasonCode",150 "LastUpdateStatus",151 "LastUpdateStatusReason",152 "LastUpdateStatusReasonCode",153 ]:154 if shallow_copy.get(k):155 del shallow_copy[k]156 return shallow_copy157 def list_functions(158 self,159 context: RequestContext,160 master_region: MasterRegion = None, # (only relevant for lambda@edge)161 function_version: FunctionVersion = None, # TODO162 marker: String = None, # TODO163 max_items: MaxListItems = None, # TODO164 ) -> ListFunctionsResponse:165 versions = self.lambda_service.list_function_versions(context.region)166 return ListFunctionsResponse(167 Functions=[self._map_to_list_response(self._map_config_out(fc)) for fc in versions]168 )169 def get_function(170 self,171 context: RequestContext,172 function_name: NamespacedFunctionName,173 qualifier: Qualifier = None, # TODO174 ) -> GetFunctionResponse:175 version = self.lambda_service.get_function_version(176 context.region, 
function_name, qualifier or "$LATEST"177 )178 return GetFunctionResponse(179 Configuration=self._map_config_out(version),180 Code=FunctionCodeLocation(Location=""), # TODO181 Tags={}, # TODO182 Concurrency={}, # TODO183 )184 def invoke(185 self,186 context: RequestContext,187 function_name: NamespacedFunctionName,188 invocation_type: InvocationType = None,189 log_type: LogType = None,190 client_context: String = None,191 payload: Blob = None,192 qualifier: Qualifier = None,193 ) -> InvocationResponse:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!