Best Python code snippet using localstack_python
netraz_commands.py
Source:netraz_commands.py
...51 pass52 netraz_output("o", "Created NIP:")53 jsonprint(stdoutjson)54 return stdoutjson55def delete_network_insights_path(args, recursive=False):56 if cli_is_not_authenticated():57 return58 elif (len(args) < 1):59 netraz_output("h", "netraz delete-nip [path_id] [path_id] ...")60 raise Exception("No arguments prescribed")61 else:62 # if recursive = true then remove all the child analyses from the args list63 if recursive:64 # get all child nia's for each args(path_id)65 aws_cli_command = construct_aws_cli_command("ec2", "describe-network-insights-analyses")66 stdout, stderr = execute_aws_cli(aws_cli_command)67 all_nias_list = stdout_to_json(stdout) 68 # get list of nia's that are blocking the deleting of args paths69 ntd = [nia for nia in all_nias_list["NetworkInsightsAnalyses"] if nia["NetworkInsightsPathId"] in args]70 ntd_id = [nia["NetworkInsightsAnalysisId"] for nia in ntd]71 # then delete_network_insights_analysis(nias_to_dele)72 delete_network_insights_analysis(ntd_id)73 params, values = [], []74 for path_id in args:75 values.append(path_id)76 params.append("network-insights-path-id")77 aws_cli_command = construct_aws_cli_command("ec2", "delete-network-insights-path", params, values)78 stdout, stderr = execute_aws_cli(aws_cli_command)79 if "AnalysisExistsForNetworkInsightsPath" in stderr:80 raise Exception(stderr)81 else:82 # print what you have just done83 print(stdout)84 return85def delete_network_insights_analysis(args):86 if cli_is_not_authenticated():87 return88 elif (len(args) < 1):89 netraz_output("e", "No arguments prescribed")90 netraz_output("h", "netraz delete-nia [analysis_id] [analysis_id] ...")91 else:92 params, values = [], []93 for nia_id in args:94 values.append(nia_id)95 params.append("network-insights-analysis-id")96 aws_cli_command = construct_aws_cli_command("ec2", "delete-network-insights-analysis", params, values)97 stdout, stderr = execute_aws_cli(aws_cli_command)98 if "AnalysisExistsForNetworkInsightsPath" in stderr:99 netraz_output("e", stderr)100 raise Exception()101 else:102 # print what you have just done103 print(stdout)104 return 0105def start_network_insights_analysis(args):106 if cli_is_not_authenticated():107 return108 elif (len(args) != 1):109 netraz_output("e", "Only one argument allowed: nip-id")110 netraz_output("h", "netraz start-nia [path_id]")111 else:112 # one arg is network insights path id113 nip_id = args[0]114 115 params, values = [], []116 values.append(nip_id)117 params.append("network-insights-path-id")118 aws_cli_command = construct_aws_cli_command("ec2", "start-network-insights-analysis", params, values)119 stdout, stderr = execute_aws_cli(aws_cli_command)120 if "InvalidParameterValue" in stderr:121 netraz_output("e", stderr)122 raise Exception()123 else:124 # print what you have just done125 stdoutjson = stdout_to_json(stdout)["NetworkInsightsAnalysis"]126 for k in ["NetworkInsightsAnalysisArn", "Status"]:127 del stdoutjson[k]128 netraz_output("o", "Started NIA:")129 jsonprint(stdoutjson)130 return stdoutjson131def get_network_insights_paths(args):132 if cli_is_not_authenticated():133 return134 elif (len(args) >= 0):135 params, values = [], []136 if (len(args) == 0):137 output_message = "Getting all NIPs"138 print_title = "All available"139 elif (len(args) > 0):140 output_message = "Getting specific NIPs"141 print_title = "Requested"142 params.append("network-insights-path-ids")143 values.append(' '.join(args))144 netraz_output("o", output_message)145 aws_cli_command = construct_aws_cli_command("ec2", "describe-network-insights-paths", params, values)146 stdout, stderr = execute_aws_cli(aws_cli_command)147 148 # Convert output to json149 nips_list = stdout_to_json(stdout)150 # Construct info array151 info, headers = construct_array_nip(nips_list)152 if (len(args) == 1):153 # print single json154 jsonprint(nips_list["NetworkInsightsPaths"][0])155 else:156 # print info157 netraz_output("o", print_title + " network insights paths:")158 print_netraz_table(info, headers)159 return160def get_network_insights_analyses(args):161 if cli_is_not_authenticated():162 return163 elif (len(args) >= 0):164 if (len(args) == 0):165 output_message = "Getting all NIAs"166 print_title = "All available"167 elif (len(args) > 0):168 output_message = "Getting specific NIAs"169 print_title = "Requested"170 netraz_output("o", output_message)171 aws_cli_command = construct_aws_cli_command("ec2", "describe-network-insights-analyses")172 stdout, stderr = execute_aws_cli(aws_cli_command)173 174 # Convert output to json175 nias_list = stdout_to_json(stdout)176 # Construct info array177 info, headers = construct_array_nia(nias_list)178 if (len(args) == 1):179 # print json and explanations180 if args[0][0:5] != "nia-0":181 netraz_output("e", "Single argument get-nias is only permitted to be an nia_id")182 raise Exception("Parameter Error")183 else:184 explanations = nias_list["NetworkInsightsAnalyses"][0]["Explanations"]185 nias_list["NetworkInsightsAnalyses"][0].pop("Explanations",None)186 netraz_output("o", print_title + " network insights analysis:")187 jsonprint(nias_list["NetworkInsightsAnalyses"][0])188 # Construct info array189 headers = ['ExplanationCode', 'Direction', 'Subnet_Id', 'Vpc_Id', 'Acl_Id', 'NetworkInterface_Id', 'SecurityGroups_Ids']190 info = []191 for exp in explanations:192 row = []193 for column in headers:194 try:195 if "_" in column:196 key, subkey = column.split("_")197 if key == "SecurityGroups":198 subkey = subkey[:-1]199 sgids = []200 for sg in exp[key]:201 sgids.append(sg[subkey])202 row.append('\n'.join(sgids))203 else:204 row.append(exp[key][subkey])205 else:206 row.append(exp[column])207 except:208 row.append("-")209 info.append(row)210 netraz_output("o", print_title + " network insights analysis explanations:")211 print_netraz_table(info, headers)212 else:213 for idx, nia_row in enumerate(info):214 # if no match then delete from info215 non_empty_args = args != []216 row_nip_not_in_args = nia_row[0] not in args217 row_nia_not_in_args = nia_row[1] not in args218 if (non_empty_args and row_nip_not_in_args and row_nia_not_in_args):219 # this entry is not interesting220 info.pop(idx)221 # print info as table (no explanations)222 netraz_output("o", print_title + " network insights analyses:")223 print_netraz_table(info, headers)224 return225 226def variable_file_support(args):227 if cli_is_not_authenticated():228 return229 else:230 # unpack231 vars_name = args[0]232 try:233 action = args[1] # may throw index error234 action = file_support_actions.index(args[1]) # may throw value error235 action = file_support_actions[action] # success236 except ValueError:237 raise Exception("Parameter error: unknown action \"" + action + "\"")238 except IndexError:239 raise Exception("Parameter error: no action given")240 241 sv, cv = check_file_support_dependencies(vars_name)242 # All good to go ahead with the file243 netraz_output("o", "Processing file: "+vars_name)244 # load json files245 try:246 # vars file247 with open(sv) as f:248 var = load(f)249 # common file250 with open(cv) as f:251 c_var = load(f)252 except JSONDecodeError:253 raise Exception("Vars specification file syntax error")254 except Exception as e:255 print(type(e))256 # validate vars file257 var, c_var = validate_and_process_vars(var, c_var)258 259 # 1 check if the nips exist already260 # get all nips261 aws_cli_command = construct_aws_cli_command("ec2", "describe-network-insights-paths")262 stdout, stderr = execute_aws_cli(aws_cli_command)263 # Convert output to json264 all_nips_list = stdout_to_json(stdout)265 info, headers = construct_array_nip(all_nips_list)266 #print_netraz_table(info, headers)267 # get existing nip names268 existing_nip_names = [[t["Value"] for t in nip_json["Tags"] if t["Key"]=="Name"][0] for nip_json in all_nips_list["NetworkInsightsPaths"]]269 existing_nips = [nip for nip in all_nips_list["NetworkInsightsPaths"]]270 # ntc name declared here as some nips will be delete/create to change properties271 ntc_name = []272 # nip's to delete273 # nip's should be deleted if they (exist and are tagged with the current file path) and:274 # - are not listed (by name) in the current vars file275 # - are listed by name and any properties have changed (delete & re-create)276 ntd_name = []277 for nip in existing_nips:278 # ignore if not tagged with current file path279 twcfp = [t["Value"] for t in nip["Tags"] if t["Key"]=="SourcePath"][0][5:-5] == vars_name 280 if not twcfp: continue281 # delete if name not listed in current vars file282 nlicvf = [t["Value"] for t in nip["Tags"] if t["Key"]=="Name"][0] in [n["Tags:Name"] for n in var["nips"]]283 if not nlicvf: 284 ntd_name.append([t["Value"] for t in nip["Tags"] if t["Key"]=="Name"][0])285 continue286 # delete if any properties have changed287 # match by name288 ename = [t["Value"] for t in nip["Tags"] if t["Key"]=="Name"][0]289 v = [n for n in var["nips"] if n["Tags:Name"]==ename][0]290 phc = []291 phc.append(nip["Destination"] != v["Destination"])292 phc.append("DestinationPort" in nip and str(nip["DestinationPort"]) != str(v["DestinationPort"]))293 phc.append(nip["Protocol"] != v["Protocol"])294 phc.append(nip["Source"] != v["Source"])295 nip_cb = [t["Value"] for t in nip["Tags"] if t["Key"]=="CreatedBy"][0]296 v_cb = c_var["Tags"]["CreatedBy"]297 phc.append(nip_cb != v_cb)298 if any(phc):299 ntd_name.append([t["Value"] for t in nip["Tags"] if t["Key"]=="Name"][0])300 ntc_name.append([t["Value"] for t in nip["Tags"] if t["Key"]=="Name"][0])301 continue302 # else don't delete303 # nip's to create304 # nip's should be created if they either:305 # - don't exist at all306 # - do exist but(and) are tagged with a different vars file path307 for nip in var["nips"]:308 # if nip does not exist (by name)309 ndne = nip["Tags:Name"] not in existing_nip_names310 # if not tagged with current file path311 # SOMETHING IS WRONG WITH THIS LINE312 313 current_var_filepath_tag = ""314 try:315 current_var_filepath_tag = [[t["Value"] for t in n["Tags"] if t["Key"]=="SourcePath"] for n in existing_nips if [t["Value"] for t in n["Tags"] if t["Key"]=="Name"][0] == nip["Tags:Name"]][0][0][5:-5]316 ntwcfp = current_var_filepath_tag != vars_name317 except:318 ntwcfp = False319 if ndne: 320 # create if not exist321 ntc_name.append(nip["Tags:Name"])322 elif ntwcfp:323 # does exist but not current file path tag324 ntc_name.append(nip["Tags:Name"])325 else:326 continue327 # print328 if len(ntc_name)==0 and len(ntd_name)==0:329 # no ntc either330 netraz_output("o","No changes. AWS reachability for " + vars_name + " appears up-to-date.")331 return332 else:333 if len(ntd_name)!=0:334 # some ntd335 netraz_output("p","NIPs to delete: " + ', '.join(ntd_name))336 if len(ntc_name)!=0:337 # no ntd but some ntc338 netraz_output("p","NIPs to create: " + ', '.join(ntc_name))339 # get ntc's from var (declaration file)340 ntc = [nip for nip in var["nips"] if nip["Tags:Name"] in ntc_name]341 # get ntd's from existing_nips where the name is in ntd_name342 ntd = [n for n in existing_nips if [t["Value"] for t in n["Tags"] if t["Key"]=="Name"][0] in ntd_name]343 #plan does nothing344 if action!=file_support_actions[0]:345 # apply346 netraz_output("o", "Do you want to perform these actions? Only \"yes\" will be accepted to approve.")347 apply_confirmation = input(" Enter a value: ")348 if apply_confirmation != "yes":349 raise Exception("User aborted")350 else:351 # delete all ntd's and all of each ones associated nia's352 if (len(ntd) > 0):353 ntd_ids = [n["NetworkInsightsPathId"] for n in ntd]354 delete_network_insights_path(ntd_ids, recursive=True)355 # create non-existing356 for nip in ntc:357 # setup create nip358 args = []359 args.append(nip["Source"])360 args.append(nip["Destination"])361 args.append(nip["Protocol"])362 args.append(nip["DestinationPort"])363 args.append(construct_tag_string(nip, c_var))364 nip_id = create_network_insights_path(args)["NetworkInsightsPathId"]365 nia_id = start_network_insights_analysis([nip_id])366 ...
app.py
Source:app.py
...21 logging.error(22 "Call to delete_network_insights_analysis failed")23 raise error24 try:25 ec2Client.delete_network_insights_path(26 NetworkInsightsPathId=test_detail['NetworkInsightsPathId']27 )28 except botocore.exceptions.ClientError as error:29 logging.error("Call to delete_network_insights_path failed")30 raise error31def format_test_output(inflight_network_test_details, event):32 testResult = {}33 successful_tests = []34 timedout_tests = []35 failed_tests = []36 for test_detail in inflight_network_test_details:37 if (test_detail['Status'] == 'succeeded'):38 successful_tests.append(test_detail)39 elif(test_detail['Status'] == 'running'):...
main.py
Source:main.py
...29 create_network_insights_path(args)30 elif command == commands[2]:31 # delete-nip32 netraz_output("o","Delete network insights path")33 delete_network_insights_path(args)34 elif command == commands[3]:35 # delete-nia36 netraz_output("o","Delete network insights analysis")37 delete_network_insights_analysis(args)38 elif command == commands[4]:39 # start-nia40 netraz_output("o","Start network insights analysis")41 start_network_insights_analysis(args)42 elif command == commands[5]:43 # get-nips44 netraz_output("o","Get network insights paths")45 get_network_insights_paths(args)46 elif command == commands[6]:47 # get-nias...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!