Best Python code snippet using localstack_python
kerberos_common.py
Source:kerberos_common.py
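"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
 http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import base64
import getpass
import os
import string
import subprocess
import sys
import tempfile
from tempfile import gettempdir

try:
    from shlex import quote as _shell_quote
except ImportError:  # Python 2: the same helper lives in pipes
    from pipes import quote as _shell_quote

from resource_management import *
from utils import get_property_value
from ambari_commons.os_utils import remove_file
from ambari_agent import Constants


class KerberosScript(Script):
    """Helpers for writing krb5.conf, driving kadmin and handling keytab files."""

    # Keys that _write_conf_realm copies into a realm stanza; other keys are skipped.
    KRB5_REALM_PROPERTIES = [
        'kdc',
        'admin_server',
        'default_domain',
        'master_kdc'
    ]

    KRB5_SECTION_NAMES = [
        'libdefaults',
        'logging',
        'realms',
        'domain_realm',
        'capaths',
        'ca_paths',
        'appdefaults',
        'plugins'
    ]

    @staticmethod
    def create_random_password():
        """Return a 13-character password made of ASCII letters and digits.

        The characters come from random.SystemRandom (OS entropy source)
        rather than the module-level Mersenne Twister generator, whose output
        is predictable and therefore unsuitable for credentials.
        """
        import random

        chars = string.digits + string.ascii_letters
        rng = random.SystemRandom()
        return ''.join(rng.choice(chars) for _ in range(13))

    @staticmethod
    def write_conf_section(output_file, section_name, section_data):
        """Write a "[section_name]" header followed by one "key = value" line per entry.

        :param output_file: writable file-like object
        :param section_name: section title, or None to omit the header
        :param section_data: mapping of settings, or None to write no entries
        """
        if section_name is not None:
            output_file.write('[%s]\n' % section_name)

        if section_data is not None:
            for key, value in section_data.iteritems():
                output_file.write(" %s = %s\n" % (key, value))

    @staticmethod
    def _write_conf_realm(output_file, realm_name, realm_data):
        """ Writes out realm details

        Example:
         EXAMPLE.COM = {
         kdc = kerberos.example.com
         admin_server = kerberos.example.com
         }
        """
        if realm_name is not None:
            output_file.write(" %s = {\n" % realm_name)

            if realm_data is not None:
                for key, value in realm_data.iteritems():
                    # Only the whitelisted realm properties are emitted.
                    if key in KerberosScript.KRB5_REALM_PROPERTIES:
                        output_file.write(" %s = %s\n" % (key, value))

            output_file.write(" }\n")

    @staticmethod
    def write_conf_realms_section(output_file, section_name, realms_data):
        """Write a "[section_name]" header, one stanza per realm, then a blank line."""
        if section_name is not None:
            output_file.write('[%s]\n' % section_name)

        if realms_data is not None:
            for realm, realm_data in realms_data.iteritems():
                KerberosScript._write_conf_realm(output_file, realm, realm_data)
            output_file.write('\n')

    @staticmethod
    def write_krb5_conf():
        """Create the krb5.conf directory and render the file from params' template.

        Both are owned by root:root; the directory is 0755 and the file 0644.
        """
        import params

        Directory(params.krb5_conf_dir,
                  owner='root',
                  create_parents=True,
                  group='root',
                  mode=0o755
                  )

        content = InlineTemplate(params.krb5_conf_template)

        File(params.krb5_conf_path,
             content=content,
             owner='root',
             group='root',
             mode=0o644
             )

    @staticmethod
    def invoke_kadmin(query, admin_identity=None, default_realm=None):
        """
        Executes the kadmin or kadmin.local command (depending on whether auth_identity is set or not
        and returns command result code and standard out data.

        Every value placed on the command line is shell-quoted, so quotes,
        "$" or backticks inside a principal, password, realm or query are
        passed through literally instead of being interpreted by the shell.

        Security note: with password authentication the password is passed
        with -w, and queries that carry "-pw" embed a password as well, so
        both are visible in the process argument list while kadmin runs.
        Keytab authentication avoids the -w exposure.

        :param query: the kadmin query to execute
        :param admin_identity: the identity for the administrative user (optional)
        :param default_realm: the default realm to assume
        :return: return_code, out (None when query is None or empty)
        """
        if (query is None) or (len(query) == 0):
            return None

        auth_principal = None
        auth_keytab_file = None

        if admin_identity is not None:
            auth_principal = get_property_value(admin_identity, 'principal')

        # The try starts before the keytab is written to disk so that the
        # temporary file is removed on every exit path, including a failed
        # base64 decode or write.
        try:
            if auth_principal is None:
                kadmin = 'kadmin.local'
                credential = ''
            else:
                kadmin = 'kadmin -p %s' % _shell_quote(auth_principal)

                auth_password = get_property_value(admin_identity, 'password')
                if auth_password is None:
                    auth_keytab = get_property_value(admin_identity, 'keytab')
                    if auth_keytab is not None:
                        # mkstemp creates the file readable and writable by
                        # the current user only.
                        (fd, auth_keytab_file) = tempfile.mkstemp()
                        try:
                            # The listing had
                            #   keytab_file_path = keytab_file_path.replace("_HOST", params.hostname)
                            # at this point; keytab_file_path is never bound in
                            # this function, so the statement could only raise.
                            # It is dropped.
                            os.write(fd, base64.b64decode(auth_keytab))
                        finally:
                            os.close(fd)
                    # NOTE(review): when neither a password nor a keytab is
                    # supplied this yields "-k -t None", as in the listing
                    # (whose indentation is lost) - confirm the intended nesting.
                    credential = '-k -t %s' % auth_keytab_file
                else:
                    credential = '-w %s' % _shell_quote(auth_password)

            if (default_realm is not None) and (len(default_realm) > 0):
                realm = '-r %s' % _shell_quote(default_realm)
            else:
                realm = ''

            command = '%s %s %s -q %s' % (kadmin, credential, realm, _shell_quote(query))
            return shell.checked_call(command)
        finally:
            if auth_keytab_file is not None:
                os.remove(auth_keytab_file)

    @staticmethod
    def create_keytab_file(principal, path, auth_identity=None):
        """Run "ktadd" for principal, writing the keytab to path when one is given.

        -norandkey is added only when no auth_identity is supplied.

        :return: True when kadmin exits with code 0, False when principal is empty
        :raises Fail: when the kadmin invocation raises
        """
        success = False

        if (principal is not None) and (len(principal) > 0):
            if (auth_identity is None) or (len(auth_identity) == 0):
                norandkey = '-norandkey'
            else:
                norandkey = ''

            if (path is not None) and (len(path) > 0):
                keytab_file = '-k %s' % path
            else:
                keytab_file = ''

            try:
                result_code, output = KerberosScript.invoke_kadmin(
                    'ktadd %s %s %s' % (keytab_file, norandkey, principal),
                    auth_identity)
                success = (result_code == 0)
            except Exception:
                raise Fail("Failed to create keytab for principal: %s (in %s)" % (principal, path))

        return success

    @staticmethod
    def create_keytab(principal, auth_identity=None):
        """Export a keytab for principal and return it base64-encoded (None on failure).

        kadmin has to create the keytab file itself, so only a path is handed
        to it. The path lives inside a private mkdtemp() directory (accessible
        to the current user only) instead of a mkstemp() name that was deleted
        and reused: the reused name could be claimed by another local user
        before kadmin wrote key material to it, and the mkstemp descriptor was
        never closed.
        """
        import shutil

        keytab = None
        temp_dir = tempfile.mkdtemp()
        temp_path = os.path.join(temp_dir, 'keytab')

        try:
            if KerberosScript.create_keytab_file(principal, temp_path, auth_identity):
                # Keytabs are binary data.
                with open(temp_path, 'rb') as f:
                    keytab = base64.b64encode(f.read())
        finally:
            # Removes the exported key material together with its directory.
            shutil.rmtree(temp_dir, ignore_errors=True)

        return keytab

    @staticmethod
    def principal_exists(identity, auth_identity=None):
        """Return True when "getprinc" output contains "Principal: <principal>".

        :raises Fail: when the kadmin invocation raises
        """
        exists = False

        if identity is not None:
            principal = get_property_value(identity, 'principal')

            if (principal is not None) and (len(principal) > 0):
                try:
                    result_code, output = KerberosScript.invoke_kadmin('getprinc %s' % principal,
                                                                       auth_identity)
                    exists = (output is not None) and (("Principal: %s" % principal) in output)
                except Exception:
                    raise Fail("Failed to determine if principal exists: %s" % principal)

        return exists

    @staticmethod
    def change_principal_password(identity, auth_identity=None):
        """Set the principal's password, or randomize its key when none is given.

        :return: True when kadmin exits with code 0
        :raises Fail: when the kadmin invocation raises
        """
        success = False

        if identity is not None:
            principal = get_property_value(identity, 'principal')

            if (principal is not None) and (len(principal) > 0):
                password = get_property_value(identity, 'password')

                if password is None:
                    credentials = '-randkey'
                else:
                    # NOTE(review): the password is embedded in the kadmin
                    # query unescaped; a '"' inside it would end the quoted
                    # argument as kadmin parses it - confirm how callers
                    # constrain passwords.
                    credentials = '-pw "%s"' % password

                try:
                    result_code, output = KerberosScript.invoke_kadmin(
                        'change_password %s %s' % (credentials, principal),
                        auth_identity)
                    success = (result_code == 0)
                except Exception:
                    # The listing reported "Failed to create principal" here,
                    # copied from create_principal.
                    raise Fail("Failed to change password for principal: %s" % principal)

        return success

    @staticmethod
    def create_principal(identity, auth_identity=None):
        """Add the principal with its password, or with a random key when none is given.

        :return: True when kadmin exits with code 0
        :raises Fail: when the kadmin invocation raises
        """
        success = False

        if identity is not None:
            principal = get_property_value(identity, 'principal')

            if (principal is not None) and (len(principal) > 0):
                password = get_property_value(identity, 'password')

                if password is None:
                    credentials = '-randkey'
                else:
                    # NOTE(review): same unescaped embedding as in
                    # change_principal_password.
                    credentials = '-pw "%s"' % password

                try:
                    result_code, out = KerberosScript.invoke_kadmin(
                        'addprinc %s %s' % (credentials, principal),
                        auth_identity)
                    success = (result_code == 0)
                except Exception:
                    raise Fail("Failed to create principal: %s" % principal)

        return success

    @staticmethod
    def clear_tmp_cache():
        """Delete the "curl_krb_cache" directory under the agent (or system) temp dir."""
        tmp_dir = Constants.AGENT_TMP_DIR
        if tmp_dir is None:
            tmp_dir = gettempdir()

        curl_krb_cache_path = os.path.join(tmp_dir, "curl_krb_cache")
        Directory(curl_krb_cache_path, action="delete")

    @staticmethod
    def create_principals(identities, auth_identity=None):
        """Call create_principal for every identity; None is treated as empty."""
        if identities is not None:
            for identity in identities:
                KerberosScript.create_principal(identity, auth_identity)

    @staticmethod
    def create_or_update_administrator_identity():
        """Create the realm's admin principal, or reset its password if it exists."""
        import params

        if params.realm is not None:
            admin_identity = params.get_property_value(params.realm, 'admin_identity')

            if KerberosScript.principal_exists(admin_identity):
                KerberosScript.change_principal_password(admin_identity)
            else:
                KerberosScript.create_principal(admin_identity)

    @staticmethod
    def test_kinit(identity, user="root"):
        """Check that identity can obtain a ticket, then run kdestroy.

        Credentials are tried in this order: an existing keytab file,
        base64-encoded keytab data, then a password fed to kinit on stdin.

        :return: the result of shell.checked_call on kdestroy, or (0, '')
                 when there is no principal or no usable credential
        :raises Fail: when the password-based kinit exits non-zero
        """
        principal = get_property_value(identity, 'principal')
        search_paths = default('/configurations/kerberos-env/executable_search_paths', None)
        kinit_path_local = functions.get_kinit_path(search_paths)
        kdestroy_path_local = functions.get_kdestroy_path(search_paths)

        if principal is None:
            return 0, ''

        keytab_file = get_property_value(identity, 'keytab_file')
        keytab = get_property_value(identity, 'keytab')
        password = get_property_value(identity, 'password')

        # If a test keytab file is available, simply use it
        if (keytab_file is not None) and (os.path.isfile(keytab_file)):
            # The listing reads params.hostname here without importing params
            # in this method; import it locally as the other methods do.
            # NOTE(review): the existence check above runs before the _HOST
            # substitution - confirm that order is intended.
            import params

            keytab_file = keytab_file.replace("_HOST", params.hostname)
            command = '%s -k -t %s %s' % (kinit_path_local,
                                          _shell_quote(keytab_file),
                                          _shell_quote(principal))
            Execute(command,
                    user=user,
                    )
            return shell.checked_call(kdestroy_path_local)

        # If base64-encoded test keytab data is available; then decode it, write it to a temporary file
        # use it, and then remove the temporary file
        elif keytab is not None:
            (fd, test_keytab_file) = tempfile.mkstemp()
            try:
                # Writing inside the try guarantees the temporary keytab is
                # removed even when decoding or writing fails.
                try:
                    os.write(fd, base64.b64decode(keytab))
                finally:
                    os.close(fd)

                command = '%s -k -t %s %s' % (kinit_path_local,
                                              test_keytab_file,
                                              _shell_quote(principal))
                # NOTE(review): the temporary file is created by the current
                # process while kinit runs as `user`; confirm that account can
                # read it when user is not the agent's own.
                Execute(command,
                        user=user,
                        )
                return shell.checked_call(kdestroy_path_local)
            finally:
                os.remove(test_keytab_file)

        # If no keytab data is available and a password was supplied, simply use it.
        elif password is not None:
            # The password goes to kinit over stdin, which keeps it out of the
            # process argument list. stderr is captured so that the failure
            # message below carries kinit's own diagnostics.
            process = subprocess.Popen([kinit_path_local, principal],
                                       stdin=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
            stdout, stderr = process.communicate(password)
            if process.returncode:
                err_msg = Logger.filter_text("Execution of kinit returned %d. %s" % (process.returncode, stderr))
                raise Fail(err_msg)
            else:
                return shell.checked_call(kdestroy_path_local)
        else:
            return 0, ''

    def write_keytab_file(self):
        """Write each keytab from params.kerberos_command_params to its target path.

        Entries without keytab content or without a path are skipped. Files are
        written root:root with mode 0660, and the principal -> path mapping is
        recorded under "keytabs" in the structured output.
        """
        import params
        import stat

        if params.kerberos_command_params is None:
            return

        for item in params.kerberos_command_params:
            keytab_content_base64 = get_property_value(item, 'keytab_content_base64')
            if (keytab_content_base64 is None) or (len(keytab_content_base64) == 0):
                continue

            keytab_file_path = get_property_value(item, 'keytab_file_path')
            if (keytab_file_path is None) or (len(keytab_file_path) == 0):
                continue

            keytab_file_path = keytab_file_path.replace("_HOST", params.hostname)
            head, tail = os.path.split(keytab_file_path)
            if head:
                Directory(head, create_parents=True, mode=0o755, owner="root", group="root")

            owner = "root"
            group = "root"
            # Read/write for owner and group, nothing for others (0660).
            mode = 0
            mode |= stat.S_IREAD | stat.S_IWRITE
            mode |= stat.S_IRGRP | stat.S_IWGRP

            keytab_content = base64.b64decode(keytab_content_base64)

            # to hide content in command output
            def make_lambda(data):
                return lambda: data

            File(keytab_file_path,
                 content=make_lambda(keytab_content),
                 mode=mode,
                 owner=owner,
                 group=group)

            principal = get_property_value(item, 'principal')
            if principal is not None:
                curr_content = Script.structuredOut

                if "keytabs" not in curr_content:
                    curr_content['keytabs'] = {}

                curr_content['keytabs'][principal.replace("_HOST", params.hostname)] = keytab_file_path

                self.put_structured_out(curr_content)

    def delete_keytab_file(self):
        """Delete each keytab listed in params.kerberos_command_params.

        Every principal whose keytab was removed is marked '_REMOVED_' under
        "keytabs" in the structured-output dictionary.
        """
        import params

        if params.kerberos_command_params is not None:
            for item in params.kerberos_command_params:
                keytab_file_path = get_property_value(item, 'keytab_file_path')
                if (keytab_file_path is not None) and (len(keytab_file_path) > 0):
                    keytab_file_path = keytab_file_path.replace("_HOST", params.hostname)

                    # Delete the keytab file
                    File(keytab_file_path, action="delete")

                    principal = get_property_value(item, 'principal')
                    if principal is not None:
                        curr_content = Script.structuredOut

                        if "keytabs" not in curr_content:
                            curr_content['keytabs'] = {}

                        curr_content['keytabs'][principal.replace("_HOST", params.hostname)] = '_REMOVED_'
                        # (the listing is truncated on the source page at this point)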