Best Python code snippet using molecule_python
iwCtf.py
Source:iwCtf.py
1#!/usr/bin/env python2import sys 3import os # system functions4import shutil5import argparse6import _utilities as util7import nibabel as nb8import pandas as pd9import numpy as np10def calc_affine(in_rotate, in_translate, in_origin):11 affine_ctf = np.zeros((4,4))12 affine_ctf[3,3] = 1 13 affine_ctf[0:3,0:3] = in_rotate14 affine_ctf[0:3,3] = in_translate15 # Calculate the Native to CTF matrix is based upon the fiducials. antsApplyTransformToPoints use the inverse transforms of 16 # that used by antsApplyTransform. For this reason, take the inverse so things are consistently inconsistent.17 affine_inverse_ctf = np.linalg.inv(affine_ctf)18 return affine_inverse_ctf19def calc_rotate_translate_origin( in_nas, in_lpa, in_rpa, in_scale=1 ):20 nas = np.array( [ in_nas[0], in_nas[1], in_nas[2] ] )21 rpa = np.array( [ in_rpa[0], in_rpa[1], in_rpa[2] ] )22 lpa = np.array( [ in_lpa[0], in_lpa[1], in_lpa[2] ] )23 origin_ctf = 0.5 * (lpa+rpa)24 x_ctf = norm( nas - origin_ctf)25 z_ctf = norm( np.cross( x_ctf, lpa-rpa))26 y_ctf = norm( np.cross( z_ctf, x_ctf))27 28 meg_to_mri = np.matrix( [ [ 0,1,0],[-1, 0,0], [0,0,1] ])29 30 rotate_ctf = in_scale * np.dot( meg_to_mri, np.matrix( [x_ctf, y_ctf, z_ctf ]).getT().getI())31 translate_ctf = -np.dot(rotate_ctf, origin_ctf)32 33 return [rotate_ctf, translate_ctf, origin_ctf ]34def norm(x):35 x_norm = np.linalg.norm(x)36 if not x_norm == 0:37 return x/x_norm38 else:39 print "Error: norm of vector is 0"40 quit()41def scale_points( in_points, scale, verbose_flag=False):42 43 out_points = in_points44 45 for ii in [ 0,1,2]:46 out_points[ out_points.columns[ii] ] = scale*in_points[ in_points.columns[ii] ]47 if verbose_flag:48 print49 print out_points50 print51 return out_points52def round_points( in_points, verbose_flag=False):53 54 out_points = in_points55 56 for ii in [ 0,1,2]:57 out_points[ out_points.columns[ii] ] = np.round( in_points[ in_points.columns[ii] ] )58 if verbose_flag:59 print60 print out_points61 print62 return out_points63 64def write_points( in_filename, in_pdframe, verbose_flag=False):65 print_points( in_filename, in_pdframe, verbose_flag)66 in_pdframe.to_csv(in_filename, index=False, float_format='%.3f')67def print_points_from_file( in_filename, verbose_flag=False):68 df_points = pd.read_csv(in_filename)69 print_points( in_filename, df_points, verbose_flag)70 return df_points71 72def print_points( in_filename, in_pdframe, verbose_flag=False):73 if verbose_flag:74 print 75 print in_filename76 print '----------------------------------------'77 print in_pdframe78 print79def icsa_to_wlps(in_filename, out_filename, transform, verbose_flag=False, debug_flag=False):80 _out1 = '00_iras_to_wlps__iras.csv'81 _out2 = '00_iras_to_wlps__wras.csv'82 if debug_flag:83 print "!!! Entering icsa_to_wlps"84 print in_filename85 print transform86 print _out187 print _out188 print out_filename89 print90 icsa_to_iras(in_filename, _out1, verbose_flag, debug_flag)91 iras_to_wlps(_out1, out_filename, transform, verbose_flag, debug_flag)92# wras_to_wlps(_out2, out_filename, verbose_flag, debug_flag)93 if not debug_flag:94 for ii in [ _out1 ]:95 os.remove(ii)96def wlps_to_icsa(in_filename, out_filename, transform, verbose_flag=False, debug_flag=False):97 _out1 = '00_wlps_to_icsa__wiras.csv'98 if debug_flag:99 print "!!! Entering wlps_to_icsa"100 print in_filename101 print transform102 print _out1103 print out_filename104 print105 wlps_to_iras(in_filename, _out1, transform, verbose_flag, debug_flag)106 iras_to_icsa(_out1, out_filename, verbose_flag)107 if not debug_flag:108 for ii in [ _out1 ]:109 os.remove(ii)110def icsa_to_iras(in_filename, out_filename, verbose_flag=False, debug_flag=False):111 112 if debug_flag:113 print "!!! Entering icsa_to_iras"114 print in_filename115 print out_filename116 print117 in_points = pd.read_csv(in_filename, names=['c','s','a','t','label', 'comment'], skiprows=[0])118 119 print_points( in_filename, in_points, verbose_flag)120 out_points = in_points.copy()121 out_points.columns = ['r','a','s','t','label','comment']122 out_points['r'] = 255-in_points['s']123 out_points['a'] = 255-in_points['c']124 out_points['s'] = 255-in_points['a']125 write_points( out_filename, out_points, verbose_flag)126 return out_points127def iras_to_icsa(in_filename, out_filename, verbose_flag=False):128 129 in_points = pd.read_csv(in_filename, names=['r','a','s','t','label', 'comment'], skiprows=[0])130 131 print_points( in_filename, in_points, verbose_flag)132 out_points = in_points.copy()133 out_points.columns = ['c','s','a','t','label','comment']134 out_points['s'] = 255-in_points['r']135 out_points['c'] = 255-in_points['a']136 out_points['a'] = 255-in_points['s']137 write_points( out_filename, out_points, verbose_flag)138 return out_points139def meg_to_mri(in_filename, out_filename, verbose_flag=False):140 141 in_points = pd.read_csv(in_filename, names=['x','y','z','t','label', 'comment'], skiprows=[0])142 143 print_points( in_filename, in_points, verbose_flag)144 out_points = in_points.copy()145 out_points.columns = ['l','p','s','t','label','comment']146 scale = 10 # cm to mm147 out_points['l'] = in_points['y'].apply(lambda x: scale*x)148 out_points['p'] = in_points['x'].apply(lambda x: -scale*x)149 out_points['s'] = in_points['z'].apply(lambda x: scale*x)150 write_points( out_filename, out_points, verbose_flag)151 return out_points152def mri_to_meg(in_filename, out_filename, verbose_flag=False):153 154 in_points = pd.read_csv(in_filename, names=['l','p','s','t','label', 'comment'], skiprows=[0])155 156 print_points( in_filename, in_points, verbose_flag)157 out_points = in_points.copy()158 out_points.columns = ['x','y','z','t','label','comment']159 scalar = 0.10 # mm to cm160 out_points['y'] = in_points['l'].apply(lambda x: scalar*x)161 out_points['x'] = in_points['p'].apply(lambda x: -scalar*x)162 out_points['z'] = in_points['s'].apply(lambda x: scalar*x)163 write_points( out_filename, out_points, verbose_flag)164 return out_points165def wlps_to_wctf(in_filename, out_filename, transform, verbose_flag=False, debug_flag=False):166 apply_affine_transform(in_filename, out_filename, transform, True, verbose_flag )167def wctf_to_wlps(in_filename, out_filename, transform, verbose_flag=False, debug_flag=False):168 apply_affine_transform(in_filename, out_filename, transform, False, verbose_flag )169def iras_to_wlps(in_filename, out_filename, transform, verbose_flag=False, debug_flag=False):170 apply_affine_transform(in_filename, out_filename, transform, False, verbose_flag )171def wlps_to_iras(in_filename, out_filename, transform, verbose_flag=False, debug_flag=False):172 print_points_from_file(in_filename, verbose_flag)173 _pd1 = apply_affine_transform(in_filename, out_filename, transform, True, False )174 write_points(out_filename, round_points(_pd1), verbose_flag)175def apply_affine_transform(in_filename, out_filename, transform, inverse_flag = False, verbose_flag=False):176 177 if isinstance(transform, basestring):178 transform = [transform]179 in_points = pd.read_csv(in_filename, names=['x','y','z','t','label', 'comment'], skiprows=[0])180 181 print_points( in_filename, in_points, verbose_flag)182 cmd1 =[ 'antsApplyTransformsToPoints', '-d', '3', '-i', in_filename, '-o', out_filename ]183 if inverse_flag:184 cmd2 = [ "-t", "[" ] + transform + [ ",", "1", "]" ]185 else:186 cmd2 = [ "-t"] + transform187 util.iw_subprocess( cmd1 + cmd2 , False, False)188 189 out_points = pd.read_csv(out_filename, names=['x','y','z','t','label', 'comment'], skiprows=[0])190 out_points['label'] = in_points['label']191 out_points['comment'] = in_points['comment']192 write_points(out_filename, out_points, verbose_flag)193 return out_points194#195#196#197def sort_fiducials(in_fiducials):198 return out_fiducials199def check_fiducials(df_fiducials):200 lpa = np.asarray(df_fiducials.values[0,0:3])201 nas = np.asarray(df_fiducials.values[1,0:3])202 rpa = np.asarray(df_fiducials.values[2,0:3])203 204 if not ( (lpa[0] > nas[0]) and (nas[0] > rpa[0]) ):205 print206 print 'Fiducials must be listed left to right in OUT fiducial file'207 print208 print df_fiducials.values209 print210 sys.exit()211 212 return [ lpa, nas, rpa ]213def calc_matrix( in_fiducials, out_matrix, ctf_scale=1, verbose_flag=False):214 df_fiducials = pd.read_csv( in_fiducials, sep=',',header=0)215 if verbose_flag:216 print_points( in_fiducials, df_fiducials, verbose_flag)217 [ lpa, nas, rpa ] = check_fiducials( df_fiducials ) 218 219 [ rotate_ctf, translate_ctf, origin_ctf ] = calc_rotate_translate_origin( nas, lpa, rpa, ctf_scale)220 print rotate_ctf221 222 affine_wlps_to_wctf = calc_affine( rotate_ctf, translate_ctf, 0*origin_ctf)223 224 util.write_itk_affine_matrix( affine_wlps_to_wctf, [0,0,0], out_matrix, verbose_flag )225 return affine_wlps_to_wctf226def transform_points(in_filename, out_filename, in_transforms, scale, verbose_flag, debug_flag):227 if type(in_filename) is list:228 filename = in_filename229 else:230 filename = [ in_filename ]231 if type(in_transforms) is list:232 transforms = in_transforms233 else:234 transforms = [ in_transforms ]235 input_files = filename + transforms 236 if debug_flag:237 print238 print '!!! ctf.transform_points '239 print input_files240 print241 util.verify_inputs( filename )242 in_points = print_points_from_file(in_filename, verbose_flag)243 cmd = [ "antsApplyTransformsToPoints", "-d", "3", "-i", in_filename, "-o", out_filename , "-t"] + transforms244 util.iw_subprocess(cmd, debug_flag, debug_flag)245 # Perform scaling246 out_points = pd.read_csv(out_filename, sep=',',header=0)247 out_points = scale_points( out_points, scale, debug_flag )248 249 # Fix Comment Column250 out_points['label'] = in_points['label']251 out_points['comment'] = in_points['comment']252 #253 write_points( out_filename, out_points, verbose_flag)254def transform_image(in_filename, out_filename, reference_filename, in_transforms, interpolation_method, verbose_flag, debug_flag):255 if type(in_filename) is list:256 filename = in_filename257 else:258 filename = [ in_filename ]259 if type(in_transforms) is list:260 transforms = in_transforms261 else:262 transforms = [ in_transforms ]263 input_files = filename + reference_filename + transforms 264 if debug_flag:265 print266 print '!!! ctf.transform_image '267 print input_files268 print269 util.verify_inputs( filename )270 cmd = [ "antsApplyTransforms", "-d", "3", "-i", in_filename[0], "-o", out_filename[0], "-r", reference_filename[0], 271 "-n", interpolation_method,"-t"] + transforms272 util.iw_subprocess(cmd, debug_flag, debug_flag)273def extract_affine(in_image, out_affine_filename, lps_flag=False, verbose_flag=False):274 img = nb.load(in_image)275 header = img.get_header()276 # Save transform277 affine_wras_to_wlps = util.wras_to_wlps_matrix();278 affine_iras_to_wras = np.asarray(img.get_affine())279 if lps_flag:280 out_affine = np.dot(affine_wras_to_wlps, affine_iras_to_wras)281 else:282 out_affine = affine_iras_to_wras283 ...
create_toplogy.py
Source:create_toplogy.py
1# -*- coding: utf-8 -*-2"""3 create_topology.py4 Script to process raw node and edge data.5 Workflow:6 - Merge Multilinestrings from power line data [Complete]7 - Add junction nodes where lines split [Complete]8 - Add sink nodes to low voltage [Complete]9 - Connect supply to substations [Complete]10 - Connect high voltage grid to low voltage grid [Complete]11 - Create bi-directional grid [Complete]12 - Save processed spatial data [Complete]13"""14#=======================15# Modules16import warnings17warnings.simplefilter(action='ignore', category=FutureWarning)18import pandas as pd19import geopandas as gpd20from shapely.geometry import Point21from shapely.wkt import loads22import re23from tqdm import tqdm24tqdm.pandas()25# Add local directory to path26import sys27sys.path.append("../../")28# Import infrasim spatial tools29from JEM.jem.spatial import get_isolated_graphs30from JEM.jem.utils import get_nodal_edges31# Import local copy of snkit32from JEM.snkit.snkit.src.snkit.network import *33# Import local functions34from utils import *35from merge_cost_data import *36from electricity_demand_assignment import *37from merge_elec_consumption_data import *38#=======================39# GLOBAL PARAMS40verbose_flag=True41remove_connected_components = True42connected_component_tolerance = 143#=======================44# PROCESSING45# read data46network = read_data()47verbose_print('loaded data',flag=verbose_flag)48# remove known bugs49if 'bug' in network.edges.columns:50 network.edges = network.edges[network.edges.bug != 'true'].reset_index(drop=True)51verbose_print('removed known bugs',flag=verbose_flag)52# merge multilinestrings53network = remove_multiline(network)54verbose_print('removed multilines',flag=verbose_flag)55# delete NoneType56network = remove_nontype(network)57verbose_print('removed NonType',flag=verbose_flag)58# explode multipart linestrings59network = explode_multipart(network)60verbose_print('explode multipart linestrings',flag=verbose_flag)61# save raw data from jps62jps_nodes = network.nodes.copy()63jps_edges = network.edges.copy()64# Merge edges65network = add_endpoints(network)66verbose_print('added end points',flag=verbose_flag)67# add ids68network = add_ids(network)69verbose_print('added IDs',flag=verbose_flag)70# add topology71network = add_topology(network, id_col='id')72verbose_print('added topology',flag=verbose_flag)73# merge using snkit74# network = merge_edges(network,by='asset_type')75verbose_print('merged edges',flag=verbose_flag)76# remove multilines again...77network = remove_multiline(network)78#===79# SNAP LV LINES TO SUBSTATIONS80verbose_print('snapping lines to substations...',flag=verbose_flag)81# LV82lv_voltages = ['24 kV', '12 kV']83# get substations84substations = network.nodes[network.nodes.subtype == 'substation'].geometry85# loop86for s in substations:87 # index edges88 idx_edges = edges_within(s,89 network.edges[network.edges.voltage.isin(lv_voltages)],90 distance=40)91 # snap92 for e in idx_edges.itertuples():93 # get current coords of edge94 e_coords = list(e.geometry.coords)95 # get coords of point96 s_coords = list(s.coords)97 # modify first coord of edge to be coord of point (i.e. snap)98 e_coords[0] = s_coords[0]99 # update in edge data100 network.edges.loc[network.edges.index == e.Index, 'geometry'] = LineString(e_coords)101verbose_print('done',flag=verbose_flag)102#===103# ADD JUNCTIONS AND SINKS104verbose_print('adding junctions and sinks...',flag=verbose_flag)105# add endpoints106network = add_endpoints(network)107# update asset_type108network.nodes.loc[~network.nodes.subtype.isin(['sink','junction','sink']),'subtype'] = 'pole'109network.nodes.loc[~network.nodes.asset_type.isin(['sink','junction','sink']),'asset_type'] = 'junction'110# split edges between nodes111network = split_edges_at_nodes(network)112# add ids113network = update_notation(network)114## network.edges.drop(['id','from_id','to_id'],axis=1)115## network = add_id_to_nodes(network)116## network = add_edge_notation(network)117# find true sink nodes118sinks = list(network.edges.to_id.unique())119starts = list(network.edges.from_id.unique())120true_sinks = []121for s in sinks:122 if s in starts:123 continue124 else:125 true_sinks.append(s)126# update true sinks127network.nodes.loc[network.nodes.id.isin(true_sinks),'asset_type'] = 'sink'128network.nodes.loc[network.nodes.id.isin(true_sinks),'subtype'] = 'demand'129# remap asset_type and asset_type from original data130for n in jps_nodes.title:131 network.nodes.loc[network.nodes.title == n, 'asset_type'] = jps_nodes.loc[jps_nodes.title == n].asset_type.iloc[0]132 network.nodes.loc[network.nodes.title == n, 'subtype'] = jps_nodes.loc[jps_nodes.title == n].subtype.iloc[0]133verbose_print('done',flag=verbose_flag)134#===135# CONVERT FALSE JUNCTIONS TO SINKS136verbose_print('converting false junctions...',flag=verbose_flag)137nodes_to_test = network.nodes[network.nodes.subtype.isin(['pole'])].reset_index(drop=True)138for n in nodes_to_test.id:139#for n in ['node_1694']:140 degree = node_connectivity_degree(node=n, network=network)141 if degree == 1:142 # change node asset_type143 network.nodes.loc[network.nodes.id == n, 'asset_type'] = 'sink'144 network.nodes.loc[network.nodes.id == n, 'subtype'] = 'demand'145 # reverse arc direction146 prev_line = network.edges[network.edges.from_id == n].geometry.values[0]147 network.edges.loc[network.edges.from_id == n, 'geometry'] = flip(prev_line)148verbose_print('done',flag=verbose_flag)149#===150# CLEANING/FORMATTING151# add length to line data152network = add_edge_length(network)153verbose_print('added line lengths',flag=verbose_flag)154# remove duplicated155network = remove_duplicates(network)156verbose_print('removed duplicates',flag=verbose_flag)157# change voltage column format158network.edges['voltage_kV'] = network.edges.voltage.str.replace('kV','').astype('int')159# add max/min160network = add_limits_to_edges(network)161verbose_print('added limits to edge flows',flag=verbose_flag)162# double-up edges163network = bidirectional_edges(network)164verbose_print('made edges bidirectional',flag=verbose_flag)165# remove sink-to-sink connections166network = remove_sink_to_sink(network)167verbose_print('removed sink to sinks',flag=verbose_flag)168# add node degree169network = add_nodal_degree(network)170verbose_print('added nodal degrees',flag=verbose_flag)171# drop zero degree sinks172network = remove_stranded_nodes(network)173verbose_print('removed stranded nodes',flag=verbose_flag)174# remove self-loops175network = remove_self_loops(network)176verbose_print('removed self-loops',flag=verbose_flag)177# change asset_type of sinks with >2 degree connectivity178network.nodes.loc[(network.nodes.degree > 2) & \179 (network.nodes.asset_type == 'sink'), 'asset_type'] = 'junction'180verbose_print('converted sinks of degree>0 to junctions',flag=verbose_flag)181#===182# ADD COST DATA183verbose_print('merging cost data...',flag=verbose_flag)184network = merge_cost_data(network,185 path_to_costs='../data/costs_and_damages/maximum_damage_values.csv',186 print_to_console=False)187verbose_print('done',flag=verbose_flag)188#===189# GET CONNECTED COMPONENTS190verbose_print('getting connected components...',flag=verbose_flag)191network = add_component_ids(network)192# remove193if not remove_connected_components:194 pass195else:196 graphs_to_remove = network.edges.loc[network.edges.component_id > connected_component_tolerance]197 nodes_to_remove = graphs_to_remove.from_id.to_list() + graphs_to_remove.to_id.to_list()198 edges_to_remove = graphs_to_remove.id.to_list()199 # drop200 network.nodes = network.nodes.loc[~network.nodes.id.isin(nodes_to_remove)].reset_index(drop=True)201 network.edges = network.edges.loc[~network.edges.id.isin(edges_to_remove)].reset_index(drop=True)202# Update network notation203network = update_notation(network)204verbose_print('done',flag=verbose_flag)205#===206# ADD CAPACITY ATTRIBUTES207verbose_print('adding capacity attributes to nodes...',flag=verbose_flag)208def nodal_capacity_from_edges(node,network):209 nodal_edges = get_nodal_edges(network,node).id.to_list()210 return network.edges.loc[network.edges.id.isin(nodal_edges)]['max'].max()211network.nodes['capacity'] \212 = network.nodes.progress_apply(213 lambda x: nodal_capacity_from_edges(x['id'],network) \214 if pd.isnull(x['capacity']) else x['capacity'], axis=1 )215verbose_print('done',flag=verbose_flag)216# #===217# # MAP ELEC ASSETS TO WATER ASSETS218# verbose_print('mapping water assets...',flag=verbose_flag)219# water_nodes = gpd.read_file('../data/water/merged_water_assets.shp')220# map_elec_and_water_assets(network.nodes,water_nodes)221# verbose_print('done',flag=verbose_flag)222#===223# ADD TOTAL COSTS224# edges225network.edges['cost_min'] = network.edges['uc_min'] * network.edges['max'] * network.edges['length']226network.edges['cost_max'] = network.edges['uc_max'] * network.edges['max'] * network.edges['length']227network.edges['cost_avg'] = network.edges['uc_avg'] * network.edges['max'] * network.edges['length']228network.edges['cost_uom'] = '$US'229# nodes230network.nodes['cost_min'] = network.nodes['uc_min'] * network.nodes['capacity']231network.nodes['cost_max'] = network.nodes['uc_max'] * network.nodes['capacity']232network.nodes['cost_avg'] = network.nodes['uc_avg'] * network.nodes['capacity']233network.nodes['cost_uom'] = '$US'234#===235# ADD PARISH236verbose_print('adding parish to nodes...',flag=verbose_flag)237parish_boundaries = gpd.read_file('../data/spatial/else/admin-boundaries.shp')238parish_boundaries = parish_boundaries[['Parish','geometry']]239network.nodes['parish'] = network.nodes.sjoin(parish_boundaries, \240 predicate='within').drop('index_right',axis=1)['Parish']241verbose_print('done',flag=verbose_flag)242#===243# REINDEX244network.edges = network.edges[['id', 'asset_type', 'from_id', 'to_id', 'from_type', 'to_type',245 'voltage_kV', 'losses', 'length', 'min', 'max', 246 'uc_min','uc_max', 'uc_avg','uc_uom',247 'cost_min','cost_max', 'cost_avg','cost_uom',248 'name', 'parish','source', 'component_id', 'geometry']]249network.nodes = network.nodes[['id','asset_type','subtype','capacity',#'population','ei', 'ei_uom',250 'uc_min','uc_max','uc_avg','uc_uom',251 'cost_min','cost_max', 'cost_avg','cost_uom',252 'degree','parish','title','source','geometry']]253verbose_print('re-indexed data',flag=verbose_flag)254#===255# SAVE DATA256verbose_print('saving...',flag=verbose_flag)257save_data(network)258verbose_print('create_toplogy finished',flag=verbose_flag)259# #===260# # ADD POPULATION261# verbose_print('adding population...',flag=verbose_flag)262# network = assign_pop_to_sinks(network)263# verbose_print('done',flag=verbose_flag)264# #===265# # APPEND ELECTRICITY INTENSITIES266# network = append_electricity_intensities(network)...
test_cli_output.py
Source:test_cli_output.py
1import pytest2def test_cli_product_subcommand(index_empty, clirunner, dataset_add_configs):3 runner = clirunner(['product', 'update'], verbose_flag=False, expect_success=False)4 assert "Usage: [OPTIONS] [FILES]" in runner.output5 assert "Update existing products." in runner.output6 assert runner.exit_code == 17 runner = clirunner(['product', 'update', dataset_add_configs.empty_file], verbose_flag=False, expect_success=False)8 assert "All files are empty, exit" in runner.output9 assert runner.exit_code == 110 runner = clirunner(['product', 'add'], verbose_flag=False, expect_success=False)11 assert "Usage: [OPTIONS] [FILES]" in runner.output12 assert "Add or update products in" in runner.output13 assert runner.exit_code == 114 runner = clirunner(['product', 'add', dataset_add_configs.empty_file], verbose_flag=False, expect_success=False)15 assert "All files are empty, exit" in runner.output16 assert runner.exit_code == 117def test_cli_metadata_subcommand(index_empty, clirunner, dataset_add_configs):18 runner = clirunner(['metadata', 'update'], verbose_flag=False, expect_success=False)19 assert "Usage: [OPTIONS] [FILES]" in runner.output20 assert "Update existing metadata types." in runner.output21 assert runner.exit_code == 122 runner = clirunner(['metadata', 'update', dataset_add_configs.empty_file], verbose_flag=False, expect_success=False)23 assert "All files are empty, exit" in runner.output24 assert runner.exit_code == 125 runner = clirunner(['metadata', 'add'], verbose_flag=False, expect_success=False)26 assert "Usage: [OPTIONS] [FILES]" in runner.output27 assert "Add or update metadata types in" in runner.output28 assert runner.exit_code == 129 runner = clirunner(['metadata', 'add', dataset_add_configs.empty_file], verbose_flag=False, expect_success=False)30 assert "All files are empty, exit" in runner.output31 assert runner.exit_code == 132def test_cli_dataset_subcommand(index_empty, clirunner, dataset_add_configs):33 clirunner(['metadata', 'add', dataset_add_configs.metadata])34 clirunner(['product', 'add', dataset_add_configs.products])35 runner = clirunner(['dataset', 'add'], verbose_flag=False, expect_success=False)36 assert "Indexing datasets [####################################] 100%" not in runner.output37 assert "Usage: [OPTIONS] [DATASET_PATHS]" in runner.output38 assert "Add datasets" in runner.output39 assert runner.exit_code == 140 runner = clirunner(['dataset', 'update'], verbose_flag=False, expect_success=False)41 assert "0 successful, 0 failed" not in runner.output42 assert "Usage: [OPTIONS] [DATASET_PATHS]" in runner.output43 assert "Update datasets" in runner.output44 assert runner.exit_code == 145 runner = clirunner(['dataset', 'info'], verbose_flag=False, expect_success=False)46 assert "Usage: [OPTIONS] [IDS]" in runner.output47 assert "Display dataset information" in runner.output48 assert runner.exit_code == 149 runner = clirunner(['dataset', 'uri-search'], verbose_flag=False, expect_success=False)50 assert "Usage: [OPTIONS] [PATHS]" in runner.output51 assert "Search by dataset locations" in runner.output52 assert runner.exit_code == 153 if index_empty.supports_legacy:54 clirunner(['dataset', 'add', dataset_add_configs.datasets])55 else:56 # Does not support legacy datasets57 with pytest.raises(ValueError):58 # Expect to fail with legacy datasets59 clirunner(['dataset', 'add', dataset_add_configs.datasets])60 # Use EO3 datasets to allow subsequent tests to run.61 clirunner(['dataset', 'add', dataset_add_configs.datasets_eo3])62 runner = clirunner(['dataset', 'archive'], verbose_flag=False, expect_success=False)63 assert "Completed dataset archival." not in runner.output64 assert "Usage: [OPTIONS] [IDS]" in runner.output65 assert "Archive datasets" in runner.output66 assert runner.exit_code == 167 runner = clirunner(['dataset', 'archive', "--all"], verbose_flag=False)68 assert "Completed dataset archival." in runner.output69 assert "Usage: [OPTIONS] [IDS]" not in runner.output70 assert "Archive datasets" not in runner.output71 assert runner.exit_code == 072 runner = clirunner(['dataset', 'restore'], verbose_flag=False, expect_success=False)73 assert "Usage: [OPTIONS] [IDS]" in runner.output74 assert "Restore datasets" in runner.output75 assert runner.exit_code == 176 runner = clirunner(['dataset', 'restore', "--all"], verbose_flag=False)77 assert "restoring" in runner.output78 assert "Usage: [OPTIONS] [IDS]" not in runner.output79 assert "Restore datasets" not in runner.output80 assert runner.exit_code == 081 runner = clirunner(['dataset', 'purge'], verbose_flag=False, expect_success=False)82 assert "Completed dataset purge." not in runner.output83 assert "Usage: [OPTIONS] [IDS]" in runner.output84 assert "Purge archived datasets" in runner.output85 assert runner.exit_code == 186 runner = clirunner(['dataset', 'purge', "--all"], verbose_flag=False)87 assert "Completed dataset purge." in runner.output88 assert "Usage: [OPTIONS] [IDS]" not in runner.output89 assert runner.exit_code == 090def test_readd_and_update_metadata_product_dataset_command(index_empty, clirunner, dataset_add_configs):91 clirunner(['metadata', 'add', dataset_add_configs.metadata])92 rerun_add = clirunner(['metadata', 'add', dataset_add_configs.metadata])93 assert "WARNING Metadata Type" in rerun_add.output94 assert "is already in the database" in rerun_add.output95 update = clirunner(['metadata', 'update', dataset_add_configs.metadata])96 assert "WARNING No changes detected for metadata type" in update.output97 add = clirunner(['product', 'add', dataset_add_configs.products])98 rerun_add = clirunner(['product', 'add', dataset_add_configs.products])99 assert "WARNING Product" in rerun_add.output100 assert "is already in the database" in rerun_add.output101 update = clirunner(['product', 'update', dataset_add_configs.products])102 assert "WARNING No changes detected for product" in update.output103 clirunner(['dataset', 'add', dataset_add_configs.datasets_eo3])104 rerun_add = clirunner(['dataset', 'add', dataset_add_configs.datasets_eo3])105 assert "WARNING Dataset" in rerun_add.output106 assert "is already in the database" in rerun_add.output107 update = clirunner(['dataset', 'update', dataset_add_configs.datasets_eo3])...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!