Best Python code snippet using tempest_python
Monitoring.py
Source:Monitoring.py
1import networkx as nx2from apscheduler.schedulers.background import BackgroundScheduler3from flask_restful import Resource4from flask import request, jsonify, copy_current_request_context5import requests6from mycroft_bus_client import MessageBusClient, Message7from requests.auth import HTTPBasicAuth8import json9import matplotlib.pyplot as plt10import time11import mysql.connector12from itertools import combinations13import random14from resources.GetHostsWithoutServices import *15from Functions.GetServicesHosts import GetServicesHosts16from Functions.Insert import Insert17from Functions.GetPrioritys import Priority18from Functions.GetDataForPath import GetDataForPath19from ModelTopologyWWeight import topologyInformationWWeight20from ModelTopology import topologyInformation21from flask import jsonify, request, make_response22import time23from types import SimpleNamespace24from models import Intents25list_backup_host={}26switch=""27list_of_list_hosts=[]28host=""29new_switch=""30dict_hosts_down={}31def getMysqlConnection():32 return mysql.connector.connect(user='flask', host='0.0.0.0', port='4000', password='password', database='services_active',auth_plugin='mysql_native_password')33class Monitoring(Resource):34 def monitoring(self):35 global list_backup_host36 global switch37 global list_of_list_hosts38 global host39 global new_switch40 global dict_hosts_down41 m = GetHostsWithoutServices()42 hosts_list_wout_service=m.hosts_w_service()43 url = "http://127.0.0.1:5000/monitoringInterfaces"44 response=requests.get(url=url)45 switchs_down=response.json()46 switchs_down=switchs_down['message']47 print(switchs_down)48 list_hosts={}49 for key, value in switchs_down.items():50 if value:51 switch=str(key).split(":")[-1]52 for i in range(0,len(value)):53 host=value[0].split(":")[-1]54 if(host in list_backup_host):55 print("Already solve")56 else:57 try:58 db = getMysqlConnection()59 sqlstr = "SELECT Switch_src, Switch_dst,Host_src,Host_dst from service_active where 
Host_src='"+host+"' or Host_dst='"+host+"'"60 print(sqlstr)61 cur = db.cursor()62 cur.execute(sqlstr)63 output_json = cur.fetchall()64 print(output_json)65 for j in range(0,len(output_json)):66 list_hosts["Switch_src"] = output_json[j][0]67 list_hosts["Switch_dst"] = output_json[j][1]68 list_hosts["Host_src"] = output_json[j][2]69 list_hosts["Host_dst"] = output_json[j][3]70 print(list_hosts)71 list_of_list_hosts.append(list_hosts)72 #Criar nova alternativa73 url = "http://127.0.0.1:5000/getAllPortInfo"74 response = requests.get(url)75 if(response.ok):76 jData = json.loads(response.content)77 for key, value in jData.items():78 if(key=='openflow:'+switch):79 if value:80 for l in range(0,len(value)):81 for key, value in value[l].items():82 port_switch=key.split(":")[-1]83 if(port_switch!=host and int(port_switch) in hosts_list_wout_service):84 print(port_switch)85 new_switch=port_switch86 flag=187 break88 if(new_switch):89 count=190 #Se tivermos solução para substituir o host down91 url = "http://127.0.0.1:5000/infoSW"92 res = requests.get(url)93 if(res.ok):94 switch_info = json.loads(res.content)95 #Delete Flow IPV496 path=[]97 path.append(int(list_hosts["Host_src"]))98 path.append(int(list_hosts["Switch_src"]))99 path.append(int(list_hosts["Switch_dst"]))100 path.append(int(list_hosts["Host_dst"]))101 switchs_names=[]102 ip_src=path[0]-100103 ip_dst=path[-1]-100104 for i in range(0,len(path)):105 if(i!=0 and i!=len(path)-1):106 switch_name_src=switch_info[str(path[i])]107 switch_name_src=switch_name_src.split("b")[1]108 switchs_names.append(switch_name_src)109 for i in range(0,len(switchs_names)):110 if(i==0):111 in_port=path[0]112 out_port=str(switchs_names[i])+str(switchs_names[i+1])113 elif(i==len(switchs_names)-1):114 in_port=str(switchs_names[i])+str(switchs_names[i-1])115 out_port=path[-1]116 else:117 in_port=str(switchs_names[i])+str(switchs_names[i-1])118 out_port=str(switchs_names[i])+str(switchs_names[i+1])119 for j in switch_info:120 
if(switch_info[j]=='b'+str(switchs_names[i])):121 odl_id=j122 url = "http://127.0.0.1:5000/deleteFlowIPV4"123 data={"Switch":"openflow:"+str(odl_id),"in-port":str(in_port), "out-port":str(out_port), "ipv4-src":"192.168.2."+str(ip_src), "ipv4-dst":"192.168.2."+str(ip_dst)}124 print("delete")125 print(data)126 res_ = requests.post(url, json=data)127 if res_.ok:128 print("Done")129 else:130 count=0131 #Adicionar novo Flow132 path=[]133 appont=""134 if(int(list_hosts["Host_src"])==int(host)):135 path.append(int(new_switch))136 path.append(int(list_hosts["Switch_src"]))137 path.append(int(list_hosts["Switch_dst"]))138 path.append(int(list_hosts["Host_dst"]))139 else:140 path.append(int(list_hosts["Host_src"]))141 path.append(int(list_hosts["Switch_src"]))142 path.append(int(list_hosts["Switch_dst"]))143 path.append(int(new_switch))144 try:145 dict_hosts_down[host].append(path)146 except:147 dict_hosts_down[host]=[path]148 switchs_names=[]149 ip_src=path[0]-100150 ip_dst=path[-1]-100151 for i in range(0,len(path)):152 if(i!=0 and i!=len(path)-1):153 switch_name_src=switch_info[str(path[i])]154 switch_name_src=switch_name_src.split("b")[1]155 switchs_names.append(switch_name_src)156 for i in range(0,len(switchs_names)):157 if(i==0):158 in_port=path[0]159 out_port=str(switchs_names[i])+str(switchs_names[i+1])160 elif(i==len(switchs_names)-1):161 in_port=str(switchs_names[i])+str(switchs_names[i-1])162 out_port=path[-1]163 else:164 in_port=str(switchs_names[i])+str(switchs_names[i-1])165 out_port=str(switchs_names[i])+str(switchs_names[i+1])166 for j in switch_info:167 if(switch_info[j]=='b'+str(switchs_names[i])):168 odl_id=j169 lista_source=[]170 lista_destination=[]171 lista_source.append(path[0]-100)172 lista_sw_source="openflow:"+str(path[1])173 lista_destination.append(path[-1]-100)174 lista_sw_destination="openflow:"+str(path[-2])175 url = "http://127.0.0.1:5000/addFlowIP"176 data={"Switch":"openflow:"+str(odl_id),"in-port":str(in_port), "out-port":str(out_port), 
"ipv4-src":"192.168.2."+str(ip_src), "ipv4-dst":"192.168.2."+str(ip_dst)}177 print("add")178 print(data)179 res_ = requests.post(url, json=data)180 if res_.ok:181 print("Done")182 else:183 count=0184 url = "http://127.0.0.1:5000/addFlowArp"185 data={"Switch_src":str(lista_sw_source),"Switch_dst":str(lista_sw_destination),"HostSource":lista_source, "HostDestination":lista_destination}186 #data = json.dumps(data, indent = 4)187 print(data)188 res_ = requests.post(url, json=data)189 if res_.ok:190 print("Done")191 else:192 count=0193 if host in list_backup_host:194 try:195 list_backup_host[host].append(new_switch)196 except:197 list_backup_host[host]=new_switch198 else:199 list_backup_host[host] = new_switch200 print(list_of_list_hosts)201 except Exception as e:202 print("Error in SQL:\n", e)203 finally:204 db.close()205 else:206 try:207 if(key=='openflow:'+switch):208 try:209 if(list_backup_host):210 print("Host up again")211 print(list_of_list_hosts)212 #Se o host voltar a estar ativo substituir213 url = "http://127.0.0.1:5000/infoSW"214 res = requests.get(url)215 if(res.ok):216 switch_info = json.loads(res.content)217 #Delete FLOWS218 if(dict_hosts_down[host]):219 for i in range(0,len(dict_hosts_down[host])):220 path=dict_hosts_down[host][i]221 switchs_names=[]222 ip_src=path[0]-100223 ip_dst=path[-1]-100224 for i in range(0,len(path)):225 if(i!=0 and i!=len(path)-1):226 switch_name_src=switch_info[str(path[i])]227 switch_name_src=switch_name_src.split("b")[1]228 switchs_names.append(switch_name_src)229 #Insert new Flow230 for i in range(0,len(switchs_names)):231 if(i==0):232 in_port=path[0]233 out_port=str(switchs_names[i])+str(switchs_names[i+1])234 elif(i==len(switchs_names)-1):235 in_port=str(switchs_names[i])+str(switchs_names[i-1])236 out_port=path[-1]237 else:238 in_port=str(switchs_names[i])+str(switchs_names[i-1])239 out_port=str(switchs_names[i])+str(switchs_names[i+1])240 for j in switch_info:241 if(switch_info[j]=='b'+str(switchs_names[i])):242 
odl_id=j243 url = "http://127.0.0.1:5000/deleteFlowIPV4"244 data={"Switch":"openflow:"+str(odl_id),"in-port":str(in_port), "out-port":str(out_port), "ipv4-src":"192.168.2."+str(ip_src), "ipv4-dst":"192.168.2."+str(ip_dst)}245 print("delete")246 print(data)247 res_ = requests.post(url, json=data)248 if res_.ok:249 print("Done")250 else:251 count=0252 #Add Flow IPV4253 try:254 db = getMysqlConnection()255 sqlstr = "SELECT Switch_src, Switch_dst,Host_src,Host_dst from service_active where Host_src='"+host+"' or Host_dst='"+host+"'"256 print(sqlstr)257 cur = db.cursor()258 cur.execute(sqlstr)259 output_json = cur.fetchall()260 print(output_json)261 for j in range(0,len(output_json)):262 list_hosts["Switch_src"] = output_json[j][0]263 list_hosts["Switch_dst"] = output_json[j][1]264 list_hosts["Host_src"] = output_json[j][2]265 list_hosts["Host_dst"] = output_json[j][3]266 path=[]267 path.append(int(list_hosts["Host_src"]))268 path.append(int(list_hosts["Switch_src"]))269 path.append(int(list_hosts["Switch_dst"]))270 path.append(int(list_hosts["Host_dst"]))271 switchs_names=[]272 ip_src=path[0]-100273 ip_dst=path[-1]-100274 for i in range(0,len(path)):275 if(i!=0 and i!=len(path)-1):276 switch_name_src=switch_info[str(path[i])]277 switch_name_src=switch_name_src.split("b")[1]278 switchs_names.append(switch_name_src)279 for i in range(0,len(switchs_names)):280 if(i==0):281 in_port=path[0]282 out_port=str(switchs_names[i])+str(switchs_names[i+1])283 elif(i==len(switchs_names)-1):284 in_port=str(switchs_names[i])+str(switchs_names[i-1])285 out_port=path[-1]286 else:287 in_port=str(switchs_names[i])+str(switchs_names[i-1])288 out_port=str(switchs_names[i])+str(switchs_names[i+1])289 for j in switch_info:290 if(switch_info[j]=='b'+str(switchs_names[i])):291 odl_id=j292 lista_source=[]293 lista_destination=[]294 lista_source.append(path[0]-100)295 lista_sw_source="openflow:"+str(path[1])296 lista_destination.append(path[-1]-100)297 
lista_sw_destination="openflow:"+str(path[-2])298 url = "http://127.0.0.1:5000/addFlowIP"299 data={"Switch":"openflow:"+str(odl_id),"in-port":str(in_port), "out-port":str(out_port), "ipv4-src":"192.168.2."+str(ip_src), "ipv4-dst":"192.168.2."+str(ip_dst)}300 print("add")301 print(data)302 res_ = requests.post(url, json=data)303 if res_.ok:304 print("Done")305 else:306 count=0307 url = "http://127.0.0.1:5000/addFlowArp"308 data={"Switch_src":str(lista_sw_source),"Switch_dst":str(lista_sw_destination),"HostSource":lista_source, "HostDestination":lista_destination}309 #data = json.dumps(data, indent = 4)310 print(data)311 res_ = requests.post(url, json=data)312 if res_.ok:313 print("Done")314 else:315 count=0316 except Exception as e:317 print("Error in SQL:\n", e)318 finally:319 db.close()320 list_backup_host.clear()321 except:322 print("Host back again")323 except:...
TestInventory.py
Source:TestInventory.py
...43 #####################################44 ### Simple inventory format tests45 def test_simple(self):46 inventory = self.simple_inventory()47 hosts = inventory.list_hosts()48 self.assertEqual(sorted(hosts), sorted(self.all_simple_hosts))49 def test_simple_all(self):50 inventory = self.simple_inventory()51 hosts = inventory.list_hosts('all')52 self.assertEqual(sorted(hosts), sorted(self.all_simple_hosts))53 def test_get_hosts(self):54 inventory = Inventory('127.0.0.1,192.168.1.1')55 hosts = inventory.get_hosts('!10.0.0.1')56 hosts_all = inventory.get_hosts('all')57 self.assertEqual(sorted(hosts), sorted(hosts_all))58 def test_no_src(self):59 inventory = Inventory('127.0.0.1,')60 self.assertEqual(inventory.src(), None)61 def test_simple_norse(self):62 inventory = self.simple_inventory()63 hosts = inventory.list_hosts("norse")64 expected_hosts=['thor', 'odin', 'loki']65 assert sorted(hosts) == sorted(expected_hosts)66 def test_simple_ungrouped(self):67 inventory = self.simple_inventory()68 hosts = inventory.list_hosts("ungrouped")69 expected_hosts=['jupiter', 'saturn',70 'thrudgelmir0', 'thrudgelmir1', 'thrudgelmir2',71 'thrudgelmir3', 'thrudgelmir4', 'thrudgelmir5']72 assert sorted(hosts) == sorted(expected_hosts)73 def test_simple_combined(self):74 inventory = self.simple_inventory()75 hosts = inventory.list_hosts("norse:greek")76 expected_hosts=['zeus', 'hera', 'poseidon',77 'cerberus001','cerberus002','cerberus003',78 'cottus99','cottus100',79 'thor', 'odin', 'loki']80 assert sorted(hosts) == sorted(expected_hosts)81 def test_simple_restrict(self):82 inventory = self.simple_inventory()83 restricted_hosts = ['hera', 'poseidon', 'thor']84 expected_hosts=['zeus', 'hera', 'poseidon',85 'cerberus001','cerberus002','cerberus003',86 'cottus99', 'cottus100',87 'thor', 'odin', 'loki']88 inventory.restrict_to(restricted_hosts)89 hosts = inventory.list_hosts("norse:greek")90 print "Hosts=%s" % hosts91 print "Restricted=%s" % restricted_hosts92 assert sorted(hosts) == 
sorted(restricted_hosts)93 inventory.lift_restriction()94 hosts = inventory.list_hosts("norse:greek")95 print hosts96 print expected_hosts97 assert sorted(hosts) == sorted(expected_hosts)98 def test_simple_string_ipv4(self):99 inventory = Inventory('127.0.0.1,192.168.1.1')100 hosts = inventory.list_hosts()101 self.assertEqual(sorted(hosts), sorted(['127.0.0.1','192.168.1.1']))102 def test_simple_string_ipv4_port(self):103 inventory = Inventory('127.0.0.1:2222,192.168.1.1')104 hosts = inventory.list_hosts()105 self.assertEqual(sorted(hosts), sorted(['127.0.0.1','192.168.1.1']))106 def test_simple_string_ipv4_vars(self):107 inventory = Inventory('127.0.0.1:2222,192.168.1.1')108 var = inventory.get_variables('127.0.0.1')109 self.assertEqual(var['ansible_ssh_port'], 2222)110 def test_simple_string_ipv6(self):111 inventory = Inventory('FE80:EF45::12:1,192.168.1.1')112 hosts = inventory.list_hosts()113 self.assertEqual(sorted(hosts), sorted(['FE80:EF45::12:1','192.168.1.1']))114 def test_simple_string_ipv6_port(self):115 inventory = Inventory('[FE80:EF45::12:1]:2222,192.168.1.1')116 hosts = inventory.list_hosts()117 self.assertEqual(sorted(hosts), sorted(['FE80:EF45::12:1','192.168.1.1']))118 def test_simple_string_ipv6_vars(self):119 inventory = Inventory('[FE80:EF45::12:1]:2222,192.168.1.1')120 var = inventory.get_variables('FE80:EF45::12:1')121 self.assertEqual(var['ansible_ssh_port'], 2222)122 def test_simple_vars(self):123 inventory = self.simple_inventory()124 vars = inventory.get_variables('thor')125 print vars126 assert vars == {'group_names': ['norse'],127 'inventory_hostname': 'thor',128 'inventory_hostname_short': 'thor'}129 def test_simple_port(self):130 inventory = self.simple_inventory()131 vars = inventory.get_variables('hera')132 print vars133 expected = { 'ansible_ssh_port': 3000,134 'group_names': ['greek'],135 'inventory_hostname': 'hera',136 'inventory_hostname_short': 'hera' }137 print expected138 assert vars == expected139 def 
test_large_range(self):140 inventory = self.large_range_inventory()141 hosts = inventory.list_hosts()142 self.assertEqual(sorted(hosts), sorted('bob%03i' %i for i in range(0, 143)))143 def test_subset(self):144 inventory = self.simple_inventory()145 inventory.subset('odin;thor,loki')146 self.assertEqual(sorted(inventory.list_hosts()), sorted(['thor','odin','loki']))147 def test_subset_filename(self):148 inventory = self.simple_inventory()149 inventory.subset('@' + os.path.join(self.test_dir, 'restrict_pattern'))150 self.assertEqual(sorted(inventory.list_hosts()), sorted(['thor','odin']))151 @raises(errors.AnsibleError)152 def testinvalid_entry(self):153 Inventory('1234')154 ###################################################155 ### INI file advanced tests156 def test_complex_vars(self):157 inventory = self.complex_inventory()158 vars = inventory.get_variables('rtp_a')159 print vars160 expected = dict(161 a='1', b='2', c='3', d='10002', e='10003', f='10004 != 10005',162 g=' g ', h=' h ', i="' i \"", j='" j',163 rga='1', rgb='2', rgc='3',164 inventory_hostname='rtp_a', inventory_hostname_short='rtp_a',165 group_names=[ 'eastcoast', 'nc', 'redundantgroup', 'redundantgroup2', 'redundantgroup3', 'rtp', 'us' ]166 )167 print vars168 print expected169 assert vars == expected170 def test_complex_group_names(self):171 inventory = self.complex_inventory()172 tests = {173 'host1': [ 'role1', 'role3' ],174 'host2': [ 'role1', 'role2' ],175 'host3': [ 'role2', 'role3' ]176 }177 for host, roles in tests.iteritems():178 group_names = inventory.get_variables(host)['group_names']179 assert sorted(group_names) == sorted(roles)180 def test_complex_exclude(self):181 inventory = self.complex_inventory()182 hosts = inventory.list_hosts("nc:florida:!triangle:!orlando")183 expected_hosts = ['miami', 'rtp_a', 'rtp_b', 'rtp_c']184 print "HOSTS=%s" % sorted(hosts)185 print "EXPECTED=%s" % sorted(expected_hosts)186 assert sorted(hosts) == sorted(expected_hosts)187 def 
test_regex_exclude(self):188 inventory = self.complex_inventory()189 hosts = inventory.list_hosts("~rtp_[ac]")190 expected_hosts = ['rtp_a', 'rtp_c']191 print "HOSTS=%s" % sorted(hosts)192 print "EXPECTED=%s" % sorted(expected_hosts)193 assert sorted(hosts) == sorted(expected_hosts)194 def test_complex_enumeration(self):195 expected1 = ['rtp_a', 'rtp_b']196 expected2 = ['rtp_c', 'tri_a']197 expected3 = ['rtp_b', 'rtp_c', 'tri_a', 'tri_b', 'tri_c']198 expected4 = ['orlando', 'rtp_c', 'tri_a']199 inventory = self.complex_inventory()200 print "ALL NC=%s" % inventory.list_hosts("nc")201 # use "-1" instead of 0-1 to test the syntax, on purpose202 hosts = inventory.list_hosts("nc[-1]")203 self.compare(hosts, expected1, sort=False)204 hosts = inventory.list_hosts("nc[2-3]")205 self.compare(hosts, expected2, sort=False)206 hosts = inventory.list_hosts("nc[1-99999]")207 self.compare(hosts, expected3, sort=False)208 hosts = inventory.list_hosts("nc[2-3]:florida[1-2]")209 self.compare(hosts, expected4, sort=False)210 def test_complex_intersect(self):211 inventory = self.complex_inventory()212 hosts = inventory.list_hosts("nc:&redundantgroup:!rtp_c")213 self.compare(hosts, ['rtp_a'])214 hosts = inventory.list_hosts("nc:&triangle:!tri_c")215 self.compare(hosts, ['tri_a', 'tri_b'])216 @raises(errors.AnsibleError)217 def test_invalid_range(self):218 Inventory(os.path.join(self.test_dir, 'inventory','test_incorrect_range'))219 @raises(errors.AnsibleError)220 def test_missing_end(self):221 Inventory(os.path.join(self.test_dir, 'inventory','test_missing_end'))222 @raises(errors.AnsibleError)223 def test_incorrect_format(self):224 Inventory(os.path.join(self.test_dir, 'inventory','test_incorrect_format'))225 @raises(errors.AnsibleError)226 def test_alpha_end_before_beg(self):227 Inventory(os.path.join(self.test_dir, 'inventory','test_alpha_end_before_beg'))228 def test_combined_range(self):229 i = Inventory(os.path.join(self.test_dir, 'inventory','test_combined_range'))230 hosts = 
i.list_hosts('test')231 expected_hosts=['host1A','host2A','host1B','host2B']232 assert sorted(hosts) == sorted(expected_hosts)233 ###################################################234 ### Inventory API tests235 def test_script(self):236 inventory = self.script_inventory()237 hosts = inventory.list_hosts()238 expected_hosts=['jupiter', 'saturn', 'zeus', 'hera', 'poseidon', 'thor', 'odin', 'loki']239 print "Expected: %s"%(expected_hosts)240 print "Got : %s"%(hosts)241 assert sorted(hosts) == sorted(expected_hosts)242 def test_script_all(self):243 inventory = self.script_inventory()244 hosts = inventory.list_hosts('all')245 expected_hosts=['jupiter', 'saturn', 'zeus', 'hera', 'poseidon', 'thor', 'odin', 'loki']246 assert sorted(hosts) == sorted(expected_hosts)247 def test_script_norse(self):248 inventory = self.script_inventory()249 hosts = inventory.list_hosts("norse")250 expected_hosts=['thor', 'odin', 'loki']251 assert sorted(hosts) == sorted(expected_hosts)252 def test_script_combined(self):253 inventory = self.script_inventory()254 hosts = inventory.list_hosts("norse:greek")255 expected_hosts=['zeus', 'hera', 'poseidon', 'thor', 'odin', 'loki']256 assert sorted(hosts) == sorted(expected_hosts)257 def test_script_restrict(self):258 inventory = self.script_inventory()259 restricted_hosts = ['hera', 'poseidon', 'thor']260 expected_hosts=['zeus', 'hera', 'poseidon', 'thor', 'odin', 'loki']261 inventory.restrict_to(restricted_hosts)262 hosts = inventory.list_hosts("norse:greek")263 assert sorted(hosts) == sorted(restricted_hosts)264 inventory.lift_restriction()265 hosts = inventory.list_hosts("norse:greek")266 assert sorted(hosts) == sorted(expected_hosts)267 def test_script_vars(self):268 inventory = self.script_inventory()269 vars = inventory.get_variables('thor')270 print "VARS=%s" % vars271 assert vars == {'hammer':True,272 'group_names': ['norse'],273 'inventory_hostname': 'thor',274 'inventory_hostname_short': 'thor'}275 def test_hosts_list(self):276 # Test the 
case when playbook 'hosts' var is a list.277 inventory = self.script_inventory()278 host_names = sorted(['thor', 'loki', 'odin']) # Not sure if sorting is in the contract or not279 actual_hosts = inventory.get_hosts(host_names)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!