Best Python code snippet using pyresttest_python
DBConnector.py
Source:DBConnector.py
# Standard Library
import json
import logging as logger
import time

# Third Party Libraries
import cx_Oracle

## Message Model
from Producer.message import Message


class DBConnector:
    """Small wrapper around a ``cx_Oracle.SessionPool``.

    The constructor builds the pool and keeps one connection acquired as
    ``self.connection``; the other methods borrow their own connection from
    the pool for the duration of a call.
    """

    def __init__(self, name, dsn, username, passwd) -> None:
        """Store the connection settings and try to open the session pool.

        Args:
            name: Label for this connector (only stored).
            dsn: Oracle data source name passed to the pool.
            username: Database user.
            passwd: Database password.

        A ``cx_Oracle.DatabaseError`` during start-up is printed rather than
        raised; in that case ``self.pool`` and ``self.connection`` stay
        ``None``.
        """
        self.name = name
        self.dsn = dsn
        self.username = username
        self.passwd = passwd
        self.encoding = "UTF-8"
        # Define both attributes up front so a failed connect leaves them as
        # None instead of missing (other methods read them unconditionally).
        self.pool = None
        self.connection = None
        try:
            self.pool = cx_Oracle.SessionPool(
                self.username,
                self.passwd,
                self.dsn,
                min=2,
                max=5,
                increment=3,
                getmode=cx_Oracle.SPOOL_ATTRVAL_TIMEDWAIT,
                encoding=self.encoding,
            )
            # Acquire a connection from the pool
            connection = self.pool.acquire()
            self.connection = connection
            print("connected to : ", connection.version)
            logger.info(f"Connected to {connection.version} database")
        except cx_Oracle.DatabaseError as e:
            print("There is a problem with Oracle", e)
        finally:
            print("init session...")

    def query(self, sql, bind_variables=None):
        """Run ``sql`` and return the rows as a list of dicts.

        Args:
            sql: Statement text to execute.
            bind_variables: Optional bind values passed to ``execute``.

        Returns:
            A list with one ``{column_name: value}`` dict per row, or
            ``None`` when anything fails (the exception is logged, not
            raised).
        """
        try:
            conn = self.pool.acquire()
            try:
                cursor = conn.cursor()
                if bind_variables is None:
                    logger.debug(f"Query: {sql}")
                    cursor.execute(sql)
                else:
                    logger.debug(f"Query: {sql} value: {bind_variables}")
                    cursor.execute(sql, bind_variables)
                columns = [description[0] for description in cursor.description]
                return [dict(zip(columns, row)) for row in cursor]
            finally:
                # Hand the connection back even when execute/fetch raised;
                # otherwise every failed query would cost a pool slot.
                self.pool.release(conn)
        except Exception as err:
            logger.exception(err)

    def getCurrentSCN(self):
        """Return ``current_scn`` from ``v$database``.

        Returns:
            The first column of the fetched row, or ``None`` when the
            connector has no connection or the query yields no row.
        """
        if not self.connection:
            return None
        # SessionPool has no cursor(); borrow a connection and open the
        # cursor on that instead.
        conn = self.pool.acquire()
        try:
            cursor = conn.cursor()
            cursor.execute('select current_scn from v$database')
            result = cursor.fetchone()
        finally:
            self.pool.release(conn)
        return result[0] if result else None

    def dumpTable(self, table):
        """Read up to 20 rows of ``table`` and wrap each one in a ``Message``.

        Args:
            table: Name in ``owner.table`` form; it is split on ``"."`` to
                fill ``seg_owner`` and ``table_name``.

        Returns:
            A list of ``message.dict()`` values, one per fetched row.
        """
        records = []
        getTable = table.split(".")[1]
        getSegOwner = table.split(".")[0]
        # NOTE(review): this connection is not released on the visible
        # success path; the truncated handler below may deal with it - confirm.
        conn = self.pool.acquire()
        cursor = conn.cursor()
        cursor.prefetchrows = 2500
        cursor.arraysize = 3000
        # NOTE(review): the table name is interpolated into the SQL text, so
        # callers must only pass trusted names.
        sql = 'SELECT * FROM (select * from {0}) WHERE rownum <= 20'.format(table)
        try:
            query = cursor.execute(sql)
            res = cursor.fetchall()
            # Get header
            row_headers = [column[0] for column in query.description]
            # Get records
            for row in res:
                record = dict(zip(row_headers, row))
                message = Message(
                    scn=0,
                    seg_owner=getSegOwner,
                    table_name=getTable,
                    sql_redo=sql,
                    operation="READ",
                    data=record,
                    timestamp=int(time.time()))
                ## add all to records
                records.append(message.dict())
            # Return all records
            return records
            # return json.dumps(records,default=str)
        except Exception as err:
            ...  # the rest of this handler is cut off in the source listing
api.py
Source:api.py
import logging
import flask
import json
from flask import request, make_response
from datetime import datetime
import rds_config
import pymysql

logger = logging.getLogger()
logger.setLevel(logging.INFO)

#rds settings
host = rds_config.DB_HOST
port = rds_config.DB_PORT
db_name = rds_config.DB_NAME
password = rds_config.DB_PASSWORD
user = rds_config.DB_USER

# One module-level connection, opened at import time and used by every request.
conn = pymysql.connect(host=host, user=user, passwd=password, port=int(port),
                       db=db_name, connect_timeout=5, autocommit=True)

app = flask.Flask(__name__)
app.config["DEBUG"] = True
headers = {"Content-Type": "application/json"}


def execute_select(days, CardType, CountryOrigin, AmountFrom, AmountTo):
    """Select transactions from the last ``days`` days with optional filters.

    Args:
        days: Number of days to look back from the current date.
        CardType: Exact card type to match, or ``None`` for no filter.
        CountryOrigin: Exact country to match, or ``None`` for no filter.
        AmountFrom: Lower bound of the amount range.
        AmountTo: Upper bound of the amount range.

    The amount filter is only applied when BOTH bounds are given; a single
    bound is ignored.

    Returns:
        Whatever ``cursor.fetchall()`` yields for the built statement.
    """
    select_statement = 'select * from transactions where TransactionDate > UNIX_TIMESTAMP(DATE_ADD(CURDATE(),INTERVAL - %(days)s DAY)) '
    bind_variables = {"days": days}

    # Every user-supplied value goes through bind variables, never into the
    # statement text itself.
    if CardType is not None:
        select_statement += ' and CardType = %(card_type)s'
        bind_variables["card_type"] = CardType

    if CountryOrigin is not None:
        select_statement += ' and CountryOrigin = %(country)s'
        bind_variables["country"] = CountryOrigin

    if AmountFrom is not None and AmountTo is not None:
        select_statement += ' and Amount between %(from)s and %(to)s'
        bind_variables["from"] = AmountFrom
        bind_variables["to"] = AmountTo

    with conn.cursor() as cur:
        cur.execute(select_statement, bind_variables)
        result = cur.fetchall()

    return result


@app.route('/transactions', methods=['GET'])
def home():
    """Handle ``GET /transactions``.

    Query parameters: ``n`` (integer, required), ``CardType``,
    ``CountryOrigin``, ``AmountFrom`` and ``AmountTo`` (floats).

    Returns:
        A 200 response with ``{"result": rows}``, or a 400 response with an
        ``error`` message when ``n`` is missing or not an integer.
    """
    #first try to get the numbers from the args property
    days = request.args.get("n", type=int)
    card_type = request.args.get("CardType")
    country = request.args.get("CountryOrigin")
    amount_from = request.args.get("AmountFrom", type=float)
    amount_to = request.args.get("AmountTo", type=float)

    if days is None:
        # args.get(..., type=int) yields None both when "n" is absent and
        # when it cannot be converted; that is a client error, so answer
        # with 400 instead of raising (which surfaced as a 500).
        return make_response({"error": 'Argument "n" is not found'}, 400)

    result = execute_select(days, card_type, country, amount_from, amount_to)
    return make_response({"result": result}, 200)


if __name__ == '__main__':
    ...  # the rest of this block is cut off in the source listing
app.py
Source:app.py
1import json2import os3import logging4import pymysql5import boto36import rds_config7logger = logging.getLogger()8logger.setLevel(logging.INFO)9#rds settings10host = os.environ['DB_HOST']11port = os.environ['DB_PORT']12db_name = os.environ['DB_NAME']13password = rds_config.DB_PASSWORD14user = rds_config.DB_USER15try:16 conn = pymysql.connect(host=host, user=user, passwd=password, port=int(port),17 db=db_name, connect_timeout=5)18except pymysql.MySQLError as e:19 logger.error("ERROR: Unexpected error: Could not connect to MySQL instance.")20 raise e21logger.info("SUCCESS: Connection to RDS MySQL instance succeeded")22def lambda_handler(event, context):23 params = event['queryStringParameters']24 select_statement = 'select * from transactions where TransactionDate > UNIX_TIMESTAMP(DATE_ADD(CURDATE(),INTERVAL - %(days)s DAY)) '25 if 'n' not in params:26 raise Exception('Please provide the n parameter for the last n days')27 bind_variables = {"days": params['n']}28 29 if 'CardType' in params:30 select_statement += ' and CardType = %(card_type)s'31 bind_variables['card_type'] = params['CardType']32 33 if 'CountryOrigin' in params:34 select_statement += ' and CountryOrigin = %(country)s'35 bind_variables['country'] = params['CountryOrigin']36 37 if 'AmountFrom' in params and 'AmountTo' in params:38 select_statement += ' and Amount between %(from)s and %(to)s'39 bind_variables['from'] = params['AmountFrom']40 bind_variables['to'] = params['AmountTo']41 42 result = None43 try:44 with conn.cursor() as cur:45 cur.execute(select_statement, bind_variables)46 result = cur.fetchall()47 except:48 logger.error({"ERROR": "Unexpected error: Could not query MySQL instance.", "statement":select_statement, "bind_variables":bind_variables, "event":event})49 raise50 51 return {52 "statusCode": 200,53 "body": json.dumps({"result":result}),...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!