Best Python code snippet using localstack_python
tests_int_access_control.py
Source:tests_int_access_control.py
# NOTE(review): this excerpt starts part-way through a test class.  The file's
# imports, the class statement and the decorator of the first test are not
# visible.  The header below is reconstructed from the sibling classes --
# confirm the class name against the upstream file before merging.

# Patch targets shared by the tests below.
_READ_ACCESS = (
    "core_main_app.components.workspace.api."
    "get_all_workspaces_with_read_access_by_user"
)
_WRITE_ACCESS = (
    "core_main_app.components.workspace.api."
    "get_all_workspaces_with_write_access_by_user"
)


class TestDataGetById(MongoIntegrationBaseTestCase):
    """Access-control checks for ``data_api.get_by_id``."""

    fixture = fixture_data

    @patch(_READ_ACCESS)
    def test_get_by_id_owner_with_read_access_returns_data(
        self, get_all_workspaces_with_read_access_by_user
    ):
        data_id = self.fixture.data_collection[fixture_data.USER_1_WORKSPACE_1].id
        mock_user = _create_user("1")
        get_all_workspaces_with_read_access_by_user.return_value = [
            fixture_data.workspace_1
        ]
        data = data_api.get_by_id(data_id, mock_user)
        self.assertIsInstance(data, Data)

    # A second method with this exact name used to precede this one and expect
    # AccessControlError for the same setup.  Python kept only the later
    # definition, so that variant never ran; it has been dropped instead of
    # being left as dead, contradictory code.
    @patch(_READ_ACCESS)
    def test_get_by_id_owner_without_read_access_returns_data(
        self, get_all_workspaces_with_read_access_by_user
    ):
        data_id = self.fixture.data_collection[fixture_data.USER_1_WORKSPACE_1].id
        mock_user = _create_user("1")
        get_all_workspaces_with_read_access_by_user.return_value = []
        data = data_api.get_by_id(data_id, mock_user)
        self.assertIsInstance(data, Data)

    @patch(_READ_ACCESS)
    def test_get_by_id_user_without_read_access_raises_error(
        self, get_all_workspaces_with_read_access_by_user
    ):
        data_id = self.fixture.data_collection[fixture_data.USER_1_WORKSPACE_1].id
        mock_user = _create_user("2")
        get_all_workspaces_with_read_access_by_user.return_value = []
        with self.assertRaises(AccessControlError):
            data_api.get_by_id(data_id, mock_user)

    def test_get_by_id_owner_no_workspace_read_access_returns_data(self):
        data_id = self.fixture.data_collection[fixture_data.USER_1_NO_WORKSPACE].id
        mock_user = _create_user("1")
        data = data_api.get_by_id(data_id, mock_user)
        self.assertIsInstance(data, Data)

    def test_get_by_id_not_owner_no_workspace_raises_error(self):
        data_id = self.fixture.data_collection[fixture_data.USER_1_NO_WORKSPACE].id
        mock_user = _create_user("2")
        with self.assertRaises(AccessControlError):
            data_api.get_by_id(data_id, mock_user)


class TestDataGetAll(MongoIntegrationBaseTestCase):
    """Access-control checks for ``data_api.get_all``."""

    fixture = fixture_data

    def test_get_all_as_superuser_returns_all_data(self):
        mock_user = _create_user("1", is_superuser=True)
        data_list = data_api.get_all(mock_user)
        self.assertEqual(len(data_list), len(self.fixture.data_collection))

    def test_get_all_as_user_raises_error(self):
        mock_user = _create_user("1")
        with self.assertRaises(AccessControlError):
            data_api.get_all(mock_user)


class TestDataGetAllByUser(MongoIntegrationBaseTestCase):
    """Access-control checks for ``data_api.get_all_by_user``."""

    fixture = fixture_data

    @patch(_READ_ACCESS)
    def test_get_all_returns_data(self, get_all_workspaces_with_read_access_by_user):
        # NOTE(review): despite its name and class, this test calls
        # get_all_except_user and expects an error; kept unchanged.
        mock_user = _create_user("1")
        get_all_workspaces_with_read_access_by_user.return_value = []
        with self.assertRaises(AccessControlError):
            data_api.get_all_except_user(mock_user)

    @patch(_READ_ACCESS)
    def test_get_all_by_user_returns_owned_data(
        self, get_all_workspaces_with_read_access_by_user
    ):
        mock_user = _create_user("1")
        # Configure the mock before the call under test (it used to be set
        # afterwards, where it could not influence the result).
        get_all_workspaces_with_read_access_by_user.return_value = []
        data_list = data_api.get_all_by_user(mock_user)
        self.assertEqual(len(data_list), 3)
        # Was `assertTrue(data.id == "1" for ...)`: a generator object is
        # always truthy, and the owner lives in `user_id`, not `id`.
        self.assertTrue(all(data.user_id == "1" for data in data_list))

    @patch(_READ_ACCESS)
    def test_get_all_by_user_returns_no_data_if_owns_zero(
        self, get_all_workspaces_with_read_access_by_user
    ):
        mock_user = _create_user("3")
        get_all_workspaces_with_read_access_by_user.return_value = []
        data_list = data_api.get_all_by_user(mock_user)
        self.assertEqual(len(data_list), 0)

    def test_get_all_by_user_as_superuser_returns_own_data(self):
        mock_user = _create_user("1", is_superuser=True)
        data_list = data_api.get_all_by_user(mock_user)
        self.assertEqual(len(data_list), 3)
        self.assertTrue(all(data.user_id == "1" for data in data_list))


class TestDataGetAllExceptUser(MongoIntegrationBaseTestCase):
    """Access-control checks for ``data_api.get_all_except_user``."""

    # NOTE: Will always fail when private data are present
    # (data.workspace=None, data.user_id!=user.id)
    fixture = fixture_data

    @patch(_READ_ACCESS)
    def test_get_all_except_user_raises_error_if_no_workspace_access(
        self, get_all_workspaces_with_read_access_by_user
    ):
        mock_user = _create_user("1")
        get_all_workspaces_with_read_access_by_user.return_value = []
        with self.assertRaises(AccessControlError):
            data_api.get_all_except_user(mock_user)

    @patch(_READ_ACCESS)
    def test_get_all_except_user_raises_error_data_if_workspace_access(
        self, get_all_workspaces_with_read_access_by_user
    ):
        mock_user = _create_user("1")
        get_all_workspaces_with_read_access_by_user.return_value = [
            fixture_data.workspace_1,
            fixture_data.workspace_2,
        ]
        with self.assertRaises(AccessControlError):
            data_api.get_all_except_user(mock_user)

    def test_get_all_except_user_as_superuser_returns_others_data(self):
        mock_user = _create_user("1", is_superuser=True)
        data_list = data_api.get_all_except_user(mock_user)
        self.assertGreater(len(data_list), 0)
        self.assertTrue(all(data.user_id != mock_user.id for data in data_list))


class TestDataUpsert(MongoIntegrationBaseTestCase):
    # TODO: can not test without mock for GridFS
    pass


class TestDataExecuteQuery(MongoIntegrationBaseTestCase):
    """Access-control checks for ``data_api.execute_query``.

    NOTE(review): the workspace assertions used to be
    ``assertTrue(data.workspace == "1" for ...)``, which can never fail.  They
    are now real ``all(...)`` checks against the fixture workspaces; this
    assumes ``data.workspace`` compares equal to the fixture's workspace
    object -- confirm against the Data model.
    """

    fixture = fixture_data

    @patch(_READ_ACCESS)
    def test_execute_query_returns_data(
        self, get_all_workspaces_with_read_access_by_user
    ):
        mock_user = _create_user("3")
        get_all_workspaces_with_read_access_by_user.return_value = [
            fixture_data.workspace_1
        ]
        data_list = data_api.execute_query({}, mock_user)
        self.assertGreater(len(data_list), 0)
        self.assertTrue(all(isinstance(data, Data) for data in data_list))

    @patch(_READ_ACCESS)
    def test_execute_query_returns_data_in_workspace_1(
        self, get_all_workspaces_with_read_access_by_user
    ):
        mock_user = _create_user("3")
        get_all_workspaces_with_read_access_by_user.return_value = [
            fixture_data.workspace_1
        ]
        data_list = data_api.execute_query({}, mock_user)
        self.assertEqual(len(data_list), 2)
        self.assertTrue(
            all(data.workspace == fixture_data.workspace_1 for data in data_list)
        )

    @patch(_READ_ACCESS)
    def test_execute_query_returns_data_in_workspace_2(
        self, get_all_workspaces_with_read_access_by_user
    ):
        mock_user = _create_user("3")
        # Was a copy of the workspace_1 test (mocked workspace_1, expected 2
        # rows) and only passed because its workspace assertion was vacuous.
        # NOTE(review): the expected count of 1 is derived from the sibling
        # tests (3 rows for workspaces 1+2, 2 rows for workspace 1) -- confirm
        # against the fixture.
        get_all_workspaces_with_read_access_by_user.return_value = [
            fixture_data.workspace_2
        ]
        data_list = data_api.execute_query({}, mock_user)
        self.assertEqual(len(data_list), 1)
        self.assertTrue(
            all(data.workspace == fixture_data.workspace_2 for data in data_list)
        )

    @patch(_READ_ACCESS)
    def test_execute_query_returns_data_in_workspace_1_and_2(
        self, get_all_workspaces_with_read_access_by_user
    ):
        mock_user = _create_user("3")
        get_all_workspaces_with_read_access_by_user.return_value = [
            fixture_data.workspace_1,
            fixture_data.workspace_2,
        ]
        data_list = data_api.execute_query({}, mock_user)
        self.assertEqual(len(data_list), 3)
        allowed = (fixture_data.workspace_1, fixture_data.workspace_2)
        self.assertTrue(all(data.workspace in allowed for data in data_list))

    @patch(_READ_ACCESS)
    def test_execute_query_force_workspace_1_returns_data_from_workspace_1(
        self, get_all_workspaces_with_read_access_by_user
    ):
        mock_user = _create_user("3")
        get_all_workspaces_with_read_access_by_user.return_value = [
            fixture_data.workspace_1
        ]
        data_list = data_api.execute_query(
            {"workspace": fixture_data.workspace_1.id}, mock_user
        )
        self.assertEqual(len(data_list), 2)
        self.assertTrue(
            all(data.workspace == fixture_data.workspace_1 for data in data_list)
        )

    @patch(_READ_ACCESS)
    def test_execute_query_force_workspace_1_does_not_return_data_if_no_access(
        self, get_all_workspaces_with_read_access_by_user
    ):
        mock_user = _create_user("3")
        get_all_workspaces_with_read_access_by_user.return_value = []
        data_list = data_api.execute_query(
            {"workspace": fixture_data.workspace_1.id}, mock_user
        )
        self.assertEqual(len(data_list), 0)

    @patch(_READ_ACCESS)
    def test_execute_query_force_workspace_none_does_not_return_data_if_no_access(
        self, get_all_workspaces_with_read_access_by_user
    ):
        mock_user = _create_user("3")
        get_all_workspaces_with_read_access_by_user.return_value = []
        data_list = data_api.execute_query({"workspace": None}, mock_user)
        self.assertEqual(len(data_list), 0)

    def test_execute_query_as_superuser_returns_all_data(self):
        mock_user = _create_user("1", is_superuser=True)
        data_list = data_api.execute_query({}, mock_user)
        self.assertEqual(len(data_list), 5)


class TestDataDelete(MongoIntegrationBaseTestCase):
    """Access-control checks for ``data_api.delete``."""

    fixture = fixture_data

    @unittest.skip("GridFS not supported by mongomock")
    @patch(_WRITE_ACCESS)
    def test_delete_own_data_in_accessible_workspace_deletes_data(
        self, get_all_workspaces_with_write_access_by_user
    ):
        mock_user = _create_user("1")
        get_all_workspaces_with_write_access_by_user.return_value = [
            fixture_data.workspace_1
        ]
        data_api.delete(
            fixture_data.data_collection[fixture_data.USER_1_WORKSPACE_1], mock_user
        )

    @unittest.skip("GridFS not supported by mongomock")
    @patch(_WRITE_ACCESS)
    def test_delete_own_data_in_not_accessible_workspace_deletes_data(
        self, get_all_workspaces_with_write_access_by_user
    ):
        mock_user = _create_user("1")
        get_all_workspaces_with_write_access_by_user.return_value = []
        data_api.delete(
            fixture_data.data_collection[fixture_data.USER_1_WORKSPACE_1], mock_user
        )

    @unittest.skip("GridFS not supported by mongomock")
    @patch(_WRITE_ACCESS)
    def test_delete_others_data_in_accessible_workspace_deletes_data(
        self, get_all_workspaces_with_write_access_by_user
    ):
        mock_user = _create_user("1")
        get_all_workspaces_with_write_access_by_user.return_value = [
            fixture_data.workspace_2
        ]
        data_api.delete(
            fixture_data.data_collection[fixture_data.USER_2_WORKSPACE_2], mock_user
        )

    @patch(_WRITE_ACCESS)
    def test_delete_others_data_not_accessible_workspace_raises_error(
        self, get_all_workspaces_with_write_access_by_user
    ):
        mock_user = _create_user("1")
        get_all_workspaces_with_write_access_by_user.return_value = [
            fixture_data.workspace_1
        ]
        with self.assertRaises(AccessControlError):
            data_api.delete(
                fixture_data.data_collection[fixture_data.USER_2_WORKSPACE_2],
                mock_user,
            )

    @unittest.skip("GridFS not supported by mongomock")
    @patch(_WRITE_ACCESS)
    def test_delete_own_data_not_in_workspace_deletes_data(
        self, get_all_workspaces_with_write_access_by_user
    ):
        # NOTE(review): the name says "deletes data" for data outside any
        # workspace, but the body uses USER_1_WORKSPACE_1 and expects an
        # AccessControlError.  The test is skipped, so it is left unchanged.
        mock_user = _create_user("1")
        get_all_workspaces_with_write_access_by_user.return_value = []
        with self.assertRaises(AccessControlError):
            data_api.delete(
                fixture_data.data_collection[fixture_data.USER_1_WORKSPACE_1],
                mock_user,
            )

    @patch(_WRITE_ACCESS)
    def test_delete_others_data_not_in_workspace_raises_error(
        self, get_all_workspaces_with_write_access_by_user
    ):
        mock_user = _create_user("1")
        get_all_workspaces_with_write_access_by_user.return_value = []
        with self.assertRaises(AccessControlError):
            data_api.delete(
                fixture_data.data_collection[fixture_data.USER_2_NO_WORKSPACE],
                mock_user,
            )


class TestDataChangeOwner(MongoIntegrationBaseTestCase):
    """Access-control checks for ``data_api.change_owner``."""

    fixture = fixture_data

    def test_change_owner_from_owner_to_owner_ok(self):
        mock_owner = _create_user("1")
        data_api.change_owner(
            document=fixture_data.data_collection[fixture_data.USER_1_NO_WORKSPACE],
            new_user=mock_owner,
            user=mock_owner,
        )

    def test_change_owner_from_owner_to_user_ok(self):
        mock_owner = _create_user("1")
        mock_user = _create_user("2")
        data_api.change_owner(
            document=fixture_data.data_collection[fixture_data.USER_1_NO_WORKSPACE],
            new_user=mock_user,
            user=mock_owner,
        )

    def test_change_owner_from_user_to_user_raises_exception(self):
        mock_owner = _create_user("1")
        mock_user = _create_user("2")
        with self.assertRaises(AccessControlError):
            data_api.change_owner(
                document=fixture_data.data_collection[
                    fixture_data.USER_1_NO_WORKSPACE
                ],
                new_user=mock_owner,
                user=mock_user,
            )

    def test_change_owner_as_superuser_ok(self):
        mock_user = _create_user("2", is_superuser=True)
        data_api.change_owner(
            document=fixture_data.data_collection[fixture_data.USER_1_NO_WORKSPACE],
            new_user=mock_user,
            user=mock_user,
        )


def _create_user(user_id, is_superuser=False):
    # The body of this helper is cut off ("...") in the excerpt; left as-is.
    ...
Export_SDDC_config.py
Source:Export_SDDC_config.py
"""Export the user-created NSX-T configuration of a VMC SDDC as JSON.

Usage: Export_SDDC_config.py ORGID SDDCID REFRESHTOKEN

The script exchanges the refresh token for an access token, looks up the
SDDC's NSX reverse-proxy URL and prints one JSON object to stdout with the
keys MGWGroups, CGWGroups, Services (only when at least one user-created
service exists), MGWRules, CGWRules, DFWRules, IKEProfiles, TunnelProfiles,
BGPNeighbors, L2VPNSessions and L3VPNSessions.

Changes from the previous hand-assembled output:
* every result page is collected (the first page used to be dropped and the
  last page emitted twice once a listing exceeded one page);
* the document is produced by a single ``json.dumps`` call, so separators and
  the closing brackets are always correct (the visible excerpt of the old
  script stopped before the document was closed);
* HTTP errors raise immediately instead of surfacing later as ``KeyError``;
* the unused, never-closed ``sourceRules.json`` file handle is gone.

WARNING: L3VPN sessions are exported together with their pre-shared key, so
the output must be handled as a secret.
"""
### Package Imports ####
import argparse
import json

import requests

AUTH_URL = 'https://console.cloud.vmware.com/csp/gateway/am/api/auth/api-tokens/authorize'
SDDC_URL = 'https://vmc.vmware.com/vmc/api/orgs/%s/sddcs/%s'
PAGE_SIZE = 1000

# Objects whose "_create_user" is one of these are treated as system objects.
GROUP_SYSTEM_USERS = frozenset(["admin", "admin;admin"])
SYSTEM_USERS = GROUP_SYSTEM_USERS | frozenset(["system"])


def get_json(url, headers, params=None):
    """GET *url* and return the decoded JSON body; raise on an HTTP error."""
    response = requests.get(url, headers=headers, params=params)
    response.raise_for_status()
    return response.json()


def fetch_results(url, headers, page_size=None):
    """Return the "results" of a list endpoint, following "cursor" pages.

    *page_size* is sent only when given, so endpoints that were previously
    queried without paging parameters are still queried the same way.
    """
    results = []
    cursor = None
    while True:
        params = {}
        if page_size is not None:
            params["page_size"] = page_size
        if cursor:
            params["cursor"] = cursor
        page = get_json(url, headers, params)
        results.extend(page["results"])
        # A page without a cursor is the last one.
        cursor = page.get("cursor")
        if not cursor:
            return results


def user_created(items, system_users=SYSTEM_USERS):
    """Return the items whose "_create_user" is not a system account."""
    return [item for item in items if item["_create_user"] not in system_users]


def main():
    """Parse the command line, query the SDDC and print the export."""
    ### Ready arguments from command line ###
    parser = argparse.ArgumentParser(description='Export user created NSX-T Firewall rules and objects for a given VMC SDDC.')
    parser.add_argument('orgid')
    parser.add_argument('sddcid')
    parser.add_argument('refreshtoken')
    args = parser.parse_args()

    ### Access Token ###
    # Passing the token through `params` keeps it in the query string as
    # before, but lets requests URL-encode it.
    auth_response = requests.post(
        AUTH_URL,
        headers={'Accept': 'application/json'},
        params={'refresh_token': args.refreshtoken},
    )
    auth_response.raise_for_status()
    token = auth_response.json()["access_token"]
    headers = {'csp-auth-token': token, 'content-type': 'application/json'}

    ### Get ReverseProxy URL ###
    sddc = get_json(SDDC_URL % (args.orgid, args.sddcid), headers)
    infra = '%s/policy/api/v1/infra' % sddc["resource_config"]["nsx_api_public_endpoint_url"]
    tier0 = '%s/tier-0s/vmc/locale-services/default' % infra

    export = {}

    ### Groups (only "admin" / "admin;admin" count as system users here) ###
    export["MGWGroups"] = user_created(
        fetch_results('%s/domains/mgw/groups' % infra, headers, PAGE_SIZE),
        GROUP_SYSTEM_USERS,
    )
    export["CGWGroups"] = user_created(
        fetch_results('%s/domains/cgw/groups' % infra, headers, PAGE_SIZE),
        GROUP_SYSTEM_USERS,
    )

    ### Firewall services: the key is left out when there are none ###
    services = user_created(
        fetch_results('%s/services' % infra, headers, PAGE_SIZE)
    )
    if services:
        export["Services"] = services

    ### Gateway firewall rules ###
    export["MGWRules"] = user_created(
        fetch_results('%s/domains/mgw/gateway-policies/default/rules' % infra, headers)
    )
    export["CGWRules"] = user_created(
        fetch_results('%s/domains/cgw/gateway-policies/default/rules' % infra, headers)
    )

    ### Distributed firewall: every communication map, fetched in full ###
    dfw_url = '%s/domains/cgw/communication-maps' % infra
    export["DFWRules"] = [
        get_json("%s/%s" % (dfw_url, cmap["id"]), headers)
        for cmap in fetch_results(dfw_url, headers)
    ]

    ### VPN profiles, BGP neighbors and L2VPN sessions ###
    export["IKEProfiles"] = user_created(
        fetch_results('%s/ipsec-vpn-ike-profiles' % infra, headers)
    )
    export["TunnelProfiles"] = user_created(
        fetch_results('%s/ipsec-vpn-tunnel-profiles' % infra, headers)
    )
    export["BGPNeighbors"] = user_created(
        fetch_results('%s/bgp/neighbors' % tier0, headers)
    )
    export["L2VPNSessions"] = user_created(
        fetch_results('%s/l2vpn-services/default/sessions' % tier0, headers)
    )

    ### L3VPN sessions, each completed with its pre-shared key ###
    l3vpn_url = '%s/ipsec-vpn-services/default/sessions' % tier0
    l3vpn_sessions = user_created(fetch_results(l3vpn_url, headers))
    for session in l3vpn_sessions:
        details = get_json(
            '%s/%s' % (l3vpn_url, session["id"]),
            headers,
            {"action": "show_sensitive_data"},
        )
        session["psk"] = details["psk"]
    export["L3VPNSessions"] = l3vpn_sessions

    print(json.dumps(export, indent=4))


if __name__ == "__main__":
    main()
Export_NSX-T_FW_config_from_an_SDDC.py
Source:Export_NSX-T_FW_config_from_an_SDDC.py
"""Print the user-created NSX-T firewall configuration of a VMC SDDC.

Usage: Export_NSX-T_FW_config_from_an_SDDC.py ORGID SDDCID REFRESHTOKEN

The report is plain text: a title line per section followed by one
``json.dumps(..., indent=4)`` dump per user-created object (the distributed
firewall section prints each communication map exactly as the API returned
it).

Changes from the previous version:
* every result page is collected before printing (the first page used to be
  dropped and the last page printed twice once a listing exceeded one page);
* HTTP errors raise immediately instead of surfacing later as ``KeyError``;
* the unused, never-closed ``sourceRules.json`` file handle is gone.
"""
### Package Imports ####
import argparse
import json

import requests

AUTH_URL = 'https://console.cloud.vmware.com/csp/gateway/am/api/auth/api-tokens/authorize'
SDDC_URL = 'https://vmc.vmware.com/vmc/api/orgs/%s/sddcs/%s'
PAGE_SIZE = 1000

# Objects whose "_create_user" is one of these are treated as system objects.
GROUP_SYSTEM_USERS = frozenset(["admin", "admin;admin"])
SYSTEM_USERS = GROUP_SYSTEM_USERS | frozenset(["system"])


def checked_get(url, headers, params=None):
    """GET *url* and return the response; raise on an HTTP error."""
    response = requests.get(url, headers=headers, params=params)
    response.raise_for_status()
    return response


def fetch_results(url, headers, page_size=None):
    """Return the "results" of a list endpoint, following "cursor" pages.

    *page_size* is sent only when given, so endpoints that were previously
    queried without paging parameters are still queried the same way.
    """
    results = []
    cursor = None
    while True:
        params = {}
        if page_size is not None:
            params["page_size"] = page_size
        if cursor:
            params["cursor"] = cursor
        page = checked_get(url, headers, params).json()
        results.extend(page["results"])
        # A page without a cursor is the last one.
        cursor = page.get("cursor")
        if not cursor:
            return results


def user_created(items, system_users=SYSTEM_USERS):
    """Return the items whose "_create_user" is not a system account."""
    return [item for item in items if item["_create_user"] not in system_users]


def print_section(title, items):
    """Print *title* followed by an indented JSON dump of every item."""
    print(title)
    for item in items:
        print(json.dumps(item, indent=4))


def main():
    """Parse the command line, query the SDDC and print the report."""
    ### Ready arguments from command line ###
    parser = argparse.ArgumentParser(description='Export user created NSX-T Firewall rules and objects for a given VMC SDDC.')
    parser.add_argument('orgid')
    parser.add_argument('sddcid')
    parser.add_argument('refreshtoken')
    args = parser.parse_args()

    ### Access Token ###
    # Passing the token through `params` keeps it in the query string as
    # before, but lets requests URL-encode it.
    auth_response = requests.post(
        AUTH_URL,
        headers={'Accept': 'application/json'},
        params={'refresh_token': args.refreshtoken},
    )
    auth_response.raise_for_status()
    token = auth_response.json()["access_token"]
    headers = {'csp-auth-token': token, 'content-type': 'application/json'}

    ### Get ReverseProxy URL ###
    sddc = checked_get(SDDC_URL % (args.orgid, args.sddcid), headers).json()
    infra = '%s/policy/api/v1/infra' % sddc["resource_config"]["nsx_api_public_endpoint_url"]
    tier0 = '%s/tier-0s/vmc/locale-services/default' % infra

    ### Groups (only "admin" / "admin;admin" count as system users here) ###
    print_section("MGW Groups", user_created(
        fetch_results('%s/domains/mgw/groups' % infra, headers, PAGE_SIZE),
        GROUP_SYSTEM_USERS,
    ))
    print_section("CGW Groups", user_created(
        fetch_results('%s/domains/cgw/groups' % infra, headers, PAGE_SIZE),
        GROUP_SYSTEM_USERS,
    ))

    ### Firewall services and gateway rules ###
    print_section("Services", user_created(
        fetch_results('%s/services' % infra, headers, PAGE_SIZE)
    ))
    print_section("MGW Rules", user_created(
        fetch_results('%s/domains/mgw/gateway-policies/default/rules' % infra, headers)
    ))
    print_section("CGW Rules", user_created(
        fetch_results('%s/domains/cgw/gateway-policies/default/rules' % infra, headers)
    ))

    ### Distributed firewall: raw detail of every communication map ###
    dfw_url = '%s/domains/cgw/communication-maps' % infra
    print('Distributed Firewall Rules: ')
    for cmap in fetch_results(dfw_url, headers):
        print(checked_get("%s/%s" % (dfw_url, cmap["id"]), headers).text)

    ### VPN profiles and BGP neighbors ###
    print_section("IKE Profiles", user_created(
        fetch_results('%s/ipsec-vpn-ike-profiles' % infra, headers)
    ))
    print_section("Tunnel Profiles", user_created(
        fetch_results('%s/ipsec-vpn-tunnel-profiles' % infra, headers)
    ))
    print_section("BGP Neighbors:", user_created(
        fetch_results('%s/bgp/neighbors' % tier0, headers)
    ))

    ### L3VPN sessions ###
    # NOTE(review): the excerpt is cut off inside this section's loop, right
    # after the user-created filter.  Sessions are printed like every other
    # section; confirm against the complete script whether more was done.
    print_section("L3VPN Sessions:", user_created(
        fetch_results('%s/ipsec-vpn-services/default/sessions' % tier0, headers)
    ))


if __name__ == "__main__":
    main()
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!