Best Python code snippet using localstack_python
app.py
Source: app.py
...10 aws_sqs11)12class ApigwDynamodbStepFunctionStack(core.Stack):13 LAMBDA_PYTHON_RUNTIME = lambda_.Runtime.PYTHON_3_814 def _create_lambda_function(15 self,16 function_name: str,17 environment: dict18 ):19 return lambda_.Function(20 scope=self,21 id=f"{function_name}_lambda_function",22 runtime=self.LAMBDA_PYTHON_RUNTIME,23 handler=f"lambda_handler.{function_name}",24 code=lambda_.Code.asset("./lambda_script"),25 environment=environment26 )27 def _dynamodb_update_in_sfn(28 self,29 table: aws_dynamodb.Table,30 status: str31 ):32 return aws_sfn_tasks.DynamoUpdateItem(33 scope=self,34 id=f"dynamodb_status_updated_as_{status}",35 # get id from StepFunctions state36 key={"id": aws_sfn_tasks.DynamoAttributeValue.from_string(aws_sfn.JsonPath.string_at("$.id"))},37 table=table,38 update_expression="set #status = :status",39 expression_attribute_names={40 "#status": "status"41 },42 expression_attribute_values={43 ":status": aws_sfn_tasks.DynamoAttributeValue.from_string(status)44 },45 result_path=f"$.status_{status}"46 )47 def __init__(self, scope: core.App, id_: str, stack_env: str, **kwargs) -> None:48 super().__init__(scope, id_, **kwargs)49 # create dynamo table50 demo_table = aws_dynamodb.Table(51 scope=self,52 id="demo_table",53 partition_key=aws_dynamodb.Attribute(54 name="id",55 type=aws_dynamodb.AttributeType.STRING56 ),57 write_capacity=3,58 read_capacity=3,59 removal_policy=core.RemovalPolicy.DESTROY60 )61 queue = aws_sqs.Queue(self, f"{id_}-SQSQueue")62 # create producer lambda function63 producer_lambda = self._create_lambda_function(64 function_name="producer",65 environment={66 "TABLE_NAME": demo_table.table_name,67 "QUEUE_URL": queue.queue_url68 }69 )70 queue.grant_send_messages(producer_lambda)71 # grant permission to lambda to write to demo table72 demo_table.grant_write_data(producer_lambda)73 # create consumer lambda function74 consumer_lambda = self._create_lambda_function(75 function_name="consumer",76 environment={"TABLE_NAME": demo_table.table_name}77 )78 # grant permission to lambda to read from demo table79 demo_table.grant_read_data(consumer_lambda)80 # api_gateway for root81 base_api = apigw_.RestApi(82 scope=self,83 id=f"{id_}-{stack_env}-apigw",84 rest_api_name=f"{id_}-{stack_env}-apigw",85 deploy_options=apigw_.StageOptions(stage_name=stack_env)86 )87 # /example entity88 api_entity = base_api.root.add_resource("example")89 # GET /example90 api_entity.add_method(91 http_method="GET",92 integration=apigw_.LambdaIntegration(93 handler=consumer_lambda,94 integration_responses=[95 apigw_.IntegrationResponse(96 status_code="200"97 )98 ]99 )100 )101 # POST /example102 api_entity.add_method(103 http_method="POST",104 integration=apigw_.LambdaIntegration(105 handler=producer_lambda,106 integration_responses=[107 apigw_.IntegrationResponse(108 status_code="200"109 )110 ]111 )112 )113 # ============= #114 # StepFunctions #115 # ============= #116 dynamodb_update_running_task = self._dynamodb_update_in_sfn(table=demo_table, status="running")117 wait_1_min = aws_sfn.Wait(118 scope=self,119 id="Wait one minutes as heavy task",120 time=aws_sfn.WaitTime.duration(core.Duration.minutes(1)),121 )122 dynamodb_update_complete_task = self._dynamodb_update_in_sfn(table=demo_table, status="complete")123 dynamodb_update_failure_task = self._dynamodb_update_in_sfn(table=demo_table, status="failure")124 check_task_status = aws_sfn.Choice(scope=self, id="Job Complete?")\125 .when(aws_sfn.Condition.string_equals("$.job_status", "success"), dynamodb_update_complete_task) \126 .otherwise(dynamodb_update_failure_task)127 # StepFunctions128 definition = dynamodb_update_running_task \129 .next(wait_1_min) \130 .next(check_task_status)131 sfn_process = aws_sfn.StateMachine(132 scope=self,133 id=f"{id_}-{stack_env}",134 definition=definition135 )136 # Lambda to invoke StepFunction137 sfn_invoke_lambda = self._create_lambda_function(138 function_name="invoke_step_function",139 environment={140 "STEP_FUNCTION_ARN": sfn_process.state_machine_arn,141 "QUEUE_URL": queue.queue_url142 }143 )144 # grant145 queue.grant_consume_messages(sfn_invoke_lambda)146 sfn_process.grant_start_execution(sfn_invoke_lambda)147 # ================ #148 # CloudWatch Event #149 # ================ #150 # Runs every 2 hour151 invoke_automatically = aws_events.Rule(...
test_lambda_api.py
Source: test_lambda_api.py
...33def create_lambda_function_aws(34 lambda_client,35):36 lambda_arns = []37 def _create_lambda_function(**kwargs):38 def _create_function():39 resp = lambda_client.create_function(**kwargs)40 lambda_arns.append(resp["FunctionArn"])41 def _is_not_pending():42 try:43 result = (44 lambda_client.get_function(FunctionName=resp["FunctionName"])[45 "Configuration"46 ]["State"]47 != "Pending"48 )49 return result50 except Exception as e:51 LOG.error(e)...
_manager.py
Source: _manager.py
...12 return True13 def create(self, name: str, zip_file_path: str, handler_name: str,14 role: Role) -> LambdaFunctionMetadata:15 zip_file = _read_zip_file_content(zip_file_path)16 self._create_lambda_function(name, role, handler_name, zip_file)17 function_url = self._create_lambda_function_public_url(name)18 return LambdaFunctionMetadata(name, function_url, role)19 def delete(self, name: str):20 self._client.delete_function_url_config(21 FunctionName=name,22 )23 self._client.delete_function(24 FunctionName=name,25 )26 def get_lambda_function(self, name: str) -> Optional[LambdaFunctionMetadata]:27 try:28 response = self._client.get_function(29 FunctionName=name,30 )31 except self._client.exceptions.ResourceNotFoundException:32 return None33 role = get_role_from_arn(arn=response["Configuration"]["Role"])34 response = self._client.get_function_url_config(FunctionName=name)35 url = response["FunctionUrl"]36 return LambdaFunctionMetadata(name, url, role)37 def _create_lambda_function_public_url(self, name):38 response = self._client.create_function_url_config(39 FunctionName=name,40 AuthType="NONE",41 )42 function_url = response['FunctionUrl']43 self._client.add_permission(44 Action="lambda:InvokeFunctionUrl",45 FunctionName=name,46 Principal="*",47 StatementId="FunctionURLAllowPublicAccess",48 FunctionUrlAuthType="NONE",49 )50 return function_url51 def _create_lambda_function(self, name, role, handler_name, zip_file):52 self._client.create_function(53 FunctionName=name,54 Runtime='python3.9',55 Role=role.arn,56 Handler=handler_name,57 Code=dict(ZipFile=zip_file),58 )59def _read_zip_file_content(zip_file_path: str):60 with open(zip_file_path, 'rb') as file:61 zip_file = file.read()...
Check out the latest blogs from LambdaTest on this topic:
The fact is not alien to us anymore that cross browser testing is imperative to enhance your application’s user experience. Enhanced knowledge of popular and highly acclaimed testing frameworks goes a long way in developing a new app. It holds more significance if you are a full-stack developer or expert programmer.
QA testers have a unique role and responsibility to serve the customer. Serving the customer in software testing means protecting customers from application defects, failures, and perceived failures from missing or misunderstood requirements. Testing for known requirements based on documentation or discussion is the core of the testing profession. One unique way QA testers can both differentiate themselves and be innovative occurs when senseshaping is used to improve the application user experience.
Having a good web design can empower business and make your brand stand out. According to a survey by Top Design Firms, 50% of users believe that website design is crucial to an organization’s overall brand. Therefore, businesses should prioritize website design to meet customer expectations and build their brand identity. Your website is the face of your business, so it’s important that it’s updated regularly as per the current web design trends.
Enterprise resource planning (ERP) is a form of business process management software—typically a suite of integrated applications—that assists a company in managing its operations, interpreting data, and automating various back-office processes. The introduction of a new ERP system is analogous to the introduction of a new product into the market. If the product is not handled appropriately, it will fail, resulting in significant losses for the business. Most significantly, the employees’ time, effort, and morale would suffer as a result of the procedure.
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!