Best Python code snippet using localstack_python
sqs_api.py
Source:sqs_api.py
1from flask import Blueprint, request2from flask.json import jsonify3import server.sqs.sqs_service as sqs_service4sqs_api = Blueprint('sqs_api', __name__, template_folder='templates')5@sqs_api.route("/api/sqsListMessages")6def list_sqs_list_messages():7 print("====================================================")8 queue_name = request.args.get('queueName')9 queue_name = queue_name if queue_name else sqs_service.DEFAULT_QUEUE_NAME10 print('queue_name:{}'.format(queue_name))11 messages = sqs_service.list_messages(queue_name)12 print('num messages in queue: {}'.format(messages))13 return jsonify(messages)14@sqs_api.route("/api/sqsListQueues")15def list_sqs_queues():16 print("====================================================")17 queues_list = sqs_service.list_queues()18 return jsonify(queues_list)19@sqs_api.route("/api/sqsAddQueue")20def add_sqs_queue():21 print("====================================================")22 queue_name = request.args.get('queueName')23 if not queue_name:24 return 'queue name not informed'25 print('add queue_name:{}'.format(queue_name))26 sqs_service.add_queue(queue_name)27 return jsonify("{'queue added' : '" + queue_name + "'}")28@sqs_api.route("/api/sqsAddMessageToQueue")29def add_sqs_message():30 print("====================================================")31 queue_name = request.args.get('queueName')32 message = request.args.get('message')33 if not queue_name:34 return 'queue name not informed'35 if not message:36 return 'message not informed'37 print('add message:{}'.format(message))38 sqs_service.add_message_to_queue(message, queue_name)39 return jsonify("{'message added to queue' : '" + queue_name + "'}")40@sqs_api.route("/api/sqsDeleteQueue")41def delete_sqs_queue():42 print("====================================================")43 queue_url = request.args.get('queueName')44 if not queue_url:45 return 'queue url not informed'46 print('add queue_url:{}'.format(queue_url))47 sqs_service.delete_queue(queue_url)...
main.py
Source:main.py
1from configparser import ConfigParser2from science_api.online_queue.sqs_base_watcher import SqsBaseWatcher3from science_api.sqs.sqs_base import SqsBase4from science_api_examples.io.example_io import ExampleIO5if __name__ == '__main__':6 '''7 Runs a watcher to keep checking the sqs queue to run the API and post the result8 '''9 settings_path = './settings.cfg'10 parser = ConfigParser()11 parser.read(settings_path)12 sqs_setup = {13 'aws_region': parser.get('sqs_info', 'AWS_REGION'),14 'send_url': parser.get('sqs_info', 'SEND_URL'),15 'receive_url': parser.get('sqs_info', 'RECEIVE_URL'),16 'aws_access_key': parser.get('credentials', 'AWS_ACCESS_KEY_ID'),17 'aws_secret_key': parser.get('credentials', 'AWS_SECRET_ACCESS_KEY'),18 }19 watcher_setup = {20 'url': parser.get('post_info', 'URL')21 }22 sqs_api = SqsBase(region=sqs_setup['aws_region'],23 aws_access_key=sqs_setup['aws_access_key'],24 aws_secret_key=sqs_setup['aws_secret_key'],25 send_queue_url=sqs_setup['send_url'],26 receive_queue_url=sqs_setup['receive_url'])27 server_watcher = SqsBaseWatcher(sqs_api=sqs_api,28 io_class=ExampleIO,29 post_url=watcher_setup['url'])...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!