Best Python code snippet using localstack_python
opensearchTools.py
Source:opensearchTools.py
'''
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
 http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import json
import logging
import os
import re

import boto3
import requests
from crhelper import CfnResource
from requests_aws4auth import AWS4Auth

logger = logging.getLogger(__name__)
helper_config = CfnResource(json_logging=False,
                            log_level='DEBUG',
                            boto_level='CRITICAL',
                            sleep_on_delete=120)
# Read once at import time; importing this module without AWS_REGION set
# raises KeyError.
region = os.environ['AWS_REGION']
KIBANA_HEADERS = {'Content-Type': 'application/json', 'kbn-xsrf': 'true'}
OPENSEARCH_DASHBOARD_HEADERS = {'Content-Type': 'application/json', 'osd-xsrf': 'true'}


def auth_opensearch(opensearch_endpoint):
    """Build a SigV4 signer for the 'es' service from the default boto3 session.

    :param opensearch_endpoint: Not used by this function; kept so existing
        callers keep working.
    :return: An ``AWS4Auth`` instance bound to the module-level ``region``.
    """
    service = 'es'
    credentials = boto3.Session().get_credentials()
    return AWS4Auth(credentials.access_key,
                    credentials.secret_key,
                    region,
                    service,
                    session_token=credentials.token)


def output_message(key, res):
    """Format a one-line summary of a response for logging.

    :param key: Label placed in front of the summary (callers here pass the
        request path).
    :param res: Response object exposing ``status_code`` and ``text``.
    :return: ``'<key>: status=<status_code>, message=<text>'``.
    """
    return f'{key}: status={res.status_code}, message={res.text}'


def query_opensearch(opensearch_endpoint,
                     awsauth,
                     method=None,
                     path=None,
                     payload=None,
                     headers=None):
    """Send a single HTTPS request to ``https://<opensearch_endpoint>/<path>``.

    :param opensearch_endpoint: Host name of the domain (no scheme).
    :param awsauth: Auth object handed to ``requests`` as ``auth=``.
    :param method: HTTP verb, case-insensitive; one of GET, POST, PUT, PATCH,
        HEAD.
    :param path: Path appended after the host.
    :param payload: Object sent as the JSON body (ignored for GET).
    :param headers: Request headers; defaults to a JSON content type
        (ignored for GET, as in the original implementation).
    :return: The ``requests.Response`` object.
    :raises ValueError: If ``method`` is missing or not a supported verb.
    """
    if not headers:
        headers = {'Content-Type': 'application/json'}
    url = 'https://' + opensearch_endpoint + '/' + path
    verb = (method or '').lower()

    if verb == 'get':
        return requests.get(url, auth=awsauth, stream=True)

    # Fix: 'patch' used to be sent with requests.put.
    senders = {
        'post': requests.post,
        'put': requests.put,
        'patch': requests.patch,
        'head': requests.head,
    }
    try:
        send = senders[verb]
    except KeyError:
        # Previously an unknown verb fell through to `return res` and
        # surfaced as a confusing UnboundLocalError.
        raise ValueError(f'Unsupported HTTP method: {method!r}') from None
    return send(url, auth=awsauth, json=payload, headers=headers)


def set_tenant_get_cookies(engineType, opensearch_endpoint, tenant, auth):
    """Open a dashboards session for ``tenant`` and return its cookies.

    :param engineType: ``'OpenSearch'`` selects the ``/_dashboards`` base URL
        and ``osd-xsrf`` headers; any other value selects ``/_plugin/kibana``
        and ``kbn-xsrf`` headers.
    :param opensearch_endpoint: Host name of the domain (no scheme).
    :param tenant: Value for the ``security_tenant`` query parameter.
    :param auth: Either a dict posted as the JSON body to ``/auth/login``, or
        an ``AWS4Auth`` signer used for a GET on ``/app/dashboards``.
    :return: The response cookies on HTTP 200, otherwise ``False``.
    """
    if engineType == 'OpenSearch':
        base_url = f'https://{opensearch_endpoint}/_dashboards'
        headers = OPENSEARCH_DASHBOARD_HEADERS
    else:
        base_url = f'https://{opensearch_endpoint}/_plugin/kibana'
        headers = KIBANA_HEADERS

    if isinstance(auth, dict):
        url = f'{base_url}/auth/login?security_tenant={tenant}'
        # Fix: `json=` already serializes its argument; passing
        # json.dumps(auth) sent a JSON *string* instead of a JSON object.
        response = requests.post(url, headers=headers, json=auth)
    elif isinstance(auth, AWS4Auth):
        url = f'{base_url}/app/dashboards?security_tenant={tenant}'
        response = requests.get(url, headers=headers, auth=auth)
    else:
        logger.error('There is no valid authentication')
        return False

    if response.status_code == 200:
        logger.info('Authentication success to access kibana')
        return response.cookies

    # Routed through the logger instead of print() so it follows the
    # configured log level like every other message in this module.
    logger.debug('Cookies on failed authentication: %s', response.cookies)
    logger.error("Authentication failed to access kibana")
    logger.error(response.reason)
    return False


def configure_aes_total_fields_limit(opensearch_endpoint, indexName, limit_number,
                                     awsauth):
    """Set ``index.mapping.total_fields.limit`` on an index.

    :param opensearch_endpoint: Host name of the domain (no scheme).
    :param indexName: Index whose ``_settings`` are updated.
    :param limit_number: New value for the total-fields limit.
    :param awsauth: Auth object used to sign the request.
    :return: ``'success'`` on HTTP 200, otherwise ``'fail'``.
    """
    payload = {"index.mapping.total_fields.limit": limit_number}
    path = str(indexName) + '/_settings'
    res = query_opensearch(opensearch_endpoint, awsauth, 'PUT', path, payload)
    if res.status_code == 200:
        # %s rather than %d: a limit supplied as a string would otherwise
        # raise TypeError here, after the setting was already applied.
        logger.info("Change total_fields limit for index: %s to %s.",
                    indexName, limit_number)
        return 'success'
    logger.error(output_message(path, res))
    return 'fail'


# NOTE(review): crhelper create/update handlers are normally invoked with
# (event, context), and registering several functions on the same helper
# replaces the earlier ones — confirm how these decorated functions are
# actually called.
@helper_config.create
@helper_config.update
def update_total_fields_limit(opensearch_endpoint, indexName, limit_number):
    """Sign with the default credentials and update the total-fields limit.

    :return: ``'success'`` or ``'fail'`` as reported by
        ``configure_aes_total_fields_limit``.
    """
    awsauth = auth_opensearch(opensearch_endpoint)
    return configure_aes_total_fields_limit(opensearch_endpoint, indexName,
                                            limit_number, awsauth)


@helper_config.create
@helper_config.update
def index_exists(opensearch_endpoint, indexName):
    """Check with a HEAD request whether an index exists.

    :return: ``'success'`` when the HEAD request returns HTTP 200, otherwise
        ``'fail'``.
    """
    awsauth = auth_opensearch(opensearch_endpoint)
    path = str(indexName)
    res = query_opensearch(opensearch_endpoint, awsauth, 'HEAD', path)
    if res.status_code == 200:
        logger.info("Index name: %s exists.", indexName)
        return 'success'
    logger.info("Index name: %s doesn't exist.", indexName)
    return 'fail'


@helper_config.create
@helper_config.update
def create_index(opensearch_endpoint, indexName):
    # Creates the index with `index.mapping.ignore_malformed` enabled.
    awsauth = auth_opensearch(opensearch_endpoint)
    payload = {
        "settings": {
            "index.mapping.ignore_malformed": "true"
        }
    }
    path = str(indexName)
    res = query_opensearch(opensearch_endpoint, awsauth, 'PUT', path, payload)
    if res.status_code == 200:
        logger.info("Create Index %s." % (indexName))
        # The excerpt is cut off at this point; the rest of this function is
        # not visible and is intentionally not reconstructed.
        ...
opensearch_index_id.py
Source:opensearch_index_id.py
from __future__ import annotations


class OpensearchIndexId:
    """
    Build OpenSearch Index Id using given endpoint and index name or resolve the index name from given resource Id.
    """

    def __init__(self, opensearch_endpoint: str, index_name: str) -> None:
        # Both values are stored as given; no validation is performed here.
        self.opensearch_endpoint = opensearch_endpoint
        self.index_name = index_name

    def make_resource_id(self) -> str:
        """
        Make resource id of OpenSearch index by concatenating the stored endpoint and index name.

        The instance's ``opensearch_endpoint`` and ``index_name`` are joined using delimiter '||'.
        This method takes no arguments.

        :return: Resource id of OpenSearch index.
        """
        return f'{self.opensearch_endpoint}||{self.index_name}'

    @staticmethod
    def resource_id(resource_id: str) -> OpensearchIndexId:
        """
        Split given resource_id using delimiter '||' and initialize a class.

        :param resource_id: OpenSearch index resource id e.g. opensearch.eu-central-1.es.amazonaws.com||posts-3qs1999pg-c

        :return: OpensearchIndexId class instance.
        """
        # The excerpt is cut off after the docstring; the method body is not
        # visible and is intentionally not reconstructed.
        ...
opensearch_client.py
Source:opensearch_client.py
import pytest
from b_aws_testing_framework.credentials import Credentials
from b_cfn_opensearch_index_layer.opensearch_client import OpensearchClient
from b_cfn_opensearch_index_tests.integration.infrastructure.main_stack import MainStack


@pytest.fixture(scope='session')
def opensearch_client() -> OpensearchClient:
    """
    Session-scoped fixture returning an OpensearchClient for the test stack's domain.

    The endpoint is read from the MainStack output ``OPENSEARCH_DOMAIN_ENDPOINT`` and
    the boto3 session comes from the testing framework's ``Credentials``.
    """
    # The endpoint is resolved before the credentials, as in the original.
    endpoint = MainStack.get_output(MainStack.OPENSEARCH_DOMAIN_ENDPOINT)
    session = Credentials().boto_session
    return OpensearchClient(boto3_session=session, opensearch_endpoint=endpoint)


__all__ = [
    'opensearch_client',
    # The excerpt is cut off here; any further entries are not visible.
]
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!