Best Python code snippet using autotest_python
test_sql_common.py
Source:test_sql_common.py
...51 for key in payload.keys():52 assert parameters[key] == payload[key]53 finally:54 if hasattr(connection, "_close_connection"):55 connection._close_connection()56 def test_createtable(self):57 connection = self.baseClass(None, self.app_config)58 try:59 # find all the columns types by column name we expect60 exected_col_types = {}61 for field in all_fields:62 field_name = field['name']63 field_type = field['input_type']64 col_type = connection.dialect.get_column_type(field_type)65 exected_col_types[field_name.lower()] = col_type.lower()66 connection._create_or_update_table(self.table_name, all_fields)67 # confirm that the table was created68 cursor = connection._start_transaction()69 try:70 get_cols_result = connection._execute_sql(71 cursor,72 self.inspect_cols_query)73 table_cols = {}74 for row in get_cols_result:75 col_name = row[0]76 col_type = row[1]77 table_cols[col_name.lower()] = col_type.lower()78 for col in exected_col_types.keys():79 if col in ('id', 'inc_id'):80 assert table_cols.get(col, '').startswith('int') or table_cols.get(col, '').startswith("number")81 else:82 db_col = table_cols.get(col, '')83 col_type = db_col.split("(")[0]84 col_type_simple = col_type.split(" ")[0]85 assert exected_col_types[col].startswith(col_type_simple)86 finally:87 cursor.close()88 finally:89 if hasattr(connection, "_close_connection"):90 connection._close_connection()91 def test_insert_row(self, result_payload):92 global flat_payload93 connection = self.baseClass(None, self.app_config)94 try:95 payload = flat_payload.copy()96 # convert data for the fields97 for field in all_fields:98 item_name = field['name']99 item_type = field['input_type']100 item_value = flat_payload[item_name]101 converted_value = TypeInfo.translate_value(None, field, item_value)102 payload[item_name] = converted_value103 # add data to table104 cursor = connection._start_transaction()105 if self.setup_stmt:106 connection._execute_sql(107 cursor,108 self.setup_stmt)109 cmd = 
connection.dialect.get_upsert(TABLE_NAME, self.all_field_names, self.all_field_types)110 params = connection.dialect.get_parameters(self.all_field_names, payload)111 insert_result = connection._execute_sql(112 cursor,113 cmd,114 params115 )116 connection._commit_transaction(cursor)117 if hasattr(insert_result, 'nextset'):118 while insert_result.nextset():119 row_count = insert_result.rowcount120 assert row_count == 1121 # get the row and confirm the values122 select_stmt = "select * from {} where id = {}".format(TABLE_NAME, payload["id"])123 select_result = connection._execute_sql(124 cursor,125 select_stmt)126 rows = cursor.fetchall()127 for row in rows:128 for ndx in range(len(select_result.description)):129 col_name = select_result.description[ndx][0].lower()130 assert row[ndx] == result_payload[col_name]131 finally:132 cursor.close()133 if hasattr(connection, "_close_connection"):134 connection._close_connection()135 def test_update_row(self, MAX_TEXT_SIZE, result_payload):136 global flat_payload137 connection = self.baseClass(None, self.app_config)138 try:139 payload = flat_payload.copy()140 # convert data for the fields141 for field in all_fields:142 item_name = field['name']143 item_type = field['input_type']144 item_value = flat_payload[item_name]145 converted_value = TypeInfo.translate_value(None, field, item_value)146 payload[item_name] = converted_value147 payload['test_text'] = u"a" * MAX_TEXT_SIZE148 payload_result = result_payload.copy()149 payload_result['test_text'] = payload['test_text']150 # add data to table151 cursor = connection._start_transaction()152 if self.setup_stmt:153 connection._execute_sql(154 cursor,155 self.setup_stmt)156 update_result = connection._execute_sql(157 cursor,158 connection.dialect.get_upsert(TABLE_NAME, self.all_field_names, self.all_field_types),159 connection.dialect.get_parameters(self.all_field_names, payload))160 connection._commit_transaction(cursor)161 #assert update_result.rowcount == 1162 # get the row and 
confirm the values163 select_stmt = "select * from {} where id = {}".format(TABLE_NAME, payload["id"])164 select_result = connection._execute_sql(165 cursor,166 select_stmt)167 rows = cursor.fetchall()168 for row in rows:169 for ndx in range(len(select_result.description)):170 col_name = select_result.description[ndx][0].lower()171 assert row[ndx] == payload_result[col_name]172 finally:173 cursor.close()174 if hasattr(connection, "_close_connection"):175 connection._close_connection()176 def test_delete_row(self):177 global flat_payload178 connection = self.baseClass(None, self.app_config)179 cursor = connection._start_transaction()180 try:181 # delete the row182 delete_result = connection._execute_sql(183 cursor,184 connection.dialect.get_delete(TABLE_NAME),185 [flat_payload['id']])186 connection._commit_transaction(cursor)187 assert delete_result.rowcount == 1188 finally:189 cursor.close()190 if hasattr(connection, "_close_connection"):191 connection._close_connection()192 def test_altertable(self, col_type='text'):193 global flat_payload194 ALTER_COL = "alter_col"195 connection = self.baseClass(None, self.app_config)196 cursor = connection._start_transaction()197 try:198 alter_result = connection._execute_sql(199 cursor,200 connection.dialect.get_add_column_to_table(TABLE_NAME, ALTER_COL, col_type))201 connection._commit_transaction(cursor)202 get_cols_result = connection._execute_sql(203 cursor,204 self.inspect_cols_query)205 rows = cursor.fetchall()206 assert len(rows) == 8 # number of columns207 finally:208 cursor.close()209 if hasattr(connection, "_close_connection"):210 connection._close_connection()211 def test_droptable(self):212 connection = self.baseClass(None, self.app_config)213 cursor = connection._start_transaction()214 try:215 alter_result = connection._execute_sql(216 cursor,217 "Drop table {}".format(TABLE_NAME))218 connection._commit_transaction(cursor)219 finally:220 cursor.close()221 if hasattr(connection, "_close_connection"):...
database_manager.py
Source:database_manager.py
...9 self.verify_table_exists('users')10 self.connection = None11 def _open_connection(self):12 self.connection = sqlite3.connect(self.config['db']['filename'])13 def _close_connection(self):14 self.connection.close()15 def create_users_table(self):16 self._open_connection()17 cur = self.connection.cursor()18 cur.execute(''' CREATE TABLE `users` (`user_id` INTEGER PRIMARY KEY, `positive_relationship` INTEGER, `negative_relationship` INTEGER) ''')19 self.connection.commit()20 self._close_connection()21 def insert_user(self, user_id, positive_rep: int = 0, negative_rep: int = 0):22 self._open_connection()23 cur = self.connection.cursor()24 cur.execute(''' INSERT INTO `users` (`user_id`, `positive_relationship`, `negative_relationship`) VALUES (?, ?, ?) ''', (user_id, positive_rep, negative_rep))25 print('Added entry {0}'.format(user_id))26 self.connection.commit()27 self._close_connection()28 def update_user(self, user_id: int, positive_rep: int, negative_rep: int):29 self._open_connection()30 cur = self.connection.cursor()31 cur.execute(''' UPDATE `users` SET `positive_relationship` = ?, `negative_relationship` = ? WHERE `user_id` = ? ''', (positive_rep, negative_rep, user_id))32 print('Updated entry {0}'.format(user_id))33 self.connection.commit()34 self._close_connection()35 def verify_user_exists(self, user_id: int):36 self._open_connection()37 cur = self.connection.cursor()38 cur.execute(''' SELECT * FROM `users` WHERE `user_id` = ? ''', (user_id,))39 result = cur.fetchone()40 self._close_connection()41 return result42 def verify_table_exists(self, table_name: str):43 self._open_connection()44 cur = self.connection.cursor()45 cur.execute(''' SELECT count(`name`) as 'exists' FROM `sqlite_master` WHERE `type` = 'table' AND `name` = 'users' ''')46 if cur.fetchone()[0] == 1:47 return48 self.connection.commit()49 self._close_connection()...
sqlite_database.py
Source:sqlite_database.py
...13 try:14 cursor.close()15 except sqlite3.Error as exception:16 pass17 def _close_connection(self, connection):18 try:19 connection.close()20 except sqlite3.Error as exception:21 pass22 def execute_update(self, statement, parameters):23 try:24 connection = self._create_connection()25 cursor = connection.cursor()26 cursor.execute(statement, parameters)27 connection.commit()28 success = True29 except sqlite3.Error as exception:30 print(exception)31 success = False32 finally:33 self._close_cursor(cursor)34 self._close_connection(connection)35 return success36 def execute_query(self, statement, parameters):37 try:38 connection = self._create_connection()39 cursor = connection.cursor()40 cursor.execute(statement, parameters)41 result = cursor.fetchall()42 except sqlite3.Error as exception:43 print(exception)44 result = None45 finally:46 self._close_cursor(cursor)47 self._close_connection(connection)48 return result49 50 def close_cursor_manually(self, cursor):51 self._close_cursor(cursor)52 53 def close_connection_manually(self, connection):54 self._close_connection(connection)55 56 def fetch_all_data(self, statement, parameters):57 try:58 connection = self._create_connection()59 cursor = connection.cursor()60 cursor.execute(statement, parameters)61 result = cursor.fetchall()62 except sqlite3.Error as exception:63 self._close_cursor(cursor)64 self._close_connection(connection)65 result = None66 cursor = None67 connection = None...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!