Best Python code snippet using molecule_python
DataMigrator.py
Source:DataMigrator.py
"""Migrate a file-based mlflow tracking store (./mlruns) into a PostgreSQL backend store."""
import functools
import json
import os
import re

import mlflow
import psycopg2
import yaml
from mlflow.entities import RunStatus, SourceType
from tqdm import tqdm


def migrate_data(config_path="conf/db.json", root_log_dir="./mlruns", queries_file=None,
                 init_tables=True, clean_all_tables=False):
    """Convert a file-based mlflow log directory into SQL INSERTs and execute them.

    :param config_path: JSON file with DB settings: dialect, driver, username,
        password, host, port, database.
    :param root_log_dir: root of the file-based mlflow store.
    :param queries_file: file the generated INSERT statements are written to
        (defaults to "queries" when None).
    :param init_tables: if True, let mlflow itself create the SQL schema first.
    :param clean_all_tables: if True, empty every known table before inserting.
    """
    migrator = DataMigrator(config_path)
    migrator.write_insert_queries(root_log_dir, queries_file)
    migrator.send_queries(init_tables=init_tables, clean_all_tables=clean_all_tables)


class DataMigrator:
    """Reads mlflow's file-store layout and replays it into a SQL backend store."""

    # Tables of mlflow's SQL backend store, listed children-first so they can be
    # emptied without violating foreign-key constraints.
    _TABLES = (
        "alembic_version",
        "experiment_tags",
        "tags",
        "latest_metrics",
        "metrics",
        "model_versions",
        "params",
        "registered_models",
        "runs",
        "experiments",
    )

    def __init__(self, config_path="conf/db.json"):
        with open(config_path) as f:
            self._conf = json.load(f)
        self._queries_file = "queries"
        # Index of the query committed last; lets send_queries() resume after a crash.
        self._counter_value_on_last_commit = 0

    def write_insert_queries(self, root_log_dir="./mlruns", queries_file=None):
        """Walk the file store and append one INSERT statement per entity to the queries file."""
        if queries_file is not None:
            self._queries_file = queries_file

        self._clear_file(self._queries_file)

        print("The follow 'experiment/run' were processed:")
        with open(self._queries_file, "a") as sink:
            for experiment_id in os.listdir(root_log_dir):
                # Hidden files (e.g. .DS_Store) are not experiment folders.
                if experiment_id.startswith("."):
                    continue

                with open(f"{root_log_dir}/{experiment_id}/meta.yaml") as ef:
                    experiment = yaml.load(ef, Loader=yaml.FullLoader)
                experiment_insert = self._get_experiment_insert(**experiment)
                sink.write(experiment_insert)

                for run_uuid in os.listdir(f"{root_log_dir}/{experiment_id}"):
                    if run_uuid == 'meta.yaml' or run_uuid.startswith("."):
                        continue
                    with open(f"{root_log_dir}/{experiment_id}/{run_uuid}/meta.yaml") as rf:
                        run = yaml.load(rf, Loader=yaml.FullLoader)
                    if not run:
                        # Empty meta.yaml: nothing to migrate for this run.
                        continue
                    run_insert = self._get_run_insert(**run)
                    sink.write(run_insert)
                    print(f"{experiment_id}/{run_uuid}")

                    path_to_run_folder = f"{root_log_dir}/{experiment_id}/{run_uuid}"

                    self._write_run_metrics_inserts(path_to_run_folder, run_uuid, sink)
                    self._write_run_params_inserts(path_to_run_folder, run_uuid, sink)
                    self._write_run_tags_inserts(path_to_run_folder, run_uuid, sink)

    def init_tables(self):
        """Make mlflow create its SQL schema by logging (then deleting) a dummy experiment."""
        dummy_experiment_name = "__DataMigrator_dummy_experiment"
        mlflow.set_tracking_uri(self._get_uri())
        mlflow.set_experiment(dummy_experiment_name)
        self._delete_record(table="experiments", field="name", field_value=f"'{dummy_experiment_name}'")

    def clean_all_tables(self):
        """Empty every table of the backend store."""
        self._clean_tables(tables=self._TABLES)

    def get_db_cursor(cursor_foo):
        """Decorator: open a psycopg2 connection + cursor and pass them in via kwargs."""
        @functools.wraps(cursor_foo)
        def wrapper(self, *args, **kwargs):
            with psycopg2.connect(
                dbname=self._conf['database'],
                user=self._conf['username'],
                # BUG FIX: the password was read from self._conf['username'].
                password=self._conf['password'],
                host=self._conf['host'],
                port=self._conf['port'],
            ) as conn:
                with conn.cursor() as cursor:
                    kwargs['conn'] = conn
                    kwargs['cursor'] = cursor
                    return cursor_foo(self, *args, **kwargs)

        return wrapper

    @get_db_cursor
    def send_queries(self, start_query_num=None, init_tables=True, clean_all_tables=False, **kwargs):
        """Execute the previously generated queries file against the database.

        :param start_query_num: 0-based index of the first query to execute;
            None resumes from the last committed position (0 on a fresh run).
        :param init_tables: create the schema via mlflow before inserting.
        :param clean_all_tables: empty all tables before inserting.
        """
        conn = kwargs['conn']
        cursor = kwargs['cursor']
        if init_tables:
            self.init_tables()
        if clean_all_tables:
            self.clean_all_tables()

        # The iterator does the work; tqdm just renders progress per yielded line.
        for _ in tqdm(self._queries_iterator(start_query_num, conn, cursor)):
            pass

    def _queries_iterator(self, start_query_num, conn, cursor):
        """Yield once per line of the queries file, executing each line from start_query_num on.

        Commits every 999 executed queries and records the committed index so a
        later call can resume from there.
        """
        if start_query_num is None:
            start_query_num = self._counter_value_on_last_commit

        with open(self._queries_file, "r") as f:
            # Skip already-done queries, still yielding so the progress bar counts them.
            for _ in range(start_query_num):
                f.readline()
                yield
            # BUG FIX: the original re-executed query start_query_num - 1 and raised
            # NameError (unbound `query`/`i`) when start_query_num was 0 — which is
            # the default on a fresh run.
            i = start_query_num
            query = f.readline()
            while query:
                try:
                    cursor.execute(query)
                except Exception as e:
                    print(f"The exception raised when querying: {query}")
                    raise e
                if i % 999 == 0:
                    conn.commit()
                    self._counter_value_on_last_commit = i
                i += 1
                query = f.readline()
                yield

    def _get_uri(self):
        """Build an SQLAlchemy-style URI: dialect[+driver]://user:password@host:port/db."""
        prefix = self._conf['dialect']
        if self._conf['driver']:
            prefix = f"{prefix}+{self._conf['driver']}"

        return f"{prefix}://{self._conf['username']}:{self._conf['password']}@{self._conf['host']}:{self._conf['port']}/{self._conf['database']}"

    @get_db_cursor
    def _delete_record(self, table, field, field_value, **kwargs):
        """Delete rows of `table` where `field` equals `field_value` (pre-quoted by caller)."""
        # NOTE(review): values are interpolated straight into SQL. Tolerable only
        # because every input comes from the operator's own local files — do not
        # expose this to untrusted data without switching to parameterized queries.
        cursor = kwargs['cursor']
        cursor.execute(f"DELETE FROM {table} WHERE {field}={field_value};")

    @get_db_cursor
    def _clean_tables(self, tables, **kwargs):
        """Delete every row of each given table."""
        cursor = kwargs['cursor']
        for t in tables:
            cursor.execute(f"DELETE FROM {t};")
        # BUG FIX: removed a stray `mlflow.set_experiment()` call here — it raised
        # TypeError because mlflow.set_experiment() requires an experiment name.

    @staticmethod
    def _clear_file(path):
        """Truncate (or create) the file at `path`."""
        with open(path, 'w'):
            pass

    @staticmethod
    def _get_experiment_insert(**experiment):
        """Render an INSERT INTO experiments statement from a meta.yaml mapping."""
        query = f"""
            INSERT INTO experiments (
                experiment_id,
                name,
                artifact_location,
                lifecycle_stage
            ) VALUES (
                {experiment['experiment_id']},
                '{experiment['name']}',
                '{experiment['artifact_location']}',
                '{experiment['lifecycle_stage']}'
            );
        """
        # Collapse the pretty-printed template onto one line (one query per file line).
        return re.sub(r'\s{2,}', "", query) + "\n"

    @staticmethod
    def _get_run_insert(**run):
        """Render an INSERT INTO runs statement from a run's meta.yaml mapping."""
        if run['end_time'] is None:
            # Unfinished run: store SQL NULL instead of the Python None repr.
            run['end_time'] = "NULL"

        query = f"""
            INSERT INTO runs (
                run_uuid,
                name,
                source_type,
                source_name,
                entry_point_name,
                user_id,
                status,
                start_time,
                end_time,
                source_version,
                lifecycle_stage,
                artifact_uri,
                experiment_id
            ) VALUES (
                '{run['run_uuid']}',
                '{run['name']}',
                '{SourceType.to_string(run['source_type'])}',
                '{run['source_name']}',
                '{run['entry_point_name']}',
                '{run['user_id']}',
                '{RunStatus.to_string(run['status'])}',
                {run['start_time']},
                {run['end_time']},
                '{run['source_version']}',
                '{run['lifecycle_stage']}',
                '{run['artifact_uri']}',
                {run['experiment_id']}
            );
        """
        return re.sub(r'\s{2,}', "", query) + "\n"

    @staticmethod
    def _get_metric_insert(is_nan=False, **metric):
        """Render an INSERT INTO metrics statement.

        Non-numeric values are replaced by sentinels ("nan" -> -1.0, "inf" -> -9.0)
        because the `value` column is numeric.
        NOTE(review): `is_nan` stays False even for the "nan" sentinel — looks
        intentional, but verify against the mlflow schema's use of that flag.
        """
        if metric['val'] == "nan":
            metric['val'] = -1.
        elif metric['val'] == "inf":
            metric['val'] = -9.

        query = f"""
            INSERT INTO metrics (
                key,
                value,
                timestamp,
                run_uuid,
                step,
                is_nan
            ) VALUES (
                '{metric['metric']}',
                {metric['val']},
                {metric['timestamp']},
                '{metric['run_uuid']}',
                {metric['step']},
                {is_nan}
            );
        """
        return re.sub(r'\s{2,}', "", query) + "\n"

    @staticmethod
    def _get_param_insert(**param):
        """Render an INSERT INTO params statement."""
        query = f"""
            INSERT INTO params (
                key,
                value,
                run_uuid
            ) VALUES (
                '{param['param']}',
                '{param['val'].strip()}',
                '{param['run_uuid']}'
            );
        """
        return re.sub(r'\s{2,}', "", query) + "\n"

    @staticmethod
    def _get_tag_insert(**tag):
        """Render an INSERT INTO tags statement."""
        query = f"""
            INSERT INTO tags (
                key,
                value,
                run_uuid
            ) VALUES (
                '{tag['tag']}',
                '{tag['val'].strip()}',
                '{tag['run_uuid']}'
            );
        """
        return re.sub(r'\s{2,}', "", query) + "\n"

    def _write_run_tags_inserts(self, path_to_run_folder, run_uuid, sink_file):
        """Write one INSERT per non-empty file in the run's tags/ directory."""
        for tag in os.listdir(f"{path_to_run_folder}/tags"):
            with open(f"{path_to_run_folder}/tags/{tag}") as tagf:
                val = tagf.read().strip()
            if not val:
                continue
            tag_insert = self._get_tag_insert(
                tag=tag,
                val=val,
                run_uuid=run_uuid,
            )
            sink_file.write(tag_insert)

    def _write_run_params_inserts(self, path_to_run_folder, run_uuid, sink_file):
        """Write one INSERT per non-empty file in the run's params/ directory."""
        for param in os.listdir(f"{path_to_run_folder}/params"):
            with open(f"{path_to_run_folder}/params/{param}") as pf:
                val = pf.read().strip()
            if not val:
                continue
            param_insert = self._get_param_insert(
                param=param,
                # BUG FIX: was `val=pf.read()` — a second read of the exhausted
                # handle, which stored an empty value for every param.
                val=val,
                run_uuid=run_uuid,
            )
            sink_file.write(param_insert)

    def _write_run_metrics_inserts(self, path_to_run_folder, run_uuid, sink_file):
        """Write one INSERT per line of every file under the run's metrics/ tree."""
        for path, _, metrics in os.walk(f"{path_to_run_folder}/metrics"):
            for metric in metrics:
                with open(os.path.join(path, metric)) as mf:
                    for line in mf:
                        # Each metric line is "<timestamp> <value> <step>".
                        timestamp, val, step = line.split()
                        metric_insert = self._get_metric_insert(
                            metric=metric,
                            val=val,
                            timestamp=timestamp,
                            run_uuid=run_uuid,
                            step=step,
                        )
                        sink_file.write(metric_insert)
        # TODO: latest_metrics is never populated, although mlflow's backend has
        # that table (it mirrors the last value of each metric).
...
remote_server.py
Source:remote_server.py
1from __future__ import print_function2import os3import shutil4import sys5import random6import tempfile7import mlflow8from mlflow import log_metric, log_param, log_artifacts, get_artifact_uri, active_run,\9 get_tracking_uri, log_artifact10if __name__ == "__main__":11 print("Running {} with tracking URI {}".format(sys.argv[0], get_tracking_uri()))12 log_param("param1", 5)13 log_metric("foo", 5)14 log_metric("foo", 6)15 log_metric("foo", 7)16 log_metric("random_int", random.randint(0, 100))17 run_uuid = active_run().info.run_uuid18 # Get run metadata & data from the tracking server19 service = mlflow.tracking.MlflowClient()20 run = service.get_run(run_uuid)21 print("Metadata & data for run with UUID %s: %s" % (run_uuid, run))22 local_dir = tempfile.mkdtemp()23 message = "test artifact written during run %s within artifact URI %s\n" \24 % (active_run().info.run_uuid, get_artifact_uri())25 try:26 file_path = os.path.join(local_dir, "some_output_file.txt")27 with open(file_path, "w") as handle:28 handle.write(message)29 log_artifacts(local_dir, "some_subdir")30 log_artifact(file_path, "another_dir")31 finally:...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub. Right from setting up the prerequisites to running your first automation test, to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!