Best Python code snippet using tempest_python
test_security_groups_basic_ops.py
Source:test_security_groups_basic_ops.py
...215 self.assertEqual(216 sorted([s['name'] for s in security_groups]),217 sorted([s['name'] for s in server['security_groups']]))218 return server219 def _create_tenant_servers(self, tenant, num=1):220 for i in range(num):221 name = 'server-{tenant}-gen-{num}-'.format(222 tenant=tenant.creds.tenant_name,223 num=i224 )225 name = data_utils.rand_name(name)226 server = self._create_server(name, tenant)227 tenant.servers.append(server)228 def _set_access_point(self, tenant):229 """230 creates a server in a secgroup with rule allowing external ssh231 in order to access tenant internal network232 workaround ip namespace233 """234 secgroups = tenant.security_groups.values()235 name = 'server-{tenant}-access_point-'.format(236 tenant=tenant.creds.tenant_name)237 name = data_utils.rand_name(name)238 server = self._create_server(name, tenant,239 security_groups=secgroups)240 tenant.access_point = server241 self._assign_floating_ips(tenant, server)242 def _assign_floating_ips(self, tenant, server):243 public_network_id = CONF.network.public_network_id244 floating_ip = self._create_floating_ip(245 server, public_network_id,246 client=tenant.manager.network_client)247 self.floating_ips.setdefault(server['id'], floating_ip)248 def _create_tenant_network(self, tenant):249 network, subnet, router = self.create_networks(250 client=tenant.manager.network_client)251 tenant.set_network(network, subnet, router)252 def _set_compute_context(self, tenant):253 self.servers_client = tenant.manager.servers_client254 return self.servers_client255 def _deploy_tenant(self, tenant_or_id):256 """257 creates:258 network259 subnet260 router (if public not defined)261 access security group262 access-point server263 """264 if not isinstance(tenant_or_id, self.TenantProperties):265 tenant = self.tenants[tenant_or_id]266 else:267 tenant = tenant_or_id268 self._set_compute_context(tenant)269 self._create_tenant_keypairs(tenant)270 self._create_tenant_network(tenant)271 
self._create_tenant_security_groups(tenant)272 self._set_access_point(tenant)273 def _get_server_ip(self, server, floating=False):274 """275 returns the ip (floating/internal) of a server276 """277 if floating:278 server_ip = self.floating_ips[server['id']].floating_ip_address279 else:280 server_ip = None281 network_name = self.tenants[server['tenant_id']].network.name282 if network_name in server['addresses']:283 server_ip = server['addresses'][network_name][0]['addr']284 return server_ip285 def _connect_to_access_point(self, tenant):286 """287 create ssh connection to tenant access point288 """289 access_point_ssh = \290 self.floating_ips[tenant.access_point['id']].floating_ip_address291 private_key = tenant.keypair['private_key']292 access_point_ssh = self._ssh_to_server(access_point_ssh,293 private_key=private_key)294 return access_point_ssh295 def _check_connectivity(self, access_point, ip, should_succeed=True):296 if should_succeed:297 msg = "Timed out waiting for %s to become reachable" % ip298 else:299 msg = "%s is reachable" % ip300 try:301 self.assertTrue(self._check_remote_connectivity(access_point, ip,302 should_succeed),303 msg)304 except test.exceptions.SSHTimeout:305 raise306 except Exception:307 debug.log_net_debug()308 raise309 def _test_in_tenant_block(self, tenant):310 access_point_ssh = self._connect_to_access_point(tenant)311 for server in tenant.servers:312 self._check_connectivity(access_point=access_point_ssh,313 ip=self._get_server_ip(server),314 should_succeed=False)315 def _test_in_tenant_allow(self, tenant):316 ruleset = dict(317 protocol='icmp',318 remote_group_id=tenant.security_groups['default'].id,319 direction='ingress'320 )321 self._create_security_group_rule(322 secgroup=tenant.security_groups['default'],323 **ruleset324 )325 access_point_ssh = self._connect_to_access_point(tenant)326 for server in tenant.servers:327 self._check_connectivity(access_point=access_point_ssh,328 ip=self._get_server_ip(server))329 def 
_test_cross_tenant_block(self, source_tenant, dest_tenant):330 """331 if public router isn't defined, then dest_tenant access is via332 floating-ip333 """334 access_point_ssh = self._connect_to_access_point(source_tenant)335 ip = self._get_server_ip(dest_tenant.access_point,336 floating=self.floating_ip_access)337 self._check_connectivity(access_point=access_point_ssh, ip=ip,338 should_succeed=False)339 def _test_cross_tenant_allow(self, source_tenant, dest_tenant):340 """341 check for each direction:342 creating rule for tenant incoming traffic enables only 1way traffic343 """344 ruleset = dict(345 protocol='icmp',346 direction='ingress'347 )348 self._create_security_group_rule(349 secgroup=dest_tenant.security_groups['default'],350 client=dest_tenant.manager.network_client,351 **ruleset352 )353 access_point_ssh = self._connect_to_access_point(source_tenant)354 ip = self._get_server_ip(dest_tenant.access_point,355 floating=self.floating_ip_access)356 self._check_connectivity(access_point_ssh, ip)357 # test that reverse traffic is still blocked358 self._test_cross_tenant_block(dest_tenant, source_tenant)359 # allow reverse traffic and check360 self._create_security_group_rule(361 secgroup=source_tenant.security_groups['default'],362 client=source_tenant.manager.network_client,363 **ruleset364 )365 access_point_ssh_2 = self._connect_to_access_point(dest_tenant)366 ip = self._get_server_ip(source_tenant.access_point,367 floating=self.floating_ip_access)368 self._check_connectivity(access_point_ssh_2, ip)369 def _verify_mac_addr(self, tenant):370 """371 verify that VM (tenant's access point) has the same ip,mac as listed in372 port list373 """374 access_point_ssh = self._connect_to_access_point(tenant)375 mac_addr = access_point_ssh.get_mac_address()376 mac_addr = mac_addr.strip().lower()377 # Get the fixed_ips and mac_address fields of all ports. 
Select378 # only those two columns to reduce the size of the response.379 port_list = self._list_ports(fields=['fixed_ips', 'mac_address'])380 port_detail_list = [381 (port['fixed_ips'][0]['subnet_id'],382 port['fixed_ips'][0]['ip_address'],383 port['mac_address'].lower())384 for port in port_list if port['fixed_ips']385 ]386 server_ip = self._get_server_ip(tenant.access_point)387 subnet_id = tenant.subnet.id388 self.assertIn((subnet_id, server_ip, mac_addr), port_detail_list)389 @test.attr(type='smoke')390 @test.services('compute', 'network')391 def test_cross_tenant_traffic(self):392 try:393 # deploy new tenant394 self._deploy_tenant(self.alt_tenant)395 self._verify_network_details(self.alt_tenant)396 self._verify_mac_addr(self.alt_tenant)397 # cross tenant check398 source_tenant = self.primary_tenant399 dest_tenant = self.alt_tenant400 self._test_cross_tenant_block(source_tenant, dest_tenant)401 self._test_cross_tenant_allow(source_tenant, dest_tenant)402 except Exception:403 for tenant in self.tenants.values():404 self._log_console_output(servers=tenant.servers)405 raise406 @test.attr(type='smoke')407 @test.services('compute', 'network')408 def test_in_tenant_traffic(self):409 try:410 self._create_tenant_servers(self.primary_tenant, num=1)411 # in-tenant check412 self._test_in_tenant_block(self.primary_tenant)413 self._test_in_tenant_allow(self.primary_tenant)414 except Exception:415 for tenant in self.tenants.values():416 self._log_console_output(servers=tenant.servers)...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!