Best Python code snippet using autotest_python
Cisco_To_Json.py
Source:Cisco_To_Json.py
1# Script works fine with IOS, NX-OS, XR, SG-220, Air-OS.2# Caution! Script might fail trying to connect to some devices ,especially when connecting to more than 10 devices simultaneously in a thread.3# This script worked fine and was tested in one Enterprise network environment only, and some templates were edited to work in that environment.4import argparse5import collections6import ipaddress7import json8import logging9import re10import socket11import sys12import time13from concurrent.futures import ThreadPoolExecutor, as_completed14from contextlib import closing15from atlassian.confluence import Confluence16import html_to_json17import paramiko18import textfsm19import yaml20from netmiko import ConnectHandler, SSHDetect21from paramiko_expect import SSHClientInteraction22from tabulate import tabulate23from pprint import pprint24# All specific info is written in yaml file25with open("config.yml", 'r', encoding="utf-8") as ymlfile:26 config = yaml.load(ymlfile, Loader=yaml.FullLoader)27 # Maximum number of ssh conections28 limit = config['limit']29# Arguments30parser = argparse.ArgumentParser()31parser.add_argument("-f", "--file", required=True, help='Choices: "all" - from ip_list_all.txt file; "confluence" - from confluence page; "test" - from ip_list.txt file')32parser.add_argument("-d", "--debug", required=False)33args = parser.parse_args()34file_with_ip_addresses = args.file35debug_flag = args.debug36def logs_and_debug(debug_flag):37 # To view debug logs on a screen38 if debug_flag:39 stream = logging.StreamHandler(sys.stdout)40 stream.setLevel(logging.DEBUG)41 log = logging.getLogger()42 log.addHandler(stream)43 log.setLevel(logging.DEBUG)44 # To write debug logs into a text file45 logging.basicConfig(46 level=logging.DEBUG,47 format='%(asctime)s %(levelname)s %(message)s',48 filename='Log//LOGS.txt',49 filemode='w')50logs_and_debug(debug_flag)51# These lists gather info if something goes wrong52unable_to_login_by_ssh_ip_list = []53authentication_failed_list = []54# Connecting to Confluence page and getting ip list of all devices to connect by SSH55def confluence():56 username = config['confluence.username']57 password = config['confluence.password']58 confluenceBaseUrl = config['confluence.url']59 pageId = config['confluence.pageid']60 #61 ip_list_all = []62 try:63 confluence = Confluence(url=confluenceBaseUrl, username=username, password=password)64 page = confluence.get_page_by_id(page_id=pageId, expand='body.storage').get('body').get('storage').get('value')65 output_json = html_to_json.convert_tables(page)66 for device_dict in output_json[0]:67 ip_address = re.search('\d+\.\d+\.\d+\.\d+', device_dict['IP']).group(0)68 ip_list_all.append(ip_address)69 except:70 print('SOMETHING WENT WRONG GETTING INFO FROM CONFLUENCE PAGE'.center(200, '!'))71 return ip_list_all72# After we got ip list to connect we check each ip for 22 port is open making new reachable ip list73def connect_list(list_ipaddr):74 list_reachable_devices = []75 list_closed_ssh_port = []76 for item in list_ipaddr:77 # Check if the 22 port is open on a device78 with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:79 if sock.connect_ex((item, 22)) == 0:80 list_reachable_devices.append(item)81 else:82 print(str('PORT "22" IS CLOSED ON A DEVICE - ' + item).center(200, '-'))83 list_closed_ssh_port.append(item)84 unable_to_login_by_ssh_ip_list.append(item)85 return True, list_reachable_devices, list_closed_ssh_port86# For each reachable ip we create a dictionary preparing it for connection by Netmiko87def device_dictionary(list_ipaddr_dev):88 username = config['ssh.username']89 password = config['ssh.password']90 # Compiling dictionary of devices91 list_dic_devices = []92 for item in list_ipaddr_dev:93 list_dic_devices.append({'device_type': 'autodetect', 'ip': item, 'username': username, 'password': password, 'fast_cli': False})94 return True, list_dic_devices95# Connecting to device by SSH96def send_command_and_get_output(list_dic_devices, command, limit=limit):97 def send_commands(dic_command, dic_device):98 def netmiko_get_device_type():99 guesser = SSHDetect(timeout=10, **dic_device)100 best_match = guesser.autodetect()101 if best_match:102 dic_device['device_type'] = best_match103 if best_match == 'cisco_wlc':104 dic_device['device_type'] = 'cisco_wlc_ssh'105 return dic_device['device_type']106 def paramiko_get_device_type():107 result = None108 client = paramiko.SSHClient()109 client.set_missing_host_key_policy(paramiko.AutoAddPolicy())110 client.connect(hostname=dic_device['ip'], username=dic_device['username'], password=dic_device['password'], timeout=10)111 with client.invoke_shell() as ssh:112 ssh.send('show version\n')113 time.sleep(10)114 result = ssh.recv(1000).decode('utf-8')115 ssh.close()116 return result117 def paramiko_expect_get_device_type():118 result = None119 client = paramiko.SSHClient()120 client.set_missing_host_key_policy(paramiko.AutoAddPolicy())121 try:122 client.connect(hostname=dic_device['ip'], username=dic_device['username'], password=dic_device['password'])123 except:124 client.get_transport().auth_none(dic_device['username'])125 interact = SSHClientInteraction(client, timeout=10, display=False)126 #127 interact.expect('Username:.*')128 interact.send(dic_device['username'])129 interact.expect('.*:.*')130 username_failed = interact.current_output131 if '% Authentication Failed' in username_failed:132 client.close()133 auth_failed_flag = True134 return result, auth_failed_flag135 else:136 interact.send(dic_device['password'])137 prompt = interact.expect(['.*:.*', '.*#'])138 if prompt == 0:139 password_failed = interact.current_output140 if '% Authentication Failed' in password_failed:141 client.close()142 auth_failed_flag = True143 return result, auth_failed_flag144 else:145 if prompt == 1:146 #interact.expect('.*#')147 interact.send('show version')148 interact.expect('.*#')149 result = interact.current_output150 interact.send('exit')151 interact.expect('.*>')152 interact.send('exit')153 client.close()154 auth_failed_flag = False155 return result, auth_failed_flag156 def ssh_get_device_type():157 # An attempt to understand what kind of device we are connecting to. For the first attempt we will use Netmiko158 try:159 dic_device['device_type'] = netmiko_get_device_type()160 except Exception as e:161 result = None162 if 'Authentication failed' not in str(e):163 print(str('NETMIKO AUTODETECT DOESNT WORK ON THE DEVICE ' + dic_device['ip'] + ' , TRYING PARAMIKO').center(200, '-'))164 # We are trying to determine the type by Paramiko165 time.sleep(5)166 try:167 result = paramiko_get_device_type()168 except:169 print(f'Login on Device {dic_device["ip"]} Was Successful, But no Result From "show version" Command')170 return {'ip': dic_device['ip'], 'output': None, 'device_type': None}171 elif 'Authentication failed' in str(e):172 # Authentication failed. But this can be SG-220, we try it out with Expect and Paramiko173 try:174 result, auth_failed_flag = paramiko_expect_get_device_type()175 if auth_failed_flag:176 unable_to_login_by_ssh_ip_list.append(dic_device['ip'])177 authentication_failed_list.append(dic_device['ip'])178 return {'ip': dic_device['ip'], 'output': None, 'device_type': None}179 pass180 except:181 print('PARAMIKO_EXPECT FAILED TO GET OUTPUT FROM DEVICE'.center(200, '!'))182 unable_to_login_by_ssh_ip_list.append(dic_device['ip'])183 authentication_failed_list.append(dic_device['ip'])184 return {'ip': dic_device['ip'], 'output': None, 'device_type': None}185 pass186 # Result is output from "show version" command by Paramiko in case Netmiko had failed187 if result:188 if 'XR' in result:189 dic_device['device_type'] = 'cisco_xr'190 elif 'Cisco IOS' in result:191 dic_device['device_type'] = 'cisco_ios'192 elif 'NX-OS' in result:193 dic_device['device_type'] = 'cisco_nxos'194 elif 'Cisco Controller' in result:195 dic_device['device_type'] = 'cisco_wlc_ssh'196 elif 'SG220' in result:197 dic_device['device_type'] = 'cisco_s200'198 elif 'image_tesla_hybrid' in result:199 dic_device['device_type'] = 'cisco_s350'200 return dic_device['device_type']201 def netmiko_send_command_cisco_wlc(command_list):202 command_result = {}203 with ConnectHandler(timeout=10, **dic_device) as ssh:204 find_hostname = ssh.send_command('show sysinfo')205 find_hostname = re.search('System Name\S+\.\s+(\S+)', find_hostname).group(1)206 interfaces = ssh.send_command('show interface summary')207 interface_names = re.findall('(\S+)\s+\S+\s+\S+\s+\d+\.', interfaces)208 result_1 = ''209 for interface in interface_names:210 interface_result = ssh.send_command('show interface detailed ' + interface)211 interface_result = interface_result.strip()212 result_1 = result_1 + '\n' + interface_result213 command_result.update({'show interface detailed': result_1})214 time.sleep(2)215 for command in command_list:216 result_2 = ssh.send_command(command, read_timeout=50)217 result_2 = result_2.strip()218 time.sleep(2)219 command_result.update({command: result_2})220 ssh.disconnect()221 return find_hostname, command_result222 def netmiko_send_command_sg_220(command_list):223 command_result = {}224 client = paramiko.SSHClient()225 client.set_missing_host_key_policy(paramiko.AutoAddPolicy())226 try:227 client.connect(hostname=dic_device['ip'], username=dic_device['username'], password=dic_device['password'])228 except:229 client.get_transport().auth_none(dic_device['username'])230 interact = SSHClientInteraction(client, timeout=10, display=False)231 #232 interact.expect('Username: ')233 if interact.last_match == 'Username: ':234 interact.send(dic_device['username'])235 #236 interact.expect('Password: ')237 if interact.last_match == 'Password: ':238 interact.send(dic_device['password'])239 #240 interact.expect('.*#')241 result_for_hostname = interact.current_output242 find_hostname = re.search('(\S+)#', result_for_hostname).group(1)243 interact.send('terminal length 0')244 interact.expect('.*#')245 for command in command_list:246 interact.send(command)247 interact.expect('.*#', timeout=30)248 result = interact.current_output249 result = result.strip()250 command_result.update({command: result})251 interact.send('exit')252 interact.expect('.*>')253 interact.send('exit')254 return find_hostname, command_result255 def netmiko_send_command_cisco_xr(command_list):256 command_result = {}257 with ConnectHandler(timeout=10, **dic_device) as ssh:258 find_hostname = ssh.find_prompt()259 chars = ['RP/0/RSP0/CPU0:', '>', '#']260 for c in chars:261 if c in find_hostname:262 find_hostname = find_hostname.replace(c, '')263 find_hostname = find_hostname.strip()264 time.sleep(2)265 for command in command_list:266 result_2 = ssh.send_command(command, read_timeout=50)267 result_2 = result_2.strip()268 time.sleep(2)269 command_result.update({command: result_2})270 # For TenGig Interfaces Only271 interfaces = ssh.send_command('show interface brief | i Te')272 interface_names = re.findall("\s+(Te\S+)\s+", interfaces)273 result_1 = ''274 for interface in interface_names:275 interface_result = ssh.send_command(f'show controllers {interface} phy | in Vendor')276 interface_result = f'{interface}\n' + interface_result.strip()277 result_1 = result_1 + '\n' + interface_result278 command_result.update({'show controllers interface': result_1})279 ssh.disconnect()280 return find_hostname, command_result281 def netmiko_send_command_cisco_common(command_list):282 command_result = {}283 with ConnectHandler(timeout=10, **dic_device) as ssh:284 if ssh.check_config_mode == False:285 ssh.enable()286 for command in command_list:287 result = ssh.send_command(command, read_timeout=50)288 result = result.strip()289 command_result.update({command: result})290 # Determine the host name for devices291 find_hostname = ssh.find_prompt()292 if '>' in find_hostname:293 find_hostname = find_hostname.replace('>', '')294 elif '#' in find_hostname:295 find_hostname = find_hostname.replace('#', '')296 find_hostname = find_hostname.strip()297 ssh.disconnect()298 return find_hostname, command_result299 dic_device['device_type'] = ssh_get_device_type()300 time.sleep(3)301 #302 if dic_device['device_type']:303 if dic_device['device_type'] != 'autodetect':304 # Connecting to device by Netmiko, sending command and getting output305 try:306 command_list = dic_command[dic_device['device_type']]307 except Exception as e:308 if 'KeyError' not in str(e):309 print(f'THERE IS NO COMMANDS DICTIONARY ADDED FOR DEVICE TYPE "{dic_device["device_type"]}" . ADD NEW DICT TO "dic_command" VARIABLE'.center(200, '!'))310 return ({'ip': dic_device['ip'], 'output': None, 'device_type': dic_device['device_type']})311 if dic_device['device_type'] == 'cisco_wlc_ssh':312 try:313 find_hostname, command_result = netmiko_send_command_cisco_wlc(command_list)314 return {'ip': dic_device['ip'], 'output': command_result, 'hostname': find_hostname, 'device_type': dic_device['device_type']}315 except:316 return ({'ip': dic_device['ip'], 'output': None, 'device_type': dic_device['device_type']})317 elif dic_device['device_type'] == 'cisco_s200':318 try:319 find_hostname, command_result = netmiko_send_command_sg_220(command_list)320 return {'ip': dic_device['ip'], 'output': command_result, 'hostname': find_hostname, 'device_type': dic_device['device_type']}321 except:322 return {'ip': dic_device['ip'], 'output': None, 'device_type': dic_device['device_type']}323 elif dic_device['device_type'] == 'cisco_xr':324 try:325 find_hostname, command_result = netmiko_send_command_cisco_xr(command_list)326 return {'ip': dic_device['ip'], 'output': command_result, 'hostname': find_hostname, 'device_type': dic_device['device_type']}327 except:328 return ({'ip': dic_device['ip'], 'output': None, 'device_type': dic_device['device_type']})329 else:330 try:331 find_hostname, command_result = netmiko_send_command_cisco_common(command_list)332 return {'ip': dic_device['ip'], 'output': command_result, 'hostname': find_hostname, 'device_type': dic_device['device_type']}333 except:334 return {'ip': dic_device['ip'], 'output': None, 'device_type': dic_device['device_type']}335 else:336 print(f"NETMIKO OR PARAMIKO FAILED TO DETERMINE DEVICE TYPE FOR DEVICE {dic_device['ip']}".center(200, '!'))337 def send_commands_threads(command_for_device, list_dic_devices, limit=limit):338 list_results_all_connections = []339 with ThreadPoolExecutor(max_workers=limit) as executor:340 futures_result = [executor.submit(send_commands, command_for_device, device) for device in list_dic_devices]341 for f in as_completed(futures_result):342 list_results_all_connections.append(f.result())343 return list_results_all_connections344 command_to_send = command345 # We run connection in parallel threads346 devices_with_output = []347 result = send_commands_threads(command_to_send, list_dic_devices, limit=limit)348 if result:349 for item in result:350 if item:351 if 'output' in item:352 if item['output']:353 devices_with_output.append(item)354 return True, devices_with_output355# Parsing raw data from cli output356def parse_output_textfsm(list_of_command_output, dic_index):357 # Creating list of dictionaries with data about devices358 list_all_devices_dicts = []359 for device in list_of_command_output:360 list_parsed_otput_current_device = []361 parse_check_list = [f"{device['ip']} [{device['device_type']}]".center(200, '=')]362 try:363 # We are setting compliance of device type -> command -> template364 device_type = device['device_type']365 # If we don't have output for the device we must to break iteration366 if device['output'] is None:367 print('\n', '=' * 200)368 print('\n', str('SOMETHING WENT WRONG, THERE IS NO OUTPUT FOR DEVICE ' + device['ip']).center(200, '-'))369 continue370 if device['output']:371 parse_check_list.append(f'{device["ip"]}: Output +')372 list_outpud_commands = list(device['output'].keys())373 for output_cmd in list_outpud_commands:374 def template_parse():375 template = dic_index[device_type][output_cmd]376 line_outpud = device['output'][output_cmd]377 # We are opening the template file and parse the output.378 try:379 with open(template, 'r') as f_template:380 re_table = textfsm.TextFSM(f_template)381 header = re_table.header382 result = re_table.ParseText(line_outpud)383 device_parsed_output = [header] + result384 parse_check_list.append(f'{device["ip"]}: Template {template} +')385 except(FileNotFoundError):386 print('\n', str(f'SOMETHING WENT WRONG PARSING "{output_cmd}" FOR DEVICE {device["ip"]}').center(200, '!'))387 device_parsed_output = None388 # Saving raw parsed output to file just in case389 try:390 with open('Devices_Files/' + device['hostname'] + '.txt', 'a') as raw_parsed_output:391 raw_parsed_output.writelines('\n')392 raw_parsed_output.writelines(str(f'Parsed output of command: "{output_cmd}" for device "{device["ip"]}"').center(200, '-'))393 raw_parsed_output.writelines('\n')394 raw_parsed_output.writelines(tabulate(device_parsed_output, headers='firstrow', tablefmt='grid'))395 raw_parsed_output.writelines('\n')396 except:397 print('FAILED TO CREATE A TEXT FILE. DOES THE DIRECTORY "Devices_Files" EXIST IN THE SCRIPT FOLDER?'.center(200, '!'))398 return device_parsed_output399 device_parsed_output = template_parse()400 # Collecting parsed data to new dictionary. Commands level401 list_parsed_otput_current_cmd = [dict(zip(device_parsed_output[0], x)) for x in device_parsed_output[1:]]402 # pprint(list_parsed_otput_current_cmd)403 list_parsed_otput_current_device.append(list_parsed_otput_current_cmd)404 # pprint(list_parsed_otput_current_device)405 # Gathering info about device interfaces 406 def interface_parse():407 shortcuts = {'Gi': 'GigabitEthernet', 'gi': 'GigabitEthernet', 'Fa': 'FastEthernet', 'Twe': 'TwentyFiveGigE', 'Tw': 'TwoGigabitEthernet', 'Te': 'TenGigabitEthernet', 'Po': 'Port-channel', 'po': 'Port-channel',408 'port-channel': 'Port-channel', 'Fo': 'FortyGigabitEthernet', 'Eth': 'Ethernet', 'Ap': 'AppGigabitEthernet'}409 if device['device_type'] != 'cisco_xr' and device['device_type'] != 'cisco_wlc_ssh':410 # Parsing "show interfaces" command in order to get data about VLANs411 def interface_vlan_parse():412 try:413 vlan_info = list_parsed_otput_current_device[2]414 if device['device_type'] == 'cisco_s200':415 vlan_info = list_parsed_otput_current_device[1]416 for i in vlan_info:417 # Sorting Vlans on Each Interface418 if i['TRUNKING_VLANS'] == ['1-4094']:419 i['TRUNKING_VLANS'] = ['ALL']420 else:421 vlans = str(i['TRUNKING_VLANS']).strip("[]'").replace("', '", '').split(',')422 for z in range(len(vlans)):423 if '-' in vlans[z]:424 vlan_scope = re.compile('(\w+)-(\w+)')425 scope = range(int(vlan_scope.search(vlans[z]).group(1)), int(vlan_scope.search(vlans[z]).group(2)) + 1)426 vlans[z] = str([*scope]).strip('[]').replace(' ', '')427 vlans = str(vlans).strip("[]'").replace("'", '').replace(' ', '').split(',')428 i['TRUNKING_VLANS'] = vlans429 # The command "show int switchport" doesnt show up the whole name of an interface, so we must replace it430 for key, value in shortcuts.items():431 if key != re.search('([A-zZ-a]+)\d', i['INTF']).group(1):432 continue433 elif key == re.search('([A-zZ-a]+)\d', i['INTF']).group(1):434 i['INTF'] = i['INTF'].replace(key, value)435 break436 except:437 print('SOMETHING WENT WRONG PARSING VLANS FROM INTERFACES DATA'.center(200, '!'))438 interface_vlan_parse()439 if device['device_type'] == 'cisco_s200':440 reshaped_interfaces_dictionary_list = [{x["INTF"]: x for x in y} for y in list_parsed_otput_current_device[0:2]]441 for i in reshaped_interfaces_dictionary_list:442 i['mgmt'] = {'INTF': 'mgmt',443 "PROTOCOL_STATUS": "up",444 'DESCRIPTION': 'Management Vlan',445 'PRIMARY_IP_ADDRESS': [list_parsed_otput_current_device[5][0]['IP']],446 'PRIMARY_IP_MASK': [list_parsed_otput_current_device[5][0]['MASK']]}447 else:448 reshaped_interfaces_dictionary_list = [{x["INTF"]: x for x in y} for y in list_parsed_otput_current_device[0:4]]449 elif device['device_type'] == 'cisco_xr':450 reshaped_interfaces_dictionary_list = [{x["INTF"]: x for x in y} for y in list_parsed_otput_current_device[0:2]]451 elif device['device_type'] == 'cisco_wlc_ssh':452 reshaped_interfaces_dictionary_list = [{x["INTF"]: x for x in y} for y in list_parsed_otput_current_device[0:2]]453 for i in list_parsed_otput_current_device[1]:454 try:455 wlc_port = re.search('(\d)', i['INTF']).group(1)456 except:457 wlc_port = None458 if 'PROTOCOL_STATUS' in i.keys():459 if wlc_port:460 i['INTF'] = 'GigabitEthernet0/0/' + wlc_port461 i['HARDWARE_TYPE'] = 'Gigabit Ethernet'462 if 'RP' in i['INTF'] or 'SP' in i['INTF']:463 i['HARDWARE_TYPE'] = 'Gigabit Ethernet'464 # Merging all dictionaries by key that contains the name of the interface465 results = collections.defaultdict(dict)466 if reshaped_interfaces_dictionary_list:467 for c in reshaped_interfaces_dictionary_list:468 for key, value in c.items():469 results[key] = {**results[key], **value}470 interfaces_final_list_of_dicts = list(results.values())471 parse_check_list.append(f'{device["ip"]}: Interfaces +')472 return interfaces_final_list_of_dicts473 else:474 print('SOMETHING WENT WRONG MERGING INTERFACE DATA'.center(200, '!'))475 interfaces_final_list_of_dicts = interface_parse()476 # Now We got a list where each interface is a dict with keys477 #478 # Info specific to an Enterprise479 switches = config['switches']480 switches_L3 = config['switches_L3']481 sites = config['sites']482 def software_and_manufacturer_parse():483 sotware_types = {'cisco_ios': 'Cisco IOS Software', 'cisco_nxos': 'Cisco Nexus Operating System (NX-OS) Software', 'cisco_xr': 'Cisco IOS XR Software', 'cisco_s200': 'Cisco Sx220 Series Switch Software'}484 software = None485 manufacturer = None486 if device['device_type'] == 'cisco_wlc_ssh':487 software = list_parsed_otput_current_device[2][0]['DESCR']488 else:489 for key, value in sotware_types.items():490 if device['device_type'] == key:491 software = value492 manufacturer = re.search('^(\S+)', value).group(1)493 return software, manufacturer494 software, manufacturer = software_and_manufacturer_parse()495 def inventory_parse():496 try:497 inventory_list = []498 num = None499 transivers_interfaces = []500 if device['device_type'] == 'cisco_wlc_ssh':501 num = 2502 elif device['device_type'] == 'cisco_ios':503 num = 4504 elif device['device_type'] == 'cisco_nxos':505 if list_parsed_otput_current_device[4]:506 num = 4507 elif not list_parsed_otput_current_device[4] and list_parsed_otput_current_device[9]:508 num = 9509 elif device['device_type'] == 'cisco_xr':510 num = 2511 transivers_interfaces = list_parsed_otput_current_device[5]512 if list_parsed_otput_current_device[num]:513 inventory_data = list_parsed_otput_current_device[num] + transivers_interfaces514 for inv in inventory_data:515 if 'NAME' not in inv:516 inv['NAME'] = inv['PID']517 if 'DESCR' not in inv:518 inv['DESCR'] = inv['PID']519 inventory = {'NAME': inv['NAME'].strip(),520 'MANUFACTURER': 'Cisco',521 'PID': inv['PID'].strip(),522 'SN': inv['SN'].strip(),523 'DESCRIPTION': inv['DESCR'].strip()}524 inventory_list.append(inventory)525 parse_check_list.append(f'{device["ip"]}: Inventory +')526 if device['device_type'] == 'cisco_xr':527 for inventory_module in inventory_list:528 if 'Chassis' in inventory_module['DESCRIPTION']:529 dev_type = inventory_module['PID']530 serial_number = inventory_module['SN']531 else:532 dev_type = list_parsed_otput_current_device[num][0]['PID']533 serial_number = list_parsed_otput_current_device[num][0]['SN']534 return dev_type, serial_number, inventory_list535 except:536 print('SOMETHING WENT WRONG PARSING INVENTORY DATA'.center(200, '!'))537 if device['device_type'] != 'cisco_s200':538 dev_type, serial_number, inventory_list = inventory_parse()539 def device_role_parse():540 device_role = 'router'541 # By default each device is considered a router type542 # For small business switches543 if device['device_type'] == 'cisco_s200':544 device_role = 'switch_smb'545 # For wireless controllers546 elif device['device_type'] == 'cisco_wlc_ssh':547 device_role = 'wlc'548 if device['device_type'] != ('cisco_wlc_ssh' and 'cisco_s200'):549 for name in switches:550 if name not in dev_type:551 continue552 elif name in dev_type:553 device_role = 'switch'554 break555 for name in switches_L3:556 if name not in dev_type:557 continue558 elif name in dev_type:559 device_role = 'switch_layer_3'560 break561 return device_role562 device_role = device_role_parse()563 def site_parse():564 site = None565 region = None566 for site_name, region in sites.items():567 if site_name in device['hostname']:568 site = site_name569 region = region570 break571 else:572 site = None573 region = None574 return site, region575 site, region = site_parse()576 def vlans_parse():577 vlan_group = ''578 vlan_group_description = ''579 # Gathering Information About Vlans580 try:581 intf_reshaped = {}582 for o in list_parsed_otput_current_device[0]:583 if 'lan' in o['INTF']:584 intf_reshaped[re.search('(\d+)', o['INTF']).group(1)] = o585 if device['device_type'] == 'cisco_ios' or device['device_type'] == 'cisco_nxos':586 if list_parsed_otput_current_device[7] != [] and list_parsed_otput_current_device[8] == []:587 vlan_reshaped = {i['VLAN_ID']: i for i in list_parsed_otput_current_device[7]}588 elif list_parsed_otput_current_device[8] != [] and list_parsed_otput_current_device[7] == []:589 vlan_reshaped = {i['VLAN_ID']: i for i in list_parsed_otput_current_device[8]}590 elif list_parsed_otput_current_device[8] == [] and list_parsed_otput_current_device[7] == []:591 vlan_reshaped = {}592 elif device['device_type'] == 'cisco_s200':593 if list_parsed_otput_current_device[4]:594 vlan_reshaped = {i['VLAN_ID']: i for i in list_parsed_otput_current_device[4]}595 elif not list_parsed_otput_current_device[4]:596 vlan_reshaped = {}597 li = [intf_reshaped, vlan_reshaped]598 vlan = collections.defaultdict(dict)599 for z in li:600 for key, value in z.items():601 vlan[key] = {**vlan[key], **value}602 vlans_list = []603 # prefix = str(ipaddress.ip_network(ip.address, False))604 for i in list(vlan.values()):605 if 'VRF' in i.keys() and i['VRF'] != '':606 vrf = i['VRF']607 else:608 vrf = ''609 if 'PRIMARY_IP_ADDRESS' in i.keys() and i['PRIMARY_IP_ADDRESS'] != []:610 primary_prefixes = [str(ipaddress.ip_network(address, False)) for address in ['/'.join(pair) for pair in zip(i['PRIMARY_IP_ADDRESS'], i['PRIMARY_IP_MASK'])]]611 else:612 primary_prefixes = []613 if 'SECONDARY_IP_ADDRESS' in i.keys() and i['SECONDARY_IP_ADDRESS'] != []:614 secondary_prefixes = [str(ipaddress.ip_network(address, False)) for address in ['/'.join(pair) for pair in zip(i['SECONDARY_IP_ADDRESS'], i['SECONDARY_IP_MASK'])]]615 else:616 secondary_prefixes = []617 if 'VLAN_ID' in i:618 if i['INTERFACES'] == '---':619 i['INTERFACES'] = ''620 vlans = {'vlan_id': i['VLAN_ID'],621 'vlan_name': i['NAME'].rstrip(),622 'vlan_status': 'active',623 'vlan_group': vlan_group,624 'vlan_group_description': vlan_group_description,625 'vlan_ports': i['INTERFACES'],626 'vlan_primary_prefixes': primary_prefixes,627 'vlan_secondary_prefixes': secondary_prefixes,628 'vrf': vrf,629 'site': site}630 vlans_list.append(vlans)631 if vlans_list:632 parse_check_list.append(f'{device["ip"]}: Vlans +')633 return vlans_list634 except:635 print('SOMETHING WENT WRONG PARSING VLANS DATA'.center(200, '!'))636 if device['device_type'] != 'cisco_xr' and device['device_type'] != 'cisco_wlc_ssh':637 vlans_list = vlans_parse()638 def cdp_neighbor_parse():639 try:640 if device['device_type'] == 'cisco_s200':641 shortcuts_cdp = {'Gi ': 'GigabitEthernet', 'gi': 'GigabitEthernet', 'Fa ': 'FastEthernet', 'Two ': 'TwoGigabitEthernet', 'Ten ': 'TenGigabitEthernet', 'Eth ': 'Ethernet'}642 elif device['device_type'] == 'cisco_ios':643 shortcuts_cdp = {'Gig': 'GigabitEthernet', 'gi': 'GigabitEthernet', 'Fas': 'FastEthernet', 'Two': 'TwoGigabitEthernet', 'Ten': 'TenGigabitEthernet', 'Eth': 'Ethernet', 'App': ' AppGigabitEthernet'}644 elif device['device_type'] == 'cisco_nxos':645 shortcuts_cdp = {'Gig': 'GigabitEthernet', 'gi': 'GigabitEthernet', 'Fas': 'FastEthernet', 'Two': 'TwoGigabitEthernet', 'Ten': 'TenGigabitEthernet', 'Eth': 'Ethernet', 'App': ' AppGigabitEthernet'}646 elif device['device_type'] == 'cisco_xr':647 shortcuts_cdp = {'Gi': 'GigabitEthernet', 'gi': 'GigabitEthernet', 'Fa': 'FastEthernet', 'Tw': 'TwoGigabitEthernet', 'Te': 'TenGigE', 'Et': 'Ethernet', 'Mg': 'MgmtEth'}648 elif device['device_type'] == 'cisco_wlc_ssh':649 shortcuts_cdp = {'Gig': 'GigabitEthernet', 'gi': 'GigabitEthernet', 'Fas': 'FastEthernet', 'Two': 'TwoGigabitEthernet', 'Ten': 'TenGigabitEthernet', 'Eth': 'Ethernet', 'App': ' AppGigabitEthernet'}650 neighbors_list = []651 if device['device_type'] == 'cisco_xr':652 lso = list_parsed_otput_current_device[4]653 for neighbor in lso:654 for key, value in shortcuts_cdp.items():655 if key == re.search('([A-zZ-a]+)\s*\d', neighbor['LOCAL_PORT']).group(1):656 neighbor['LOCAL_PORT'] = neighbor['LOCAL_PORT'].replace(key, value).replace(' ', '')657 if key == re.search('([A-zZ-a]+)\s*\d', neighbor['REMOTE_PORT']).group(1):658 neighbor['REMOTE_PORT'] = neighbor['REMOTE_PORT'].replace(key, value).replace(' ', '')659 # To get rid of domain suffix of neighbor device or nexus module suffix660 try:661 nexus_extras = re.search('\S+(\(\S+\))', neighbor['DEST_HOST']).group(1)662 except:663 nexus_extras = 'null'664 domains = config['domains'] + [nexus_extras]665 for domain in domains:666 if domain not in neighbor['DEST_HOST']:667 continue668 else:669 neighbor['DEST_HOST'] = neighbor['DEST_HOST'].replace(domain, '')670 connection = {'DEVICE_B': neighbor['DEST_HOST'],671 'DEVICE_A': device['hostname'],672 'INTERFACE_A': neighbor['LOCAL_PORT'],673 'INTERFACE_B': neighbor['REMOTE_PORT']}674 neighbors_list.append(connection)675 parse_check_list.append(f'{device["ip"]}: Neighbors +')676 #677 else:678 if device['device_type'] == 'cisco_s200':679 lso = list_parsed_otput_current_device[3]680 elif device['device_type'] == 'cisco_wlc_ssh':681 lso = list_parsed_otput_current_device[3]682 else:683 lso = list_parsed_otput_current_device[6]684 for neighbor in lso:685 for key, value in shortcuts_cdp.items():686 if key == re.search('([A-zZ-a]+)\s*\d', neighbor['LOCAL_INTERFACE']).group(1):687 neighbor['LOCAL_INTERFACE'] = neighbor['LOCAL_INTERFACE'].replace(key, value).replace(' ', '')688 if key == re.search('([A-zZ-a]+)\s*\d', neighbor['NEIGHBOR_INTERFACE']).group(1):689 neighbor['NEIGHBOR_INTERFACE'] = neighbor['NEIGHBOR_INTERFACE'].replace(key, value).replace(' ', '')690 try:691 nexus_extras = re.search('\S+(\(\S+\))', neighbor['NEIGHBOR']).group(1)692 except:693 nexus_extras = 'null'694 domains = config['domains'] + [nexus_extras]695 for domain in domains:696 if domain not in neighbor['NEIGHBOR']:697 continue698 else:699 neighbor['NEIGHBOR'] = neighbor['NEIGHBOR'].replace(domain, '')700 connection = {'DEVICE_B': neighbor['NEIGHBOR'],701 'DEVICE_A': device['hostname'],702 'INTERFACE_A': neighbor['LOCAL_INTERFACE'],703 'INTERFACE_B': neighbor['NEIGHBOR_INTERFACE']}704 neighbors_list.append(connection)705 parse_check_list.append(f'{device["ip"]}: Neighbors +')706 return neighbors_list707 except:708 print('SOMETHING WENT WRONG PARSING CDP NEIGHBORS DATA'.center(200, '!'))709 neighbors_list = cdp_neighbor_parse()710 def hsrp_parse():711 virtual_ip_list = []712 try:713 shortcuts_hsrp = {'Vl': 'Vlan', 'Gi': 'GigabitEthernet'}714 if list_parsed_otput_current_device[9]:715 for virtual_interface in list_parsed_otput_current_device[9]:716 for key, value in shortcuts_hsrp.items():717 if key in virtual_interface['INTF']:718 virtual_interface['INTF'] = virtual_interface['INTF'].replace(key, value)719 virtual_ip = {'VIRTUAL_INTERFACE': virtual_interface['INTF'],720 'VIRTUAL_IP': virtual_interface['VIRTUALIP'],721 'ACTIVE_IP': virtual_interface['ACTIVE'],722 'STANDBY_IP': virtual_interface['STANDBY']}723 virtual_ip_list.append(virtual_ip)724 parse_check_list.append(f'{device["ip"]}: hsrp +')725 if virtual_ip_list:726 return virtual_ip_list727 except:728 print('SOMETHING WENT WRONG PARSING HSRP DATA'.center(200, '!'))729 if device['device_type'] == 'cisco_ios':730 virtual_ip_list = hsrp_parse()731 def cisco_wlc_parse():732 ap_list = []733 wlans = []734 try:735 if list_parsed_otput_current_device[4] and list_parsed_otput_current_device[5]:736 list_reshaped_aps = [{x["AP_NAME"]: x for x in y} for y in list_parsed_otput_current_device[4:6]]737 results_ap = collections.defaultdict(dict)738 for c in list_reshaped_aps:739 for key, value in c.items():740 results_ap[key] = {**results_ap[key], **value}741 final_list_of_ap_dicts = list(results_ap.values())742 for access_point in final_list_of_ap_dicts:743 ap = {'AP_NAME': access_point['AP_NAME'],744 'AP_IP': access_point['PRIMARY_IP_ADDRESS'],745 'AP_MAC': access_point['MAC'],746 'AP_MODEL': access_point['AP_MODEL'],747 'AP_INTERFACE': 'GigabitEthernet0',748 'AP_PLATFORM': access_point['PLATFORM'],749 'AP_SN': access_point['SN']}750 ap_list.append(ap)751 parse_check_list.append(f'{device["ip"]}: Access_Points +')752 except:753 print('SOMETHING WENT WRONG PARSING AP DATA'.center(200, '!'))754 # Getting wireless LAN list755 try:756 if list_parsed_otput_current_device[6]:757 for ssid in list_parsed_otput_current_device[6]:758 wlan = {'WLANID': ssid['WLANID'],759 'SSID': ssid['SSID'],760 'INTERFACE': ssid['INTERFACE']}761 wlans.append(wlan)762 parse_check_list.append(f'{device["ip"]}: Wireless_LANs +')763 except:764 print('SOMETHING WENT WRONG PARSING WLANs DATA'.center(200, '!'))765 return ap_list, wlans766 if device['device_type'] == 'cisco_wlc_ssh':767 ap_list, wlans = cisco_wlc_parse()768 # Then we create a final dict for a device769 def device_final_dict():770 if device['device_type'] == 'cisco_wlc_ssh':771 platform = software772 final_device_dict = {773 'MANUFACTURER': 'Cisco',774 'DEVICE_NAME': device['hostname'],775 'DEVICE_IP': device['ip'],776 'REGION': region,777 'SITE': site,778 'DEVICE_TYPE': list_parsed_otput_current_device[2][0]['PID'],779 'DEVICE_PLATFORM': platform,780 'SOFTWARE': 'Cisco Air OS',781 'DEVICE_ROLE': device_role,782 'SERIAL_NUMBER': list_parsed_otput_current_device[2][0]['SN'],783 'INTERFACES': interfaces_final_list_of_dicts,784 'AP': ap_list,785 'CONNECTIONS': neighbors_list,786 'WLANS': wlans,787 'INVENTORY': inventory_list788 }789 if device['device_type'] == 'cisco_s200':790 final_device_dict = {791 'MANUFACTURER': manufacturer,792 'DEVICE_NAME': device['hostname'],793 'DEVICE_IP': device['ip'],794 'REGION': region,795 'SITE': site,796 'DEVICE_TYPE': list_parsed_otput_current_device[2][0]['PID'],797 'DEVICE_PLATFORM': 'Sx220',798 'SOFTWARE': software,799 'DEVICE_ROLE': device_role,800 'SERIAL_NUMBER': list_parsed_otput_current_device[2][0]['SN'],801 'INTERFACES': interfaces_final_list_of_dicts,802 'VLANS': vlans_list,803 'CONNECTIONS': neighbors_list804 }805 if device['device_type'] == 'cisco_ios':806 final_device_dict = {807 'MANUFACTURER': manufacturer,808 'DEVICE_NAME': device['hostname'],809 'DEVICE_IP': device['ip'],810 'REGION': region,811 'SITE': site,812 'DEVICE_TYPE': list_parsed_otput_current_device[4][0]['PID'],813 'DEVICE_PLATFORM': list_parsed_otput_current_device[5][0]['PLATFORM'],814 'SOFTWARE': software,815 'DEVICE_ROLE': device_role,816 'SERIAL_NUMBER': list_parsed_otput_current_device[4][0]['SN'],817 'INTERFACES': interfaces_final_list_of_dicts,818 'VLANS': vlans_list,819 'CONNECTIONS': neighbors_list,820 'HSRP': virtual_ip_list,821 'INVENTORY': inventory_list822 }823 if device['device_type'] == 'cisco_xr':824 platform = 'IOS-XR'825 final_device_dict = {826 'MANUFACTURER': manufacturer,827 'DEVICE_NAME': device['hostname'],828 'DEVICE_IP': device['ip'],829 'REGION': region,830 'SITE': site,831 'DEVICE_TYPE': dev_type,832 'DEVICE_PLATFORM': platform,833 'SOFTWARE': software,834 'DEVICE_ROLE': device_role,835 'SERIAL_NUMBER': serial_number,836 'INTERFACES': interfaces_final_list_of_dicts,837 'CONNECTIONS': neighbors_list,838 'INVENTORY': inventory_list839 }840 if device['device_type'] == 'cisco_nxos':841 platform = 'NX-OS'842 final_device_dict = {843 'MANUFACTURER': manufacturer,844 'DEVICE_NAME': device['hostname'],845 'DEVICE_IP': device['ip'],846 'REGION': region,847 'SITE': site,848 'DEVICE_TYPE': dev_type,849 'DEVICE_PLATFORM': platform,850 'SOFTWARE': software,851 'DEVICE_ROLE': device_role,852 'SERIAL_NUMBER': serial_number,853 'INTERFACES': interfaces_final_list_of_dicts,854 'VLANS': vlans_list,855 'CONNECTIONS': neighbors_list,856 'INVENTORY': inventory_list857 }858 return final_device_dict859 final_device_dict = device_final_dict()860 # Then we create a final list of dict for all devices861 list_all_devices_dicts.append(final_device_dict)862 print(*parse_check_list, sep='\n')863 except:864 print(f"PARSING DATA FROM DEVICE {device['hostname']} FAILED".center(200, '!'))865 pass866 with open('devices_api.csv', 'w', encoding='utf8') as file:867 json.dump(list_all_devices_dicts, file, indent=16, sort_keys=True)868 with open('devices_api.csv') as json_file:869 device_list = json.load(json_file)870 for device in device_list:871 name = device['DEVICE_NAME']872 with open('api//' + name + '.csv', 'w', encoding='utf8') as file:873 json.dump([device], file, indent=16, sort_keys=True)874 return True, list_all_devices_dicts875# Flag of the correct execution of each step of the script876all_doing_well = False877# Timer of showing error notification878sleep_time = 5879# Choosing a File880if file_with_ip_addresses == 'all':881 with open('ip_list_all.txt', 'r') as f:882 request_ip_address_list = re.findall('\d+\.\d+\.\d+\.\d+', f.read())883elif file_with_ip_addresses == 'confluence':884 request_ip_address_list = confluence()885elif file_with_ip_addresses == 'test':886 with open('ip_list.txt', 'r') as f:887 request_ip_address_list = re.findall('\d+\.\d+\.\d+\.\d+', f.read())888list_reachable_ips = None889list_unreachable_ips = None890if request_ip_address_list:891 all_doing_well, list_reachable_ips, list_unreachable_ips = connect_list(request_ip_address_list)892else:893 print('\n', '=' * 200)894 print(str('NO IPs in a FILE. CHECK IT OUT').center(200, ' '))895 print('=' * 200)896 time.sleep(sleep_time)897 exit()898if list_reachable_ips:899 all_doing_well, list_of_dic_devices = device_dictionary(list_reachable_ips)900else:901 print('\n', '=' * 200)902 print(str('IP LIST IS EMPTY OR SOMETHING WENT WRONG MAKING LIST OF DICTS').center(200, ' '))903 print('=' * 200)904 time.sleep(sleep_time)905 if list_unreachable_ips:906 print('\n')907 print(str('SSH PORT IS CLOSED ON THESE DEVICES').center(200, ' '))908 for i in list_unreachable_ips:909 print(str(i).center(200, '-'))910 exit()911if all_doing_well:912 # The commands are little different for different devices types913 dic_command = {'cisco_ios': ['show ip interface',914 'show interfaces',915 'show interfaces switchport',916 'show running-config',917 'show inventory',918 'show version',919 'show cdp neighbors',920 'show vlan',921 'show vlan-switch',922 'show standby brief'],923 'cisco_xr': ['show ipv4 interface',924 'show interfaces',925 'show inventory all',926 'show version',927 'show cdp neighbors detail'],928 'cisco_nxos': ['show ip interface vrf all',929 'show interface',930 'show interface switchport',931 'show running-config',932 'show inventory all',933 'show version',934 'show cdp neighbors',935 'show vlan',936 'show vlan-switch',937 'show inventory'],938 'cisco_s200': ['show interfaces GigabitEthernet 1-50',939 'show running-config',940 'show version',941 'show cdp neighbor',942 'show vlan',943 'show ip'],944 'cisco_wlc_ssh': ['show port summary',945 'show inventory',946 'show cdp neighbors',947 'show ap summary',948 'show ap inventory all',949 'show wlan summary']}950 all_doing_well, list_command_output = send_command_and_get_output(list_of_dic_devices, dic_command, limit=limit)951# Parsing output of devices by TextFSM952if all_doing_well:953 # We use different template for parsing of different output of different type of device954 dic_parse_index = {'cisco_ios': {'show ip interface': 'templates/cisco_ios_show_ip_interface.template',955 'show interfaces': 'templates/cisco_ios_show_interfaces.template',956 'show interfaces switchport': 'templates/cisco_ios_show_interfaces_switchport.template',957 'show running-config': 'templates/cisco_ios_show_running_config.template',958 'show inventory': 'templates/cisco_ios_show_inventory.template',959 'show version': 'templates/cisco_ios_show_version.template',960 'show cdp neighbors': 'templates/cisco_ios_show_cdp_neighbors.template',961 'show vlan': 'templates/cisco_ios_show_vlan.template',962 'show vlan-switch': 'templates/cisco_ios_show_vlan.template',963 'show standby brief': 'templates/cisco_ios_show_standby_brief.template'},964 'cisco_xr': {'show ipv4 interface': 'templates/cisco_xr_show_ipv4_interface.template',965 'show interfaces': 'templates/cisco_xr_show_interfaces.template',966 'show inventory all': 'templates/cisco_xr_show_inventory_all.template',967 'show version': 'templates/cisco_xr_show_version.template',968 'show cdp neighbors detail': 'templates/cisco_xr_show_cdp_neighbors_detail.template',969 'show controllers interface': 'templates/cisco_xr_show_controllers_interface.template'},970 'cisco_nxos': {'show ip interface vrf all': 'templates/cisco_nxos_show_ip_interface_vrf_all.template',971 'show interface': 'templates/cisco_nxos_show_interface.template',972 'show interface switchport': 'templates/cisco_nxos_show_interface_switchport.template',973 'show running-config': 'templates/cisco_nxos_show_running_config.template',974 'show inventory all': 'templates/cisco_nxos_show_inventory_all.template',975 'show version': 'templates/cisco_nxos_show_version.template',976 'show cdp neighbors': 'templates/cisco_nxos_show_cdp_neighbors.template',977 'show vlan': 'templates/cisco_nxos_show_vlan.template',978 'show vlan-switch': 'templates/cisco_nxos_show_vlan.template',979 'show inventory': 'templates/cisco_nxos_show_inventory.template'},980 'cisco_s200': {'show interfaces GigabitEthernet 1-50': 'templates/cisco_s200_show_gigabit_interface.template',981 'show running-config': 'templates/cisco_s200_show_running_config.template',982 'show version': 'templates/cisco_s200_show_version.template',983 'show cdp neighbor': 'templates/cisco_s200_show_cdp_neighbor.template',984 'show vlan': 'templates/cisco_s200_show_vlan.template',985 'show ip': 'templates/cisco_s200_show_ip.template'},986 'cisco_s350': {'show interfaces switchport': 'templates/cisco_s350_show_interfaces_switchport.template',987 'show running-config': 'templates/cisco_s350_show_running_config.template',988 'show inventory': 'templates/cisco_s350_show_inventory.template',989 'show cdp neighbor': 'templates/cisco_s350_show_cdp_neighbor.template',990 'show vlan': 'templates/cisco_s350_show_vlan.template',991 'show ip interface': 'templates/cisco_s350_show_ip_interface.template'},992 'cisco_wlc_ssh': {'show interface detailed': 'templates/cisco_wlc_ssh_show_interface_detailed.template',993 'show port summary': 'templates/cisco_wlc_ssh_show_port_summary.template',994 'show inventory': 'templates/cisco_wlc_ssh_show_inventory.template',995 'show cdp neighbors': 'templates/cisco_wlc_ssh_show_cdp_neighbors.template',996 'show ap summary': 'templates/cisco_wlc_ssh_show_ap_summary.template',997 'show ap inventory all': 'templates/cisco_wlc_ssh_show_ap_inventory_all.template',998 'show wlan summary': 'templates/cisco_wlc_ssh_show_wlan_summary.template'}999 }1000 all_doing_well, parsed_devices_output = parse_output_textfsm(list_command_output, dic_parse_index)1001print('\n', '=' * 200, '\n')1002if unable_to_login_by_ssh_ip_list:1003 print(str('UNABLE TO LOGIN BY SSH ON THESE DEVICES').center(200, ' '))1004 for i in unable_to_login_by_ssh_ip_list:1005 print(str(i).center(200, '-'))1006 print('+++++++++++++'.center(200))1007if list_unreachable_ips:1008 print(str('SSH PORT IS CLOSED ON THESE DEVICES').center(200, ' '))1009 for i in list_unreachable_ips:1010 print(str(i).center(200, '-'))1011if authentication_failed_list:1012 print(str('AUTHENTICATION FAILED ON THESE DEVICES').center(200, ' '))1013 for i in authentication_failed_list:1014 print(str(i).center(200, '-'))1015if not (unable_to_login_by_ssh_ip_list or list_unreachable_ips or authentication_failed_list):1016 print(str('NO SSH CONNECTION OR AUTHENTICATION ERRORS').center(200, ' '))1017print('\n', '=' * 200)1018time.sleep(sleep_time)...
inventory_m.py
Source:inventory_m.py
1from re import template2from flask.sessions import NullSession3from werkzeug.wrappers import response4from pynetwork import app5from flask_sqlalchemy import SQLAlchemy6from pynetwork.controller.inventory_c import get_model_as_dict7from sqlalchemy.orm import aliased, query8appRoot = app.root_path9app.config['SQLALCHEMY_DATABASE_URI'] = f"sqlite:////{appRoot}/data/pynetwork.db"10app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False11db = SQLAlchemy(app)12# Define the Datamodel13class Device(db.Model):14 host = db.Column(db.Text, primary_key=True)15 username = db.Column(db.Text, nullable=False)16 password = db.Column(db.Text, nullable=False)17 port = db.Column(db.Integer, default=22)18 # credential_template = db.Column(db.Text)19# Create the Database20def create_db():21 db.create_all()22 return("Database Created")23# Add New device to the DB24def add_device(deviceDic):25 device_obj = Device(**deviceDic)26 db.session.add(device_obj)27 db.session.commit()28 return("Device Added to Pynetwork")29# Retrive list of device from DB30def list_devices():31 all_device_obj = Device.query.all()32 all_device_list = list()33 for device_obj in all_device_obj:34 all_device_list.append(get_model_as_dict(device_obj))35 return all_device_list36# Get one Device from variable37def get_device(deviceDic):38 find_hostname = deviceDic["host"]39 record = Device.query.filter_by(host=find_hostname).first_or_404(40 description='device - {} does not not exist'.format(find_hostname))41 response = get_model_as_dict(record)42 return (response)43# Update one Device variables/ Properties44def update_device(deviceDic):45 find_hostname = deviceDic["host"]46 record = Device.query.filter_by(host=find_hostname).first_or_404(47 description='device - {} does not not exist '.format(find_hostname))48 record.username = deviceDic["username"]49 record.password = deviceDic["password"]50 record.port = deviceDic["port"]51 # record.credential_template = deviceDic["credential_template"]52 response = get_model_as_dict(record)53 db.session.commit()54 return (response)55# Delete one Device from DB56def delete_device(deviceDic):57 find_hostname = deviceDic["host"]58 record = Device.query.filter_by(host=find_hostname).first_or_404(59 description='device - {} does not not exist '.format(find_hostname))60 db.session.delete(record)61 db.session.commit()62 return ("Sucessfully deleted")63# Get the device parameters from device/devices and make it into list.64def get_device_list(deviceNames):65 devices = deviceNames["devices"]66 device_list = list()67 for device in devices:68 record = Device.query.filter_by(host=device).first()69 if record:70 device_list.append(get_model_as_dict(record))71 else:72 device_list.append("{} does not not exist ".format(device))73 print(device_list)...
collector.py
Source:collector.py
1# -*- coding: utf-8 -*-2from netmiko import ConnectHandler3import threading4import xlrd5import os67wb=xlrd.open_workbook('book.xlsx')8sheet=wb.sheet_by_index(0)9list_device=[]10path=str(os.getcwd())+'/results'1112if 'results' not in os.listdir():13 os.mkdir(path)1415nrows=sheet.nrows16ncols=sheet.ncols17n=01819for x in range(nrows):20 arr = []21 for i in range (ncols):22 arr.append(sheet.cell_value(x,i))23 list_device.append(arr)242526print(list_device)2728lst = [];die=[];diel=open('die.txt','w')293031def try_connect(ipadx):32 try:33 connect_ssh=ConnectHandler(device_type='cisco_ios_telnet',username=ipadx[1],password=ipadx[2],secret=ipadx[3],ip=ipadx[0])34 connect_ssh.enable()35 find_hostname = connect_ssh.find_prompt()36 connect_ssh.send_command("terminal len 0")37 connect_ssh.open_session_log('results/'+ find_hostname +'.txt', 'w')38 connect_ssh.send_command('sh run') # you can edit this command39 connect_ssh.close_session_log()404142 except:43 die.append(ipadx);diel.write(ipadx+'\n')4445for i in list_device:46 t = threading.Thread(target=try_connect, args=(i,))47 lst.append(t)4849for t in lst:50 t.start()5152for t in lst:53 t.join()54del t5556if not die:57 print('\nSUCCESS')58else :59 print ('\nSUCCESS\n### list device down ###');diel.close()6061 for i in range (len(die)):
...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!