Best Python code snippet using localstack_python
test_authorization.py
Source:test_authorization.py
...26 # SAMPLE_V3_TOKEN has OS-TRUST:trust in it.27 token_data = test_token_provider.SAMPLE_V3_TOKEN28 token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,29 token_data=token_data)30 auth_context = authorization.token_to_auth_context(token)31 self.assertEqual(token, auth_context['token'])32 self.assertTrue(auth_context['is_delegated_auth'])33 self.assertEqual(token_data['token']['user']['id'],34 auth_context['user_id'])35 self.assertEqual(token_data['token']['user']['domain']['id'],36 auth_context['user_domain_id'])37 self.assertEqual(token_data['token']['project']['id'],38 auth_context['project_id'])39 self.assertEqual(token_data['token']['project']['domain']['id'],40 auth_context['project_domain_id'])41 self.assertNotIn('domain_id', auth_context)42 self.assertNotIn('domain_name', auth_context)43 self.assertEqual(token_data['token']['OS-TRUST:trust']['id'],44 auth_context['trust_id'])45 self.assertEqual(46 token_data['token']['OS-TRUST:trust']['trustor_user_id'],47 auth_context['trustor_id'])48 self.assertEqual(49 token_data['token']['OS-TRUST:trust']['trustee_user_id'],50 auth_context['trustee_id'])51 self.assertItemsEqual(52 [r['name'] for r in token_data['token']['roles']],53 auth_context['roles'])54 self.assertIsNone(auth_context['consumer_id'])55 self.assertIsNone(auth_context['access_token_id'])56 self.assertNotIn('group_ids', auth_context)57 def test_token_is_domain_scoped(self):58 # Check contents of auth_context when token is domain-scoped.59 token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)60 del token_data['token']['project']61 domain_id = uuid.uuid4().hex62 domain_name = uuid.uuid4().hex63 token_data['token']['domain'] = {'id': domain_id, 'name': domain_name}64 token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,65 token_data=token_data)66 auth_context = authorization.token_to_auth_context(token)67 self.assertNotIn('project_id', auth_context)68 self.assertNotIn('project_domain_id', auth_context)69 
self.assertEqual(domain_id, auth_context['domain_id'])70 self.assertEqual(domain_name, auth_context['domain_name'])71 def test_token_is_unscoped(self):72 # Check contents of auth_context when the token is unscoped.73 token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)74 del token_data['token']['project']75 token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,76 token_data=token_data)77 auth_context = authorization.token_to_auth_context(token)78 self.assertNotIn('project_id', auth_context)79 self.assertNotIn('project_domain_id', auth_context)80 self.assertNotIn('domain_id', auth_context)81 self.assertNotIn('domain_name', auth_context)82 def test_token_is_for_federated_user(self):83 # When the token is for a federated user then group_ids is in84 # auth_context.85 token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)86 group_ids = [uuid.uuid4().hex for x in range(1, 5)]87 federation_data = {'identity_provider': {'id': uuid.uuid4().hex},88 'protocol': {'id': 'saml2'},89 'groups': [{'id': gid} for gid in group_ids]}90 token_data['token']['user'][federation_constants.FEDERATION] = (91 federation_data)92 token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,93 token_data=token_data)94 auth_context = authorization.token_to_auth_context(token)95 self.assertItemsEqual(group_ids, auth_context['group_ids'])96 def test_oauth_variables_set_for_oauth_token(self):97 token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)98 access_token_id = uuid.uuid4().hex99 consumer_id = uuid.uuid4().hex100 token_data['token']['OS-OAUTH1'] = {'access_token_id': access_token_id,101 'consumer_id': consumer_id}102 token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,103 token_data=token_data)104 auth_context = authorization.token_to_auth_context(token)105 self.assertEqual(access_token_id, auth_context['access_token_id'])106 self.assertEqual(consumer_id, auth_context['consumer_id'])107 def test_oauth_variables_not_set(self):108 token_data = 
copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)109 token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,110 token_data=token_data)111 auth_context = authorization.token_to_auth_context(token)112 self.assertIsNone(auth_context['access_token_id'])113 self.assertIsNone(auth_context['consumer_id'])114 def test_token_is_not_KeystoneToken_raises_exception(self):115 # If the token isn't a KeystoneToken then an UnexpectedError exception116 # is raised.117 self.assertRaises(exception.UnexpectedError,118 authorization.token_to_auth_context, {})119 def test_user_id_missing_in_token_raises_exception(self):120 # If there's no user ID in the token then an Unauthorized121 # exception is raised.122 token_data = copy.deepcopy(test_token_provider.SAMPLE_V3_TOKEN)123 del token_data['token']['user']['id']124 token = token_model.KeystoneToken(token_id=uuid.uuid4().hex,125 token_data=token_data)...
func.py
Source:func.py
import base64
import datetime
import io
import json
import logging
from datetime import timedelta

import oci
import requests
from fdk import response
from requests.auth import HTTPBasicAuth

# Cache of OAuth client settings, filled on the first invocation by
# initContext() and reused by later invocations of the same process.
oauth_apps = {}

LOGGER = logging.getLogger()

# Upper bound for each outbound HTTP call so a stalled identity endpoint
# cannot hang the authorizer indefinitely.
REQUEST_TIMEOUT_SECONDS = 10

BEARER_PREFIX = 'Bearer '
WWW_AUTHENTICATE = 'Bearer realm="identity.oraclecloud.com"'


def initContext(context):
    """Populate ``oauth_apps`` from the application context and OCI Vault.

    ``context`` is a mapping holding the endpoint, client id and secret
    OCID entries for both the ``idcs`` and ``oic`` apps.  The lookup is
    skipped once both entries are cached.  Any failure is logged and
    re-raised.
    """
    if len(oauth_apps) >= 2:
        LOGGER.info('OAuth Apps already stored')
        return

    LOGGER.info('Retriving details about the API and backend OAuth Apps')
    try:
        LOGGER.info('initContext: Initializing context')
        oauth_apps['idcs'] = {
            'introspection_endpoint': context['idcs_introspection_endpoint'],
            'client_id': context['idcs_app_client_id'],
            'client_secret': getSecret(context['idcs_app_client_secret_ocid']),
        }
        oauth_apps['oic'] = {
            'token_endpoint': context['back_end_token_endpoint'],
            'client_id': context['back_end_app_client_id'],
            'client_secret': getSecret(context['back_end_client_secret_ocid']),
        }
    except Exception as ex:
        LOGGER.error('initContext: Failed to get config or secrets')
        print("ERROR [initContext]: Failed to get the configs", ex, flush=True)
        raise


def getSecret(ocid):
    """Return the base64-decoded content of the secret bundle ``ocid``.

    The secret is fetched with a resource-principal signer.  Failures are
    logged and re-raised unchanged.
    """
    signer = oci.auth.signers.get_resource_principals_signer()
    try:
        client = oci.secrets.SecretsClient({}, signer=signer)
        secret_content = client.get_secret_bundle(
            ocid).data.secret_bundle_content.content.encode('utf-8')
        decrypted_secret_content = base64.b64decode(
            secret_content).decode('utf-8')
    except Exception as ex:
        # Pass the exception as a logging argument: "str" + exception
        # raises TypeError and would hide the real failure.
        LOGGER.error("getSecret: Failed to get Secret: %s", ex)
        print("Error [getSecret]: failed to retrieve", ex, flush=True)
        raise
    return decrypted_secret_content


def introspectToken(access_token, introspection_endpoint, client_id,
                    client_secret):
    """POST ``access_token`` to the introspection endpoint.

    Authenticates with HTTP Basic auth using the client credentials and
    returns the decoded JSON body.  Transport errors and non-2xx
    responses are logged and raised.
    """
    payload = {'token': access_token}
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8',
        'Accept': 'application/json',
    }

    try:
        token = requests.post(introspection_endpoint,
                              data=payload,
                              headers=headers,
                              auth=HTTPBasicAuth(client_id, client_secret),
                              timeout=REQUEST_TIMEOUT_SECONDS)
        token.raise_for_status()
    except Exception as ex:
        LOGGER.error("introspectToken: Failed to introspect token: %s", ex)
        raise
    return token.json()


def getBackEndAuthToken(token_endpoint, client_id, client_secret):
    """Request a ``client_credentials`` token from the back-end system.

    Returns the decoded JSON token response.  Transport errors, non-2xx
    responses and undecodable bodies are logged and raised.
    """
    payload = {'grant_type': 'client_credentials'}
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8',
    }
    try:
        backend_response = requests.post(
            token_endpoint,
            data=payload,
            headers=headers,
            auth=HTTPBasicAuth(client_id, client_secret),
            timeout=REQUEST_TIMEOUT_SECONDS)
        backend_response.raise_for_status()
        backend_token = json.loads(backend_response.text)
    except Exception as ex:
        LOGGER.error("getBackEndAuthToken: Failed to get oic token: %s", ex)
        raise

    return backend_token


def _stripBearer(token):
    """Return ``token`` without a leading ``'Bearer '`` scheme, if present."""
    if token.startswith(BEARER_PREFIX):
        return token[len(BEARER_PREFIX):]
    return token


def _expiresAt(token_exp, backend_expires_in, now=None):
    """Return the earlier of the two expiries as an ISO-8601 UTC string.

    ``token_exp`` is the introspected token's ``exp`` (seconds since the
    epoch); ``backend_expires_in`` is the back-end token lifetime in
    seconds counted from ``now`` (defaults to the current UTC time).
    Both sides are timezone-aware so the comparison does not depend on
    the host's local timezone.
    """
    utc = datetime.timezone.utc
    if now is None:
        now = datetime.datetime.now(utc)
    api_expiry = datetime.datetime.fromtimestamp(token_exp, tz=utc)
    backend_expiry = now + timedelta(seconds=backend_expires_in)
    return min(api_expiry, backend_expiry).replace(microsecond=0).isoformat()


def _inactiveContext():
    """Return the auth context sent back when the caller is not authorized."""
    return {'active': False, 'wwwAuthenticate': WWW_AUTHENTICATE}


def getAuthContext(token, client_apps):
    """Build the auth context that is returned to the gateway.

    The incoming token is introspected with the ``idcs`` app.  When it is
    active, the context carries the principal, scope, an ``expiresAt``
    equal to the lesser of the API token expiry and the back-end token
    expiry, and the back-end bearer token under ``context``.  Otherwise
    the context is inactive and carries a ``wwwAuthenticate`` value.
    """
    idcs_app = client_apps['idcs']
    try:
        token_info = introspectToken(_stripBearer(token),
                                     idcs_app['introspection_endpoint'],
                                     idcs_app['client_id'],
                                     idcs_app['client_secret'])
    except Exception as ex:
        LOGGER.error("getAuthContext: Failed to introspect token: %s", ex)
        raise

    # A missing "active" member is treated the same as an inactive token.
    if token_info.get('active') is not True:
        return _inactiveContext()

    oic_app = client_apps['oic']
    backend_token = getBackEndAuthToken(oic_app['token_endpoint'],
                                        oic_app['client_id'],
                                        oic_app['client_secret'])

    return {
        'active': True,
        'principal': token_info['sub'],
        'scope': token_info['scope'],
        'expiresAt': _expiresAt(token_info['exp'],
                                backend_token['expires_in']),
        # Stored so the gateway can map it onto the Authorization header
        # with a request/response transformation policy.
        'context': {
            'back_end_token': 'Bearer ' + str(backend_token['access_token']),
        },
    }


def _jsonResponse(ctx, body, status_code):
    """Wrap ``body`` (a JSON-serializable dict) in an fdk response."""
    return response.Response(
        ctx,
        response_data=json.dumps(body),
        status_code=status_code,
        headers={"Content-Type": "application/json"}
    )


def handler(ctx, data: io.BytesIO=None):
    """Function entry point: authorize the token in the gateway payload.

    Returns 200 with the auth context when the token is active, and 401
    with an inactive auth context (a JSON object) when the token is
    inactive or anything fails while evaluating it.
    """
    LOGGER.info('Entered Handler')
    initContext(dict(ctx.Config()))

    try:
        gateway_auth = json.loads(data.getvalue())
        auth_context = getAuthContext(gateway_auth['token'], oauth_apps)
    except Exception as ex:
        LOGGER.info('error parsing json payload: ' + str(ex))
        return _jsonResponse(ctx, _inactiveContext(), 401)

    if auth_context['active']:
        LOGGER.info('Authorizer returning 200...')
        return _jsonResponse(ctx, auth_context, 200)

    LOGGER.info('Authorizer returning 401...')
    # NOTE(review): the published snippet is cut off right after the final
    # response's headers argument; confirm nothing else followed it.
    return _jsonResponse(ctx, auth_context, 401)
authorization.py
Source:authorization.py
...45* ``group_ids`` (optional): list of group IDs for which the API user has46 membership if token was for a federated user47"""48LOG = log.getLogger(__name__)49def token_to_auth_context(token):50 if not isinstance(token, token_model.KeystoneToken):51 raise exception.UnexpectedError(_('token reference must be a '52 'KeystoneToken type, got: %s') %53 type(token))54 auth_context = {'token': token,55 'is_delegated_auth': False}56 try:57 auth_context['user_id'] = token.user_id58 except KeyError:59 LOG.warning(_LW('RBAC: Invalid user data in token'))60 raise exception.Unauthorized()61 auth_context['user_domain_id'] = token.user_domain_id62 if token.project_scoped:63 auth_context['project_id'] = token.project_id...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!