Best Python code snippet using pandera_python
db_connection.py
Source:db_connection.py
import os
import json
from collections import defaultdict, OrderedDict


class BaseDBConnection(object):
    """ Base KG connection for database

    Abstract interface shared by the concrete backends below; every data
    method raises `NotImplementedError` here.
    """

    def __init__(self, db_path, chunksize):
        """ Create a connection to database

        :param db_path: database path
        :type db_path: str
        :param chunksize: the chunksize to load/write database
        :type chunksize: int
        """
        self._conn = None
        self.chunksize = chunksize

    def close(self):
        """ Close the connection safely
        """
        raise NotImplementedError

    def __del__(self):
        # A finalizer must never raise: the abstract `close` raises
        # NotImplementedError and a half-constructed subclass may lack its handle.
        try:
            self.close()
        except Exception:
            pass

    def create_table(self, table_name, columns, column_types):
        """ Create a table with given columns and types

        :param table_name: the table name to create
        :type table_name: str
        :param columns: the columns to create
        :type columns: List[str]
        :param column_types: the corresponding column types
        :type column_types: List[str]
        """
        raise NotImplementedError

    def get_columns(self, table_name, columns):
        """ Get column information from a table

        :param table_name: the table name to retrieve
        :type table_name: str
        :param columns: the columns to retrieve
        :type columns: List[str]
        :return: a list of retrieved rows
        :rtype: List[Dict[str, object]]
        """
        raise NotImplementedError

    def select_row(self, table_name, _id, columns):
        """ Select a row from a table

        :param table_name: the table name to retrieve
        :type table_name: str
        :param _id: the row id
        :type _id: str
        :param columns: the columns to retrieve
        :type columns: List[str]
        :return: a retrieved row
        :rtype: Dict[str, object]
        """
        raise NotImplementedError

    def select_rows(self, table_name, _ids, columns):
        """ Select rows from a table

        :param table_name: the table name to retrieve
        :type table_name: str
        :param _ids: the row ids
        :type _ids: List[str]
        :param columns: the columns to retrieve
        :type columns: List[str]
        :return: retrieved rows
        :rtype: List[Dict[str, object]]
        """
        raise NotImplementedError

    def insert_row(self, table_name, row):
        """ Insert a row into a table

        :param table_name: the table name to insert
        :type table_name: str
        :param row: the row to insert
        :type row: Dict[str, object]
        """
        raise NotImplementedError

    def insert_rows(self, table_name, rows):
        """ Insert several rows into a table

        :param table_name: the table name to insert
        :type table_name: str
        :param rows: the rows to insert
        :type rows: List[Dict[str, object]]
        """
        raise NotImplementedError

    def get_update_op(self, update_columns, operator):
        """ Get an update operator based on columns and a operator

        :param update_columns: a list of columns to update
        :type update_columns: List[str]
        :param operator: an operator that applies to the columns, including "+", "-", "*", "/", "="
        :type operator: str
        :return: an operator that suits the backend database
        :rtype: object
        """
        raise NotImplementedError

    def update_row(self, table_name, row, update_op, update_columns):
        """ Update a row that exists in a table

        :param table_name: the table name to update
        :type table_name: str
        :param row: a new row
        :type row: Dict[str, object]
        :param update_op: an operator that returned by `get_update_op`
        :type update_op: object
        :param update_columns: the columns to update
        :type update_columns: List[str]
        """
        raise NotImplementedError

    def update_rows(self, table_name, rows, update_ops, update_columns):
        """ Update rows that exist in a table

        :param table_name: the table name to update
        :type table_name: str
        :param rows: new rows
        :type rows: List[Dict[str, object]]
        :param update_ops: operator(s) that returned by `get_update_op`
        :type update_ops: Union[List[object], object]
        :param update_columns: the columns to update
        :type update_columns: List[str]
        """
        raise NotImplementedError

    def get_rows_by_keys(self, table_name, bys, keys, columns, order_bys=None, reverse=False, top_n=None):
        """ Retrieve rows by specific keys in some order

        :param table_name: the table name to retrieve
        :type table_name: str
        :param bys: the given columns to match
        :type bys: List[str]
        :param keys: the given values to match
        :type keys: List[str]
        :param columns: the given columns to retrieve
        :type columns: List[str]
        :param order_bys: the columns whose value are used to sort rows
        :type order_bys: List[str]
        :param reverse: whether to sort in a reversed order
        :type reverse: bool
        :param top_n: how many rows to return, default `None` for all rows
        :type top_n: int
        :return: retrieved rows
        :rtype: List[Dict[str, object]]
        """
        raise NotImplementedError


class SqliteDBConnection(BaseDBConnection):
    """ KG connection for SQLite database
    """

    def __init__(self, db_path, chunksize):
        """ Create a connection to SQLite database

        :param db_path: database path, e.g., /home/xliucr/ASER/KG.db
        :type db_path: str
        :param chunksize: the chunksize to load/write database
        :type chunksize: int
        """
        import sqlite3
        super(SqliteDBConnection, self).__init__(db_path, chunksize)
        self._conn = sqlite3.connect(db_path)

    def close(self):
        """ Close the connection safely
        """
        if self._conn is not None:
            self._conn.close()

    @staticmethod
    def _sql_literal(value):
        """ Render a Python value as a SQL literal

        :param value: the value to embed in a statement
        :type value: object
        :return: `NULL` for `None`, `0`/`1` for booleans, a quoted string with
            embedded single quotes doubled for `str`, otherwise `str(value)`
        :rtype: str
        """
        if value is None:
            return "NULL"
        if isinstance(value, bool):
            # str(True) is "True", which is not a literal every SQLite build accepts
            return str(int(value))
        if isinstance(value, str):
            return "'" + value.replace("'", "''") + "'"
        return str(value)

    @staticmethod
    def _quote_ids(_ids):
        """ Render row ids as a comma-separated list of quoted SQL string literals

        :param _ids: the row ids
        :type _ids: List[str]
        :return: e.g. `'a','b''c'` (embedded single quotes are doubled)
        :rtype: str
        """
        return ",".join(["'" + _id.replace("'", "''") + "'" for _id in _ids])

    def create_table(self, table_name, columns, column_types):
        """ Create a table with given columns and types

        :param table_name: the table name to create
        :type table_name: str
        :param columns: the columns to create
        :type columns: List[str]
        :param column_types: the corresponding column types, please refer to https://www.sqlite.org/datatype3.html
        :type column_types: List[str]
        """
        create_table = "CREATE TABLE %s (%s);" % (
            table_name, ",".join([' '.join(x) for x in zip(columns, column_types)])
        )
        self._conn.execute(create_table)
        self._conn.commit()

    def get_columns(self, table_name, columns):
        """ Get column information from a table

        :param table_name: the table name to retrieve
        :type table_name: str
        :param columns: the columns to retrieve
        :type columns: List[str]
        :return: a list of retrieved rows
        :rtype: List[Dict[str, object]]
        """
        select_table = "SELECT %s FROM %s;" % (",".join(columns), table_name)
        return [OrderedDict(zip(columns, x)) for x in self._conn.execute(select_table)]

    def select_row(self, table_name, _id, columns):
        """ Select a row from a table
        (suggestion: consider to use `select_rows` if you want to retrieve multiple rows)

        :param table_name: the table name to retrieve
        :type table_name: str
        :param _id: the row id
        :type _id: str
        :param columns: the columns to retrieve
        :type columns: List[str]
        :return: a retrieved row, or `None` if no row has this id
        :rtype: Dict[str, object]
        """
        select_table = "SELECT %s FROM %s WHERE _id=?;" % (",".join(columns), table_name)
        result = list(self._conn.execute(select_table, [_id]))
        if len(result) == 0:
            return None
        return OrderedDict(zip(columns, result[0]))

    def select_rows(self, table_name, _ids, columns):
        """ Select rows from a table

        :param table_name: the table name to retrieve
        :type table_name: str
        :param _ids: the row ids
        :type _ids: List[str]
        :param columns: the columns to retrieve; must contain "_id" because
            the fetched rows are matched back to `_ids` through that column
        :type columns: List[str]
        :return: retrieved rows in the order of `_ids`, with `None` for ids not found
        :rtype: List[Dict[str, object]]
        """
        if len(_ids) == 0:
            return []
        row_cache = dict()
        for idx in range(0, len(_ids), self.chunksize):
            # ids are embedded as escaped literals so that an id containing a
            # single quote cannot break (or inject into) the statement
            select_table = "SELECT %s FROM %s WHERE _id IN (%s);" % (
                ",".join(columns), table_name, self._quote_ids(_ids[idx:idx + self.chunksize])
            )
            for x in self._conn.execute(select_table):
                exact_match_row = OrderedDict(zip(columns, x))
                row_cache[exact_match_row["_id"]] = exact_match_row
        return [row_cache.get(_id, None) for _id in _ids]

    def insert_row(self, table_name, row):
        """ Insert a row into a table
        (suggestion: consider to use `insert_rows` if you want to insert multiple rows)

        :param table_name: the table name to insert
        :type table_name: str
        :param row: the row to insert; values are bound positionally in the dict's order
        :type row: Dict[str, object]
        """
        insert_table = "INSERT INTO %s VALUES (%s)" % (table_name, ",".join(['?'] * (len(row))))
        self._conn.execute(insert_table, list(row.values()))
        self._conn.commit()

    def insert_rows(self, table_name, rows):
        """ Insert several rows into a table

        :param table_name: the table name to insert
        :type table_name: str
        :param rows: the rows to insert; the placeholder count is taken from the first row
        :type rows: List[Dict[str, object]]
        """
        if len(rows) > 0:
            insert_table = "INSERT INTO %s VALUES (%s)" % (table_name, ",".join(['?'] * (len(next(iter(rows))))))
            self._conn.executemany(insert_table, [list(row.values()) for row in rows])
            self._conn.commit()

    def get_update_op(self, update_columns, operator):
        """ Get an update operator based on columns and a operator

        :param update_columns: a list of columns to update
        :type update_columns: List[str]
        :param operator: an operator that applies to the columns, including "+", "-", "*", "/", "="
        :type operator: str
        :return: an operator that suits the backend database, e.g. "a=a+?,b=b+?"
        :rtype: str
        :raises NotImplementedError: if `operator` is not one of the five supported operators
        """
        # exact membership: a substring test against "+-*/" would also accept "" and "+-"
        if operator in ("+", "-", "*", "/"):
            return ",".join([column + "=" + column + operator + "?" for column in update_columns])
        elif operator == "=":
            return ",".join([column + "=?" for column in update_columns])
        else:
            raise NotImplementedError

    def _update_update_op(self, row, update_op, update_columns):
        """ Fill the `?` placeholders of an update operator with the values of a row

        :param row: a new row
        :type row: Dict[str, object]
        :param update_op: an operator that returned by `get_update_op`
        :type update_op: str
        :param update_columns: the columns to update, one per placeholder
        :type update_columns: List[str]
        :return: the SET clause with literal values in place of the placeholders
        :rtype: str
        """
        update_op_sp = update_op.split('?')
        # drop the empty tail left by the final "?"; stop when the list is empty
        while update_op_sp and update_op_sp[-1] == '':
            update_op_sp.pop()
        assert len(update_op_sp) == len(update_columns)
        new_update_op = []
        for fragment, update_column in zip(update_op_sp, update_columns):
            new_update_op.append(fragment)
            new_update_op.append(self._sql_literal(row[update_column]))
        return ''.join(new_update_op)

    def update_row(self, table_name, row, update_op, update_columns):
        """ Update a row that exists in a table
        (suggestion: consider to use `update_rows` if you want to update multiple rows)

        :param table_name: the table name to update
        :type table_name: str
        :param row: a new row; must contain "_id" and every column in `update_columns`
        :type row: Dict[str, object]
        :param update_op: an operator that returned by `get_update_op`
        :type update_op: str
        :param update_columns: the columns to update
        :type update_columns: List[str]
        """
        update_table = "UPDATE %s SET %s WHERE _id=?" % (table_name, update_op)
        self._conn.execute(update_table, [row[k] for k in update_columns] + [row["_id"]])
        self._conn.commit()

    def update_rows(self, table_name, rows, update_ops, update_columns):
        """ Update rows that exist in a table

        :param table_name: the table name to update
        :type table_name: str
        :param rows: new rows
        :type rows: List[Dict[str, object]]
        :param update_ops: operator(s) that returned by `get_update_op`; either one
            operator per row or a single operator applied to every row
        :type update_ops: Union[List[str], str]
        :param update_columns: the columns to update
        :type update_columns: List[str]
        """
        if len(rows) == 0:
            return
        if isinstance(update_ops, (tuple, list)):
            assert len(rows) == len(update_ops)
            row_ops = zip(rows, update_ops)
        else:
            row_ops = ((row, update_ops) for row in rows)
        # group row ids by their fully rendered SET clause, so rows that receive
        # the same update share one statement
        update_op_collections = defaultdict(list)
        for row, update_op in row_ops:
            new_update_op = self._update_update_op(row, update_op, update_columns)
            update_op_collections[new_update_op].append(row["_id"])
        for new_update_op, _ids in update_op_collections.items():
            for idx in range(0, len(_ids), self.chunksize):
                update_table = "UPDATE %s SET %s WHERE _id IN (%s);" % (
                    table_name, new_update_op, self._quote_ids(_ids[idx:idx + self.chunksize])
                )
                self._conn.execute(update_table)
        self._conn.commit()

    def get_rows_by_keys(self, table_name, bys, keys, columns, order_bys=None, reverse=False, top_n=None):
        """ Retrieve rows by specific keys in some order

        :param table_name: the table name to retrieve
        :type table_name: str
        :param bys: the given columns to match; an empty list matches every row
        :type bys: List[str]
        :param keys: the given values to match
        :type keys: List[str]
        :param columns: the given columns to retrieve
        :type columns: List[str]
        :param order_bys: the columns whose value are used to sort rows
        :type order_bys: List[str]
        :param reverse: whether to sort in a reversed order
        :type reverse: bool
        :param top_n: how many rows to return, default `None` for all rows
        :type top_n: int
        :return: retrieved rows
        :rtype: List[Dict[str, object]]
        """
        select_table = "SELECT %s FROM %s" % (",".join(columns), table_name)
        if bys:
            # only emit WHERE when there is something to match, otherwise the
            # statement would end in a dangling "WHERE "
            select_table += " WHERE %s" % (" AND ".join(["%s=?" % (by) for by in bys]))
        if order_bys:
            select_table += " ORDER BY %s %s" % (",".join(order_bys), "DESC" if reverse else "ASC")
        if top_n:
            select_table += " LIMIT %d" % (top_n)
        select_table += ";"
        return [OrderedDict(zip(columns, x)) for x in self._conn.execute(select_table, keys)]


class MongoDBConnection(BaseDBConnection):
    """ KG connection for MongoDB
    """

    # operator -> (MongoDB update operator, placeholder value); the placeholder
    # tells `_update_update_op` which of the two operators sharing a MongoDB
    # operator ("+"/"-" or "*"/"/") was requested
    _UPDATE_PLACEHOLDERS = {
        "+": ("$inc", 1),
        "-": ("$inc", -1),
        "*": ("$mul", 2),
        "/": ("$mul", 0.5),
        "=": ("$set", 1),
    }

    def __init__(self, db_path, chunksize):
        """ Create a connection to MongoDB

        :param db_path: database path, e.g., mongodb://localhost:27017/ASER
        :type db_path: str
        :param chunksize: the chunksize to load/write database
        :type chunksize: int
        """
        super(MongoDBConnection, self).__init__(db_path, chunksize)
        # defined before anything below can fail, so `close` stays safe
        self._client = None
        import pymongo
        # everything before the last "/" is the server URI, the rest is the database name
        host_port, db_name = os.path.split(db_path)
        self._client = pymongo.MongoClient(host_port, document_class=OrderedDict)
        self._conn = self._client[db_name]

    def close(self):
        """ Close the connection safely
        """
        client = getattr(self, "_client", None)
        if client is not None:
            client.close()

    def create_table(self, table_name, columns=None, column_types=None):
        """ Create a table without the necessary to provide column information

        :param table_name: the table name to create
        :type table_name: str
        :param columns: unused; accepted for compatibility with the base interface
        :type columns: List[str]
        :param column_types: unused; accepted for compatibility with the base interface
        :type column_types: List[str]
        """
        # only looks the collection up on the database handle; nothing is written here
        self._conn[table_name]

    def __get_projection(self, columns):
        """ Build a projection that returns exactly the given columns

        "_id" is excluded unless it is listed in `columns`.
        """
        projection = {"_id": 0}
        for k in columns:
            projection[k] = 1
        return projection

    def get_columns(self, table_name, columns):
        """ Get column information from a table

        :param table_name: the table name to retrieve
        :type table_name: str
        :param columns: the columns to retrieve
        :type columns: List[str]
        :return: a list of retrieved rows
        :rtype: List[Dict[str, object]]
        """
        projection = self.__get_projection(columns)
        return list(self._conn[table_name].find({}, projection))

    def select_row(self, table_name, _id, columns):
        """ Select a row from a table
        (suggestion: consider to use `select_rows` if you want to retrieve multiple rows)

        :param table_name: the table name to retrieve
        :type table_name: str
        :param _id: the row id
        :type _id: str
        :param columns: the columns to retrieve
        :type columns: List[str]
        :return: a retrieved row
        :rtype: Dict[str, object]
        """
        projection = self.__get_projection(columns)
        return self._conn[table_name].find_one({"_id": _id}, projection)

    def select_rows(self, table_name, _ids, columns):
        """ Select rows from a table

        :param table_name: the table name to retrieve
        :type table_name: str
        :param _ids: the row ids
        :type _ids: List[str]
        :param columns: the columns to retrieve; must contain "_id" because
            the fetched rows are matched back to `_ids` through that field
        :type columns: List[str]
        :return: retrieved rows in the order of `_ids`, with `None` for ids not found
        :rtype: List[Dict[str, object]]
        """
        table = self._conn[table_name]
        exact_match_rows = []
        projection = self.__get_projection(columns)
        for idx in range(0, len(_ids), self.chunksize):
            query = {"_id": {'$in': _ids[idx:idx + self.chunksize]}}
            exact_match_rows.extend(table.find(query, projection))
        row_cache = {x["_id"]: x for x in exact_match_rows}
        return [row_cache.get(_id, None) for _id in _ids]

    def insert_row(self, table_name, row):
        """ Insert a row into a table
        (suggestion: consider to use `insert_rows` if you want to insert multiple rows)

        :param table_name: the table name to insert
        :type table_name: str
        :param row: the row to insert
        :type row: Dict[str, object]
        """
        self._conn[table_name].insert_one(row)

    def insert_rows(self, table_name, rows):
        """ Insert several rows into a table

        :param table_name: the table name to insert
        :type table_name: str
        :param rows: the rows to insert; an empty list is a no-op
        :type rows: List[Dict[str, object]]
        """
        # same guard as the SQLite backend, so an empty batch is not sent to the driver
        if len(rows) > 0:
            self._conn[table_name].insert_many(rows)

    def get_update_op(self, update_columns, operator):
        """ Get an update operator based on columns and a operator

        :param update_columns: a list of columns to update
        :type update_columns: List[str]
        :param operator: an operator that applies to the columns, including "+", "-", "*", "/", "="
        :type operator: str
        :return: an operator that suits the backend database; the column values
            are placeholders to be replaced by `_update_update_op`
        :rtype: Dict[str, Dict[str, float]]
        :raises NotImplementedError: if `operator` is not one of the five supported operators
        """
        if operator not in self._UPDATE_PLACEHOLDERS:
            raise NotImplementedError
        mongo_operator, placeholder = self._UPDATE_PLACEHOLDERS[operator]
        update_ops = {}
        for update_column in update_columns:
            update_ops[update_column] = placeholder
        return {mongo_operator: update_ops}

    def _update_update_op(self, row, update_op, update_columns):
        """ Update the operator for a single row

        The given `update_op` is treated as a read-only template: its nested
        dictionaries are copied, so the placeholders survive and the same
        template can be reused for the next row.

        :param row: a new row
        :type row: Dict[str, object]
        :param update_op: an operator that returned by `get_update_op`
        :type update_op: Dict[str, Dict[str, float]]
        :param update_columns: the columns to update
        :type update_columns: List[str]
        :return: a new operator whose placeholders are replaced by the row's values
        :rtype: Dict[str, Dict[str, float]]
        """
        new_update_op = update_op.copy()
        for k, v in update_op.items():
            # copy the inner mapping too; a shallow copy of the outer dict alone
            # would write the row's values into the caller's template
            new_v = dict(v)
            if k == "$inc":
                for update_column in update_columns:
                    if v[update_column] == 1:  # "+"
                        new_v[update_column] = row[update_column]
                    else:  # "-"
                        new_v[update_column] = -row[update_column]
            elif k == "$mul":
                for update_column in update_columns:
                    if v[update_column] == 2:  # "*"
                        new_v[update_column] = row[update_column]
                    else:  # "/" is expressed as multiplying by the reciprocal
                        new_v[update_column] = 1.0 / row[update_column]
            elif k == "$set":
                for update_column in update_columns:
                    new_v[update_column] = row[update_column]
            new_update_op[k] = new_v
        return new_update_op

    def update_row(self, table_name, row, update_op, update_columns):
        """ Update a row that exists in a table
        (suggestion: consider to use `update_rows` if you want to update multiple rows)

        :param table_name: the table name to update
        :type table_name: str
        :param row: a new row; must contain "_id" and every column in `update_columns`
        :type row: Dict[str, object]
        :param update_op: an operator that returned by `get_update_op`
        :type update_op: Dict[str, Dict[str, float]]
        :param update_columns: the columns to update
        :type update_columns: List[str]
        """
        self._conn[table_name].update_one({"_id": row["_id"]}, self._update_update_op(row, update_op, update_columns))

    def update_rows(self, table_name, rows, update_ops, update_columns):
        """ Update rows that exist in a table

        :param table_name: the table name to update
        :type table_name: str
        :param rows: new rows
        :type rows: List[Dict[str, object]]
        :param update_ops: operator(s) that returned by `get_update_op`; either one
            operator per row or a single operator applied to every row
        :type update_ops: Union[List[Dict[str, Dict[str, float]]], Dict[str, Dict[str, float]]]
        :param update_columns: the columns to update
        :type update_columns: List[str]
        """
        if len(rows) == 0:
            return
        if isinstance(update_ops, (tuple, list)):
            assert len(rows) == len(update_ops)
            row_ops = zip(rows, update_ops)
        else:
            row_ops = ((row, update_ops) for row in rows)
        # group row ids by the JSON form of their filled-in operator (dicts are
        # not hashable); the operator object itself is kept alongside, so no
        # JSON round trip is needed to get it back
        update_op_collections = OrderedDict()
        for row, update_op in row_ops:
            new_update_op = self._update_update_op(row, update_op, update_columns)
            group_key = json.dumps(new_update_op)
            if group_key not in update_op_collections:
                update_op_collections[group_key] = (new_update_op, [])
            update_op_collections[group_key][1].append(row["_id"])
        table = self._conn[table_name]
        for new_update_op, _ids in update_op_collections.values():
            for idx in range(0, len(_ids), self.chunksize):
                query = {"_id": {'$in': _ids[idx:idx + self.chunksize]}}
                table.update_many(query, new_update_op)

    def get_rows_by_keys(self, table_name, bys, keys, columns, order_bys=None, reverse=False, top_n=None):
        """ Retrieve rows by specific keys in some order

        :param table_name: the table name to retrieve
        :type table_name: str
        :param bys: the given columns to match
        :type bys: List[str]
        :param keys: the given values to match
        :type keys: List[str]
        :param columns: the given columns to retrieve
        :type columns: List[str]
        :param order_bys: the columns whose value are used to sort rows
        :type order_bys: List[str]
        :param reverse: whether to sort in a reversed order
        :type reverse: bool
        :param top_n: how many rows to return, default `None` for all rows
        :type top_n: int
        :return: retrieved rows
        :rtype: List[Dict[str, object]]
        """
        query = OrderedDict(zip(bys, keys))
        projection = self.__get_projection(columns)
        cursor = self._conn[table_name].find(query, projection)
        if order_bys:
            direction = -1 if reverse else 1
            cursor = cursor.sort([(k, direction) for k in order_bys])
        if top_n:
            # stop reading from the cursor as soon as enough rows are collected
            result = []
            for x in cursor:
                result.append(x)
                if len(result) >= top_n:
                    break
            return result
        else:
            # NOTE(review): the visible source is cut off right after this
            # `else:` (only "..." remains); restore the body of this branch
            # from the upstream file before relying on the no-`top_n` path.
            ...
base.py
Source:base.py
1try:2 import ujson as json3except:4 import json5from collections import defaultdict, OrderedDict6class BaseConnection(object):7 def __init__(self, db_path, chunksize):8 self._conn = None9 self.chunksize = chunksize10 def close(self):11 if self._conn:12 self._conn.close()13 def __del__(self):14 self.close()15 def create_table(self, table_name, columns):16 raise NotImplementedError17 def get_columns(self, table_name, columns):18 raise NotImplementedError19 def select_row(self, table_name, _id, columns):20 raise NotImplementedError21 def select_rows(self, table_name, _ids, columns):22 raise NotImplementedError23 def insert_row(self, table_name, row):24 raise NotImplementedError25 def insert_rows(self, table_name, rows):26 raise NotImplementedError27 def update_row(self, table_name, row, update_op, update_columns):28 raise NotImplementedError29 def update_rows(self, table_name, rows, update_ops, update_columns):30 raise NotImplementedError31 def get_update_op(self, update_columns, operator):32 raise NotImplementedError33 def get_rows_by_keys(self, table_name, bys, keys, columns, order_bys=None, reverse=False, top_n=None):34 raise NotImplementedError35class SqliteConnection(BaseConnection):36 def __init__(self, db_path, chunksize):37 import sqlite338 super(SqliteConnection, self).__init__(db_path, chunksize)39 self._conn = sqlite3.connect(db_path)40 def create_table(self, table_name, columns, columns_types):41 create_table = "CREATE TABLE %s (%s);" % (table_name, ",".join(42 [' '.join(x) for x in zip(columns, columns_types)]))43 self._conn.execute(create_table)44 self._conn.commit()45 def get_columns(self, table_name, columns):46 select_table = "SELECT %s FROM %s;" % (",".join(columns), table_name)47 result = list(map(lambda x: OrderedDict(zip(columns, x)), self._conn.execute(select_table)))48 return result49 def select_row(self, table_name, _id, columns):50 select_table = "SELECT %s FROM %s WHERE _id=?;" % (",".join(columns), table_name)51 result = 
list(self._conn.execute(select_table, [_id]))52 if len(result) == 0:53 return None54 else:55 return OrderedDict(zip(columns, result[0]))56 def select_rows(self, table_name, _ids, columns):57 if len(_ids) > 0:58 row_cache = dict()59 result = []60 for idx in range(0, len(_ids), self.chunksize):61 select_table = "SELECT %s FROM %s WHERE _id IN ('%s');" % (62 ",".join(columns), table_name, "','".join(_ids[idx:idx+self.chunksize]))63 result.extend(list(self._conn.execute(select_table)))64 for x in result:65 exact_match_row = OrderedDict(zip(columns, x))66 row_cache[exact_match_row["_id"]] = exact_match_row67 exact_match_rows = []68 for _id in _ids:69 exact_match_rows.append(row_cache.get(_id, None))70 return exact_match_rows71 else:72 return []73 def insert_row(self, table_name, row):74 insert_table = "INSERT INTO %s VALUES (%s)" % (table_name, ",".join(['?'] * (len(row))))75 self._conn.execute(insert_table, list(row.values()))76 self._conn.commit()77 def insert_rows(self, table_name, rows):78 if len(rows) > 0:79 insert_table = "INSERT INTO %s VALUES (%s)" % (table_name, ",".join(['?'] * (len(next(iter(rows))))))80 self._conn.executemany(insert_table, [list(row.values()) for row in rows])81 self._conn.commit()82 def _update_update_op(self, row, update_op, update_columns):83 update_op_sp = update_op.split('?')84 while len(update_op_sp) >= 0 and update_op_sp[-1] == '':85 update_op_sp.pop()86 assert len(update_op_sp) == len(update_columns)87 new_update_op = []88 for i in range(len(update_op_sp)):89 new_update_op.append(update_op_sp[i])90 if isinstance(row[update_columns[i]], str):91 new_update_op.append("'" + row[update_columns[i]].replace("'", "''") + "'")92 else:93 new_update_op.append(str(row[update_columns[i]]))94 return ''.join(new_update_op)95 def update_row(self, table_name, row, update_op, update_columns):96 update_table = "UPDATE %s SET %s WHERE _id=?" 
% (table_name, update_op)97 self._conn.execute(update_table, [row[k] for k in update_columns] + [row["_id"]])98 self._conn.commit()99 def update_rows(self, table_name, rows, update_ops, update_columns):100 if len(rows) > 0:101 if isinstance(update_ops, (tuple, list)): # +-*/102 assert len(rows) == len(update_ops)103 # group rows by op to speed up104 update_op_collections = defaultdict(list) # key: _update_update_op105 for i, row in enumerate(rows):106 # self.update_row(row, table_name, update_ops[i], update_columns)107 new_update_op = self._update_update_op(row, update_ops[i], update_columns)108 update_op_collections[new_update_op].append(row)109 for new_update_op, op_rows in update_op_collections.items():110 _ids = [row["_id"] for row in op_rows]111 for idx in range(0, len(_ids), self.chunksize):112 update_table = "UPDATE %s SET %s WHERE _id IN ('%s');" % (113 table_name, new_update_op, "','".join(_ids[idx:idx+self.chunksize]))114 self._conn.execute(update_table)115 else: # =116 update_op = update_ops117 # group rows by new values to speed up118 value_collections = defaultdict(list) # key: values of new values119 for row in rows:120 # self.update_row(row, table_name, update_op, update_columns)121 value_collections[json.dumps([row[k] for k in update_columns])].append(row)122 for new_update_op, op_rows in value_collections.items():123 new_update_op = self._update_update_op(op_rows[0], update_op, update_columns)124 _ids = [row["_id"] for row in op_rows]125 for idx in range(0, len(_ids), self.chunksize):126 update_table = "UPDATE %s SET %s WHERE _id IN ('%s');" % (127 table_name, new_update_op, "','".join(_ids[idx:idx+self.chunksize]))128 self._conn.execute(update_table)129 self._conn.commit()130 """131 if isinstance(update_ops, list) or isinstance(update_ops, tuple):132 assert len(rows) == len(update_ops)133 for i, row in enumerate(rows):134 self.update_row(row, table_name, update_ops[i], update_columns)135 else:136 update_op = update_ops137 update_table = "UPDATE %s 
SET %s WHERE _id=?" % (138 table_name, update_op)139 self._conn.executemany(140 update_table, [[row[k] for k in update_columns] + [row["_id"]] for row in rows])141 self._conn.commit()142 """143 def get_update_op(self, update_columns, operator):144 if operator in "+-*/":145 update_ops = []146 for update_column in update_columns:147 update_ops.append(update_column + "=" + update_column + operator + "?")148 return ",".join(update_ops)149 elif operator == "=":150 update_ops = []151 for update_column in update_columns:152 update_ops.append(update_column + "=?")153 return ",".join(update_ops)154 else:155 raise NotImplementedError156 def get_rows_by_keys(self, table_name, bys, keys, columns, order_bys=None, reverse=False, top_n=None):157 key_match_events = []158 select_table = "SELECT %s FROM %s WHERE %s" % (159 ",".join(columns), table_name, " AND ".join(["%s=?" % (by) for by in bys]))160 if order_bys:161 select_table += " ORDER BY %s %s" % (",".join(order_bys), "DESC" if reverse else "ASC")162 if top_n:163 select_table += " LIMIT %d" % (top_n)164 select_table += ";"165 for x in self._conn.execute(select_table, keys):166 key_match_event = OrderedDict(zip(columns, x))167 key_match_events.append(key_match_event)168 return key_match_events169class MongoDBConnection(BaseConnection):170 def __init__(self, db_path, chunksize):171 import pymongo172 super(MongoDBConnection, self).__init__(db_path, chunksize)173 self._client = pymongo.MongoClient("mongodb://localhost:27017/", document_class=OrderedDict)174 self._conn = self._client[os.path.splitext(os.path.basename(db_path))[0]]175 def close(self):176 self._client.close()177 def create_table(self, table_name, columns, columns_types):178 self._conn[table_name]179 def __get_projection(self, columns):180 projection = {"_id": 0}181 for k in columns:182 projection[k] = 1183 return projection184 def get_columns(self, table_name, columns):185 projection = self.__get_projection(columns)186 results = list(self._conn[table_name].find({}, 
projection))187 return results188 def select_row(self, table_name, _id, columns):189 projection = self.__get_projection(columns)190 return self._conn[table_name].find_one({"_id": _id}, projection)191 def select_rows(self, table_name, _ids, columns):192 table = self._conn[table_name]193 exact_match_rows = []194 projection = self.__get_projection(columns)195 for idx in range(0, len(_ids), self.chunksize):196 query = {"_id": {'$in': _ids[idx:idx+self.chunksize]}}197 exact_match_rows.extend(table.find(query, projection))198 row_cache = {x["_id"]: x for x in exact_match_rows}199 exact_match_rows = [row_cache.get(_id, None) for _id in _ids]200 return exact_match_rows201 def insert_row(self, table_name, row):202 self._conn[table_name].insert_one(row)203 def insert_rows(self, table_name, rows):204 self._conn[table_name].insert_many(rows)205 def _update_update_op(self, row, update_op, update_columns):206 new_update_op = update_op.copy()207 for k, v in new_update_op.items():208 if k == "$inc":209 for update_column in update_columns:210 if v[update_column] == 1:211 v[update_column] = row[update_column]212 else:213 v[update_column] = -row[update_column]214 elif k == "$mul":215 for update_column in update_columns:216 if v[update_column] == 2:217 v[update_column] = row[update_column]218 else:219 v[update_column] = 1.0 / row[update_column]220 elif k == "$set":221 for update_column in update_columns:222 v[update_column] = row[update_column]223 return new_update_op224 def update_row(self, table_name, row, update_op, update_columns):225 self._conn[table_name].update_one(226 {"_id": row["_id"]}, self._update_update_op(row, update_op, update_columns))227 def update_rows(self, table_name, rows, update_ops, update_columns):228 if len(rows) > 0:229 if isinstance(update_ops, (tuple, list)): # +-*/230 assert len(rows) == len(update_ops)231 update_op_collections = defaultdict(list)232 for i, row in enumerate(rows):233 # self.update_row(row, table_name, update_ops[i], update_columns)234 
new_update_op = self._update_update_op(row, update_ops[i], update_columns)235 update_op_collections[json.dumps(new_update_op)].append(row)236 for new_update_op, op_rows in update_op_collections.items():237 new_update_op = json.loads(new_update_op)238 _ids = [row["_id"] for row in op_rows]239 for idx in range(0, len(_ids), self.chunksize):240 query = {"_id": {'$in': _ids[idx:idx+self.chunksize]}}241 self._conn[table_name].update_many(query, new_update_op)242 else: # =243 update_op = update_ops244 value_collections = defaultdict(list)245 for row in rows:246 # self.update_row(row, table_name, update_op, update_columns)247 value_collections[json.dumps([row[k] for k in update_columns])].append(row)248 for new_update_op, op_rows in value_collections.items():249 new_update_op = self._update_update_op(op_rows[0], update_op, update_columns)250 _ids = [row["_id"] for row in op_rows]251 for idx in range(0, len(_ids), self.chunksize):252 query = {"_id": {'$in': _ids[idx:idx+self.chunksize]}}253 self._conn[table_name].update_many(query, new_update_op)254 def get_update_op(self, update_columns, operator):255 if operator == "+":256 update_ops = {}257 for update_column in update_columns:258 update_ops[update_column] = 1 # placeholder259 return {"$inc": update_ops}260 elif operator == "-":261 update_ops = {}262 for update_column in update_columns:263 update_ops[update_column] = -1 # placeholder264 return {"$inc": update_ops}265 elif operator == "*":266 update_ops = {}267 for update_column in update_columns:268 update_ops[update_column] = 2 # placeholder269 return {"$mul": update_ops}270 elif operator == "/":271 update_ops = {}272 for update_column in update_columns:273 update_ops[update_column] = 0.5 # placeholder274 return {"$mul": update_ops}275 elif operator == "=":276 update_ops = {}277 for update_column in update_columns:278 update_ops[update_column] = 1 # placeholder279 return {"$set": update_ops}280 else:281 raise NotImplementedError282 def get_rows_by_keys(self, table_name, 
bys, keys, columns, order_bys=None, reverse=False, top_n=None):283 query = OrderedDict(zip(bys, keys))284 projection = self.__get_projection(columns)285 cursor = self._conn[table_name].find(query, projection)286 if order_bys:287 direction = -1 if reverse else 1288 cursor = cursor.sort([(k, direction) for k in order_bys])289 if top_n:290 result = []291 for x in cursor:292 result.append(x)293 if len(result) >= top_n:294 break295 return result296 else:...
dnn_ncrt_liew_combined_latest.py
Source:dnn_ncrt_liew_combined_latest.py
...39 predicted = list(np.round(predicted))40 predicted = list(map(lambda row: list(map(int, row)), predicted))41 return predicted, classifier42def get_fingerprints(mol):43 morgan = update_columns(morgan_fingerprints(mol), 'morgan')44 maccs = update_columns(maccs_fingerprints(mol), 'maccs')45 charge_f, e = charge_features(mol)46 autocorrelation_f, e = autocorrelation_features(mol)47 harmonic_topology = pd.DataFrame(harmonic_topology_index_feature(mol))48 constitutional_f = constitutional_features(mol)49 estate_f = estate_features(mol)50 moe_f = moe_features(mol)51 bcut_f, e = bcut_features(mol)52 molproperty_f, e = molproperty_features(mol)53 cats2d_f = cats2d_features(mol)54 kappa = kappa_descriptors(mol)55 return {56 "maccs": maccs,57 "morgan": morgan,58 'charge': update_columns(charge_f, 'charge'),59 'harmonic_topology': update_columns(harmonic_topology, 'harmonic'),60 'autocorrelation': update_columns(autocorrelation_f, 'autocorrelation'),61 'constitutional': update_columns(constitutional_f, 'constitutional'),62 'estate': update_columns(estate_f, 'estate'),63 'moe': update_columns(moe_f, 'moe'),64 'bcut': update_columns(bcut_f, 'bcut'),65 'molproperty': update_columns(molproperty_f, 'molproperty'),66 'cats2d': update_columns(cats2d_f, 'cats2d'),67 'kappa': update_columns(kappa, 'kappa'),68 }69if __name__ == "__main__":70 ncrt_train = pd.read_csv('./data/raw/ncrt_liew_train.csv', index_col=0).fillna(0).reset_index(drop=True)71 ncrt_test = pd.read_csv('./data/raw/ncrt_liew_test.csv', index_col=0).fillna(0).reset_index(drop=True)72 common_comb = ['maccs', 'morgan', 'charge', 'harmonic_topology', 'autocorrelation', 'constitutional', 'estate',73 'moe', 'bcut', 'molproperty', 'cats2d', 'kappa']74 fingerprints = get_fingerprints(convert_to_mol(ncrt_train['smiles']))75 fingerprints_test = get_fingerprints(convert_to_mol(ncrt_test['smiles']))76 all_inputs = pd.concat(list(map(lambda x: fingerprints[x], common_comb)), axis=1).fillna(0)77 all_inputs_test = 
pd.concat(list(map(lambda x: fingerprints_test[x], common_comb)), axis=1).fillna(0)78 X = pd.concat([all_inputs, all_inputs_test]).reset_index(drop=True)79 y = pd.concat([ncrt_train['label'], ncrt_test['label']]).reset_index(drop=True)80 # Instantiate the cross validator81 skf = StratifiedKFold(n_splits=10, random_state=1)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites to run your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!