Best Python code snippet using tempest_python
auth.py
Source:auth.py
...198 auth_params = self._auth_params()199 # returns token, auth_data200 token, auth_data = auth_func(**auth_params)201 return token, auth_data202 def _parse_expiry_time(self, expiry_string):203 expiry = None204 for date_format in self.EXPIRY_DATE_FORMATS:205 try:206 expiry = datetime.datetime.strptime(207 expiry_string, date_format)208 except ValueError:209 pass210 if expiry is None:211 raise ValueError(212 "time data '{data}' does not match any of the"213 "expected formats: {formats}".format(214 data=expiry_string, formats=self.EXPIRY_DATE_FORMATS))215 return expiry216 def get_token(self):217 return self.auth_data[0]218class KeystoneV2AuthProvider(KeystoneAuthProvider):219 def _auth_client(self, auth_url):220 return json_v2id.TokenClient(221 auth_url, disable_ssl_certificate_validation=self.dsvm,222 ca_certs=self.ca_certs, trace_requests=self.trace_requests)223 def _auth_params(self):224 return dict(225 user=self.credentials.username,226 password=self.credentials.password,227 tenant=self.credentials.tenant_name,228 auth_data=True)229 def _fill_credentials(self, auth_data_body):230 tenant = auth_data_body['token']['tenant']231 user = auth_data_body['user']232 if self.credentials.tenant_name is None:233 self.credentials.tenant_name = tenant['name']234 if self.credentials.tenant_id is None:235 self.credentials.tenant_id = tenant['id']236 if self.credentials.username is None:237 self.credentials.username = user['name']238 if self.credentials.user_id is None:239 self.credentials.user_id = user['id']240 def base_url(self, filters, auth_data=None):241 """Base URL from catalog242 Filters can be:243 - service: compute, image, etc244 - region: the service region245 - endpoint_type: adminURL, publicURL, internalURL246 - api_version: replace catalog version with this247 - skip_path: take just the base URL248 """249 if auth_data is None:250 auth_data = self.auth_data251 token, _auth_data = auth_data252 service = filters.get('service')253 region = filters.get('region')254 
endpoint_type = filters.get('endpoint_type', 'publicURL')255 if service is None:256 raise exceptions.EndpointNotFound("No service provided")257 _base_url = None258 for ep in _auth_data['serviceCatalog']:259 if ep["type"] == service:260 for _ep in ep['endpoints']:261 if region is not None and _ep['region'] == region:262 _base_url = _ep.get(endpoint_type)263 if not _base_url:264 # No region matching, use the first265 _base_url = ep['endpoints'][0].get(endpoint_type)266 break267 if _base_url is None:268 raise exceptions.EndpointNotFound(service)269 parts = urlparse.urlparse(_base_url)270 if filters.get('api_version', None) is not None:271 path = "/" + filters['api_version']272 noversion_path = "/".join(parts.path.split("/")[2:])273 if noversion_path != "":274 path += "/" + noversion_path275 _base_url = _base_url.replace(parts.path, path)276 if filters.get('skip_path', None) is not None and parts.path != '':277 _base_url = _base_url.replace(parts.path, "/")278 return _base_url279 def is_expired(self, auth_data):280 _, access = auth_data281 expiry = self._parse_expiry_time(access['token']['expires'])282 return (expiry - self.token_expiry_threshold <=283 datetime.datetime.utcnow())284class KeystoneV3AuthProvider(KeystoneAuthProvider):285 def _auth_client(self, auth_url):286 return json_v3id.V3TokenClient(287 auth_url, disable_ssl_certificate_validation=self.dsvm,288 ca_certs=self.ca_certs, trace_requests=self.trace_requests)289 def _auth_params(self):290 return dict(291 user_id=self.credentials.user_id,292 username=self.credentials.username,293 password=self.credentials.password,294 project_id=self.credentials.project_id,295 project_name=self.credentials.project_name,296 user_domain_id=self.credentials.user_domain_id,297 user_domain_name=self.credentials.user_domain_name,298 project_domain_id=self.credentials.project_domain_id,299 project_domain_name=self.credentials.project_domain_name,300 domain_id=self.credentials.domain_id,301 
domain_name=self.credentials.domain_name,302 auth_data=True)303 def _fill_credentials(self, auth_data_body):304 # project or domain, depending on the scope305 project = auth_data_body.get('project', None)306 domain = auth_data_body.get('domain', None)307 # user is always there308 user = auth_data_body['user']309 # Set project fields310 if project is not None:311 if self.credentials.project_name is None:312 self.credentials.project_name = project['name']313 if self.credentials.project_id is None:314 self.credentials.project_id = project['id']315 if self.credentials.project_domain_id is None:316 self.credentials.project_domain_id = project['domain']['id']317 if self.credentials.project_domain_name is None:318 self.credentials.project_domain_name = (319 project['domain']['name'])320 # Set domain fields321 if domain is not None:322 if self.credentials.domain_id is None:323 self.credentials.domain_id = domain['id']324 if self.credentials.domain_name is None:325 self.credentials.domain_name = domain['name']326 # Set user fields327 if self.credentials.username is None:328 self.credentials.username = user['name']329 if self.credentials.user_id is None:330 self.credentials.user_id = user['id']331 if self.credentials.user_domain_id is None:332 self.credentials.user_domain_id = user['domain']['id']333 if self.credentials.user_domain_name is None:334 self.credentials.user_domain_name = user['domain']['name']335 def base_url(self, filters, auth_data=None):336 """Base URL from catalog337 Filters can be:338 - service: compute, image, etc339 - region: the service region340 - endpoint_type: adminURL, publicURL, internalURL341 - api_version: replace catalog version with this342 - skip_path: take just the base URL343 """344 if auth_data is None:345 auth_data = self.auth_data346 token, _auth_data = auth_data347 service = filters.get('service')348 region = filters.get('region')349 endpoint_type = filters.get('endpoint_type', 'public')350 if service is None:351 raise 
exceptions.EndpointNotFound("No service provided")352 if 'URL' in endpoint_type:353 endpoint_type = endpoint_type.replace('URL', '')354 _base_url = None355 catalog = _auth_data['catalog']356 # Select entries with matching service type357 service_catalog = [ep for ep in catalog if ep['type'] == service]358 if len(service_catalog) > 0:359 service_catalog = service_catalog[0]['endpoints']360 else:361 # No matching service362 raise exceptions.EndpointNotFound(service)363 # Filter by endpoint type (interface)364 filtered_catalog = [ep for ep in service_catalog if365 ep['interface'] == endpoint_type]366 if len(filtered_catalog) == 0:367 # No matching type, keep all and try matching by region at least368 filtered_catalog = service_catalog369 # Filter by region370 filtered_catalog = [ep for ep in filtered_catalog if371 ep['region'] == region]372 if len(filtered_catalog) == 0:373 # No matching region, take the first endpoint374 filtered_catalog = [service_catalog[0]]375 # There should be only one match. 
If not take the first.376 _base_url = filtered_catalog[0].get('url', None)377 if _base_url is None:378 raise exceptions.EndpointNotFound(service)379 parts = urlparse.urlparse(_base_url)380 if filters.get('api_version', None) is not None:381 path = "/" + filters['api_version']382 noversion_path = "/".join(parts.path.split("/")[2:])383 if noversion_path != "":384 path += "/" + noversion_path385 _base_url = _base_url.replace(parts.path, path)386 if filters.get('skip_path', None) is not None:387 _base_url = _base_url.replace(parts.path, "/")388 return _base_url389 def is_expired(self, auth_data):390 _, access = auth_data391 expiry = self._parse_expiry_time(access['expires_at'])392 return (expiry - self.token_expiry_threshold <=393 datetime.datetime.utcnow())394def is_identity_version_supported(identity_version):395 return identity_version in IDENTITY_VERSION396def get_credentials(auth_url, fill_in=True, identity_version='v2',397 disable_ssl_certificate_validation=None, ca_certs=None,398 trace_requests=None, **kwargs):399 """Builds a credentials object based on the configured auth_version400 :param auth_url (string): Full URI of the OpenStack Identity API(Keystone)401 which is used to fetch the token from Identity service.402 :param fill_in (boolean): obtain a token and fill in all credential403 details provided by the identity service. When fill_in is not404 specified, credentials are not validated. Validation can be invoked405 by invoking ``is_valid()``...
common.py
Source:common.py
...35 self.logger.info('Saving auth data to cache...')36 json.dump(auth_data, cache_file)37 def is_token_expired(self, auth_data):38 LOCK.acquire()39 expiry = self._parse_expiry_time(auth_data['access']['token']['expires'])40 r = expiry <= datetime.datetime.utcnow()41 if r:42 self.logger.info('Token expired, will renew token...')43 LOCK.release()44 return r45 def _parse_expiry_time(self, expiry_string):46 expiry = None47 for date_format in EXPIRY_DATE_FORMATS:48 try:49 expiry = datetime.datetime.strptime(50 expiry_string, date_format)51 except ValueError:52 pass53 if expiry is None:54 raise ValueError(55 "time data '{data}' does not match any of the"56 "expected formats: {formats}".format(57 data=expiry_string, formats=self.EXPIRY_DATE_FORMATS))58 return expiry59 def get_token(self):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!