Best Python code snippet using Kiwi_python
database.py
Source:database.py
import mysql
from mysql.connector import connect, errorcode


def escape(value):
    """Return *value* with ``<`` and ``>`` replaced by HTML entities.

    Only the two angle brackets are rewritten; every other character is
    returned unchanged.
    """
    # The replacement strings had been decoded back into the literal
    # characters ('<' -> '<'), which made this function a no-op.
    return value.replace('<', '&lt;').replace('>', '&gt;')


def escape_dict(dictionary):
    """Escape every string inside a dict or list, in place.

    Nested dicts and lists are walked recursively. Strings are passed
    through :func:`escape`; any other value (including ``None``) is left
    as it is.

    Returns the same object that was passed in.
    """
    if isinstance(dictionary, list):
        for index, item in enumerate(dictionary):
            if isinstance(item, str):
                dictionary[index] = escape(item)
            elif isinstance(item, (dict, list)):
                escape_dict(item)
    elif isinstance(dictionary, dict):
        # Re-assigning existing keys while iterating is safe: the dict
        # never changes size.
        for key, value in dictionary.items():
            if isinstance(value, str):
                dictionary[key] = escape(value)
            elif isinstance(value, (dict, list)):
                escape_dict(value)
    return dictionary


class Database:
    """Wrapper around one MySQL connection and one prepared cursor."""

    # NOTE(review): credentials are hard-coded in source; prefer passing
    # ``config`` explicitly from configuration or the environment.
    DEFAULT_CONFIG = {
        'user': 'magic_pad',
        'password': 'password',
        'host': '127.0.0.1',
        'database': 'magic_pad',
        'auth_plugin': 'mysql_native_password',
    }

    def __init__(self, config=None):
        """Open the connection and create a prepared cursor.

        ``config`` is a mapping of keyword arguments for
        ``mysql.connector.connect``; when omitted, ``DEFAULT_CONFIG`` is
        used.
        """
        # Copy so that each instance owns its own settings dict.
        self.config = dict(self.DEFAULT_CONFIG if config is None else config)
        # Open connection
        self.conn = connect(**self.config)
        self.cursor = self.conn.cursor(prepared=True)

    def close(self):
        """Close the underlying connection."""
        self.conn.close()

    def commit(self):
        """Commit the current transaction."""
        self.conn.commit()

    def delete(self, query, custom_data=None, auto_commit=True):
        """Run a DELETE statement; see :meth:`_execute` for the result."""
        return self._execute(query, custom_data, auto_commit)

    def _execute(self, query, custom_data=None, require_commit=True):
        """Execute *query* on the shared cursor.

        ``custom_data`` is passed to ``cursor.execute`` as the statement
        parameters when it is not ``None``. When ``require_commit`` is
        true the transaction is committed afterwards.

        Returns the cursor on success. On ``mysql.connector.Error`` a
        message is printed and ``False`` is returned instead.
        """
        try:
            if custom_data is not None:
                self.cursor.execute(query, custom_data)
            else:
                self.cursor.execute(query)
            if require_commit:
                self.commit()
            return self.cursor
        except mysql.connector.Error as e:
            if e.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                print("Could not connect to database: Incorrect credentials")
            elif e.errno == errorcode.ER_BAD_DB_ERROR:
                print("Could not locate database")
            else:
                print(e)
            return False

    def _rows_to_dicts(self, rows):
        """Turn result rows into escaped ``{column_name: value}`` dicts."""
        if not rows:
            # Nothing to label, so cursor.description is never consulted.
            return []
        columns = [col[0] for col in self.cursor.description]
        return escape_dict([dict(zip(columns, row)) for row in rows])

    def fetchall(self):
        """Return all remaining rows as a list of escaped dicts."""
        results = self.cursor.fetchall()
        if results is None:
            return None
        return self._rows_to_dicts(results)

    def fetchmany(self, size):
        """Return up to *size* rows as a list of escaped dicts."""
        results = self.cursor.fetchmany(size)
        if results is None:
            return None
        return self._rows_to_dicts(results)

    def fetchone(self):
        """Return the next row as an escaped dict, or ``None`` if exhausted."""
        result = self.cursor.fetchone()
        if result is None:
            return None
        return self._rows_to_dicts([result])[0]

    def flush(self):
        """Close the connection, open a fresh one, and return ``self``."""
        self.close()
        self.conn = connect(**self.config)
        self.cursor = self.conn.cursor(prepared=True)
        return self

    def insert(self, query, custom_data=None, auto_commit=True):
        """Run an INSERT statement; see :meth:`_execute` for the result."""
        return self._execute(query, custom_data, auto_commit)

    def select(self, query, custom_data=None):
        """Run a SELECT statement without committing."""
        return self._execute(query, custom_data, False)

    def update(self, query, custom_data=None, auto_commit=True):
        ...  # the body of this method is not shown in this excerpt
handlers.py
Source:handlers.py
...  # lines 1-2 of handlers.py are not shown in this excerpt
# NOTE(review): ``html`` is used below; presumably it is imported in the
# lines that are not shown -- confirm.
from datetime import timedelta

from modernrpc.handlers import JSONRPCHandler, XMLRPCHandler


class KiwiTCMSJsonRpcHandler(JSONRPCHandler):
    """JSON-RPC handler that sanitizes results before they are returned.

    Strings are HTML-escaped and ``timedelta`` values are converted to a
    number of seconds.
    """

    @staticmethod
    def escape_dict(result_dict):
        """Sanitize the values of *result_dict* in place.

        Nested dicts and lists are processed recursively so that no
        string is returned unescaped.
        """
        # Re-assigning existing keys while iterating is safe: the dict
        # never changes size.
        for (key, value) in result_dict.items():
            if isinstance(value, str):
                result_dict[key] = html.escape(value)
            elif isinstance(value, timedelta):
                result_dict[key] = value.total_seconds()
            elif isinstance(value, dict):
                KiwiTCMSJsonRpcHandler.escape_dict(value)
            elif isinstance(value, list):
                KiwiTCMSJsonRpcHandler.escape_list(value)

    @staticmethod
    def escape_list(result_list):
        """Sanitize the items of *result_list* in place, recursively."""
        for (index, item) in enumerate(result_list):
            if isinstance(item, str):
                result_list[index] = html.escape(item)
            elif isinstance(item, timedelta):
                result_list[index] = item.total_seconds()
            elif isinstance(item, dict):
                KiwiTCMSJsonRpcHandler.escape_dict(item)
            elif isinstance(item, list):
                KiwiTCMSJsonRpcHandler.escape_list(item)

    def execute_procedure(self, name, args=None, kwargs=None):
        """
        HTML escape every string before returning it to
        the client, which may as well be the webUI. This will
        prevent XSS attacks for pages which display whatever
        is in the DB (e.g. tags, components)
        """
        result = super().execute_procedure(name, args, kwargs)
        if isinstance(result, str):
            result = html.escape(result)
        elif isinstance(result, timedelta):
            result = result.total_seconds()
        elif isinstance(result, dict):
            self.escape_dict(result)
        elif isinstance(result, list):
            self.escape_list(result)
        return result


class KiwiTCMSXmlRpcHandler(XMLRPCHandler):
    """XML-RPC handler that converts ``timedelta`` results to seconds.

    Unlike the JSON-RPC handler above, strings are not HTML-escaped here.
    """

    @staticmethod
    def escape_dict(result_dict):
        """Convert ``timedelta`` values of *result_dict* in place, recursively."""
        for (key, value) in result_dict.items():
            if isinstance(value, timedelta):
                result_dict[key] = value.total_seconds()
            elif isinstance(value, dict):
                KiwiTCMSXmlRpcHandler.escape_dict(value)
            elif isinstance(value, list):
                KiwiTCMSXmlRpcHandler.escape_list(value)

    @staticmethod
    def escape_list(result_list):
        """Convert ``timedelta`` items of *result_list* in place, recursively."""
        for (index, item) in enumerate(result_list):
            if isinstance(item, timedelta):
                result_list[index] = item.total_seconds()
            elif isinstance(item, dict):
                KiwiTCMSXmlRpcHandler.escape_dict(item)
            elif isinstance(item, list):
                KiwiTCMSXmlRpcHandler.escape_list(item)

    def execute_procedure(self, name, args=None, kwargs=None):
        """Run the procedure and convert any ``timedelta`` in its result."""
        result = super().execute_procedure(name, args, kwargs)
        if isinstance(result, timedelta):
            result = result.total_seconds()
        elif isinstance(result, dict):
            self.escape_dict(result)
        elif isinstance(result, list):
            self.escape_list(result)
        ...  # the rest of this method is not shown in this excerpt
tr.py
Source:tr.py
# Jayden 2020-09-14 13:34 AEST
import sys

# Two parallel rows: the two-character escape sequences as they are typed,
# and the single control character each one stands for.
escape_dict = [['\\n', '\\r', '\\t', '\\b'],
               ['\n', '\r', '\t', '\b']]


def escape_key(arr):
    r"""Return *arr* with the typed sequences \n, \r, \t and \b replaced
    by the control characters they name."""
    for typed, actual in zip(escape_dict[0], escape_dict[1]):
        arr = arr.replace(typed, actual)
    return arr


def translate_line(line, raw, new):
    """Map each character of *raw* to the character at the same position
    in *new* throughout *line*.

    *raw* and *new* must have the same length. Typed escape sequences in
    *line* are expanded with :func:`escape_key` before mapping.

    All characters are mapped in a single pass, so a character produced
    by one mapping is never translated again by a later one (for example
    ``ab`` -> ``ba`` swaps the two letters instead of collapsing both
    into ``a``).
    """
    if not raw:
        # With nothing to map, the line is returned untouched (escape
        # sequences included), as the original loop did.
        return line
    return escape_key(line).translate(str.maketrans(raw, new))


def main():
    """Read lines from stdin and translate them using the two arguments."""
    # defensive checking
    if len(sys.argv) == 1:
        print("No arguments")
        sys.exit(3)
    elif len(sys.argv) < 3:
        print("Not enough arguments")
        sys.exit(2)
    elif len(sys.argv) > 3:
        print("Too many arguments")
        sys.exit(1)
    # validation
    raw, new = escape_key(sys.argv[1]), escape_key(sys.argv[2])
    if len(raw) != len(new):
        print("Invalid arguments")
        sys.exit(0)
    # main part
    output = []
    while True:
        try:
            string = input()
        except EOFError:
            break
        output.append(translate_line(string, raw, new))
    if output:
        ...  # the rest of the script is not shown in this excerpt


# Guarded so that importing this module does not read argv or exit.
if __name__ == "__main__":
    main()
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!