Best Python code snippet using tempest_python
base.py
Source:base.py
...170 policy_fix, self.vn2_fix)171 def config_md5(self, host, auth_data):172 rparam = self.vnc_lib.bgp_router_read(id=host).bgp_router_parameters173 list_uuid = self.vnc_lib.bgp_router_read(id=host)174 rparam.set_auth_data(auth_data)175 list_uuid.set_bgp_router_parameters(rparam)176 self.vnc_lib.bgp_router_update(list_uuid)177 @retry(delay=10, tries=9)178 def check_bgp_status(self, is_mx_present=False):179 result = True180 self.cn_inspect = self.connections.cn_inspect181 # Verify the connection between all control nodes and MX(if182 # present)183 host = self.inputs.bgp_ips[0]184 cn_bgp_entry = self.cn_inspect[host].get_cn_bgp_neigh_entry()185 if not is_mx_present:186 if self.inputs.ext_routers:187 for bgpnodes in cn_bgp_entry:188 bgpnode = str(bgpnodes)189 for individual_bgp_node in self.inputs.ext_routers:190 if individual_bgp_node[0] in bgpnode:191 cn_bgp_entry.remove(bgpnodes)192 str_bgp_entry = str(cn_bgp_entry)193 est = re.findall(' \'state\': \'(\w+)\', \'flap_count', str_bgp_entry)194 for ip in est:195 if not ('Established' in ip):196 result = False197 self.logger.debug("Check the BGP connection on %s", host)198 return result199 @retry(delay=10, tries=9)200 def check_tcp_status(self):201 result = True202 #testcases which check tcp status quickly change keys and check for tcp status. 203 #internally, tcp session is restarted when md5 keys are changed,204 #as tcp session may take some time to come up, adding some sleep.205 sleep(10)206 for node in self.inputs.bgp_control_ips:207 try:208 if self.is_mx_present and self.inputs.use_devicemanager_for_md5:209 cmd = 'netstat -tnp | grep :179 | awk \"{print $6}\"'210 else:211 cmd = 'netstat -tnp | grep :179 | '212 for ext_router in self.inputs.inputs.ext_routers:213 cmd = cmd + 'grep -v %s | ' % ext_router[1]214 cmd = cmd + 'awk \"{print $6}\"'215 except Exception as e:216 cmd = 'netstat -tnp | grep :179 | '217 for ext_router in self.inputs.inputs.ext_routers:218 cmd = cmd + 'grep -v %s | ' % ext_router[1]219 220 cmd = cmd + 'awk \"{print $6}\"'221 tcp_status = self.inputs.run_cmd_on_server(node, cmd,222 container='control')223 tcp_status=tcp_status.split('\n')224 for one_status in tcp_status:225 one_status=one_status.split(' ')[-2]226 if not ('ESTABLISHED' in one_status):227 result = False228 self.logger.debug("Check the TCP connection on %s", node)229 return result230 def config_per_peer(self, auth_data):231 uuid = self.vnc_lib.bgp_routers_list()232 uuid = str(uuid)233 list_uuid = re.findall('u\'uuid\': u\'([a-zA-Z0-9-]+)\'', uuid)234 #Same routine is used for DM and non DM testcases. Check if DM knob is enabled.235 if self.check_dm:236 list_uuid.append(self.phy_router_fixture.bgp_router.uuid)237 for node in list_uuid:238 if (self.vnc_lib.bgp_router_read(id=node).get_bgp_router_parameters().get_vendor()) == 'contrail':239 list_uuid1 = self.vnc_lib.bgp_router_read(id=node)240 iterrrefs = list_uuid1.get_bgp_router_refs()241 for str1 in iterrrefs:242 sess = str1['attr'].get_session()243 firstsess = sess[0]244 firstattr = firstsess.get_attributes()245 firstattr[0].set_auth_data(auth_data)246 list_uuid1._pending_field_updates.add('bgp_router_refs')247 self.vnc_lib.bgp_router_update(list_uuid1) 248 @classmethod249 def remove_mx_group_config(cls):250 if cls.inputs.ext_routers:251 router_params = cls.inputs.physical_routers_data.values()[0]252 cmd = []253 cmd.append('delete groups md5_tests')254 cmd.append('delete apply-groups md5_tests')255 mx_handle = NetconfConnection(host = router_params['mgmt_ip'])256 mx_handle.connect()257 cli_output = mx_handle.config(stmts = cmd, timeout = 120)258 def remove_configured_md5(self):259 auth_data=None...
test_auth_plugin.py
Source:test_auth_plugin.py
1# vim: tabstop=4 shiftwidth=4 softtabstop=42# Copyright 2013 OpenStack LLC3#4# Licensed under the Apache License, Version 2.0 (the "License"); you may5# not use this file except in compliance with the License. You may obtain6# a copy of the License at7#8# http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the13# License for the specific language governing permissions and limitations14# under the License.15import uuid16from keystone.common import logging17from keystone import auth18from keystone import config19from keystone import exception20from keystone import test21# for testing purposes only22METHOD_NAME = 'simple-challenge-response'23EXPECTED_RESPONSE = uuid.uuid4().hex24DEMO_USER_ID = uuid.uuid4().hex25class SimpleChallengeResponse(auth.AuthMethodHandler):26 def authenticate(self, context, auth_payload, user_context):27 if 'response' in auth_payload:28 if auth_payload['response'] != EXPECTED_RESPONSE:29 raise exception.Unauthorized('Wrong answer')30 user_context['user_id'] = DEMO_USER_ID31 else:32 return {"challenge": "What's the name of your high school?"}33class TestAuthPlugin(test.TestCase):34 def setUp(self):35 super(TestAuthPlugin, self).setUp()36 self.config([37 test.etcdir('keystone.conf.sample'),38 test.testsdir('test_overrides.conf'),39 test.testsdir('backend_sql.conf'),40 test.testsdir('backend_sql_disk.conf'),41 test.testsdir('test_auth_plugin.conf')])42 self.load_backends()43 auth.controllers.AUTH_METHODS[METHOD_NAME] = SimpleChallengeResponse()44 self.api = auth.controllers.Auth()45 def test_unsupported_auth_method(self):46 method_name = uuid.uuid4().hex47 auth_data = {'methods': [method_name]}48 auth_data[method_name] = {'test': 'test'}49 auth_data = {'identity': auth_data}50 self.assertRaises(exception.AuthMethodNotSupported,51 auth.controllers.AuthInfo,52 None,53 auth_data)54 def test_addition_auth_steps(self):55 auth_data = {'methods': ['simple-challenge-response']}56 auth_data['simple-challenge-response'] = {57 'test': 'test'}58 auth_data = {'identity': auth_data}59 auth_info = auth.controllers.AuthInfo(None, auth_data)60 auth_context = {'extras': {}, 'method_names': []}61 try:62 self.api.authenticate({}, auth_info, auth_context)63 except exception.AdditionalAuthRequired as e:64 self.assertTrue('methods' in e.authentication)65 self.assertTrue(METHOD_NAME in e.authentication['methods'])66 self.assertTrue(METHOD_NAME in e.authentication)67 self.assertTrue('challenge' in e.authentication[METHOD_NAME])68 # test correct response69 auth_data = {'methods': ['simple-challenge-response']}70 auth_data['simple-challenge-response'] = {71 'response': EXPECTED_RESPONSE}72 auth_data = {'identity': auth_data}73 auth_info = auth.controllers.AuthInfo(None, auth_data)74 auth_context = {'extras': {}, 'method_names': []}75 self.api.authenticate({}, auth_info, auth_context)76 self.assertEqual(auth_context['user_id'], DEMO_USER_ID)77 # test incorrect response78 auth_data = {'methods': ['simple-challenge-response']}79 auth_data['simple-challenge-response'] = {80 'response': uuid.uuid4().hex}81 auth_data = {'identity': auth_data}82 auth_info = auth.controllers.AuthInfo(None, auth_data)83 auth_context = {'extras': {}, 'method_names': []}84 self.assertRaises(exception.Unauthorized,85 self.api.authenticate,86 {},87 auth_info,...
common.py
Source:common.py
1#!/usr/bin/python2# -*- coding: utf-8 -*-3import ConfigParser4import shlex5import subprocess6import MySQLdb7import os8import errno9def mkdir_recursive(path):10 try:11 os.makedirs(path)12 except OSError as exc: # Python >2.513 if exc.errno == errno.EEXIST and os.path.isdir(path):14 pass15 else: raise16def get_hosts(con):17 con.execute("SELECT id, name, hostnames, custom, SQLpass, FTPpass, root, FTPenabled, SQLenabled, ApacheEnabled "18 "FROM vhosts WHERE enabled=1")19 accs = []20 columns = tuple( [d[0] for d in con.description] )21 for row in con:22 #User can have multuple host names; let's get the first one23 acc = dict(zip(columns, row))24 if acc["hostnames"]=="":25 acc["hostname"] = ""26 else:27 acc["hostname"] = acc["hostnames"].split()[0]28 accs.append(acc)29 return accs30def mysql_connect(auth_data):31 try:32 db = MySQLdb.connect(host=auth_data['mysql_host'], user=auth_data['mysql_username'],33 passwd=auth_data['mysql_password'], db=auth_data['mysql_db'])34 cursor = db.cursor()35 except MySQLdb.Error:36 print db.error()37 return db, cursor38def run_script(script, input):39 args = shlex.split(script)40 process = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)41 output = process.communicate(input=input)[0].strip()42 return output43def get_config():44 config = ConfigParser.ConfigParser()45 config.read('config.cfg')46 auth_data = {}47 auth_data['mysql_username'] = config.get('main', 'user')48 auth_data['mysql_password'] = config.get('main', 'passwd')49 auth_data['mysql_db'] = config.get('main', 'db')50 auth_data['mysql_host'] = config.get('main', 'host')51 auth_data['ftp_username'] = config.get('main', 'backuser')52 auth_data['ftp_password'] = config.get('main', 'backpass')53 auth_data['ftp_host'] = config.get('main', 'backhost')54 auth_data['backup_dir'] = config.get('main', 'directory')55 return auth_data56def chownR(path, group, user):57 os.chown(path, group, user)58 for root, dirs, files in os.walk(path): 59 for momo in dirs: 60 os.chown(os.path.join(root, momo), user, group)61 for momo in files:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!