Best Python code snippet using localstack_python
indexManagementClient.py
Source:indexManagementClient.py
#!/bin/python
"""Thin helpers around the Elasticsearch client for index management.

Each helper builds a fresh client via getEsClient() and performs one
alias / index operation. Several helpers record their result or error text
in /tmp/response.txt.
"""
import json
import os
import sys
from ssl import create_default_context

from elasticsearch import Elasticsearch

_TOKEN_FILE = '/var/run/secrets/kubernetes.io/serviceaccount/token'
_CA_FILE = '/etc/indexmanagement/keys/admin-ca'
_RESPONSE_FILE = '/tmp/response.txt'


def _writeResponseFile(message):
    """Overwrite the response file with ``message`` followed by a newline.

    Writing through an explicit file handle (instead of temporarily
    rebinding ``sys.stdout``) guarantees the file is closed and that stdout
    can never be left pointing at the file when something raises.
    """
    with open(_RESPONSE_FILE, 'w') as responseFile:
        print(message, file=responseFile)


def getEsClient():
    """Build an Elasticsearch client from the environment.

    Reads the service address from ``ES_SERVICE`` (required), the timeout
    from ``CONNECT_TIMEOUT`` (default 30), the bearer token from the
    service-account token file and the CA bundle from the admin-ca file.

    Raises:
        KeyError: if ``ES_SERVICE`` is not set.
        ValueError: if ``CONNECT_TIMEOUT`` is set but is not a number.
        OSError: if the token file or CA file cannot be read.
    """
    esService = os.environ['ES_SERVICE']
    # Environment values are strings; the client needs a numeric timeout.
    connectTimeout = float(os.getenv('CONNECT_TIMEOUT', 30))
    with open(_TOKEN_FILE, 'r') as tokenFile:
        # Strip surrounding whitespace so a trailing newline cannot end up
        # inside the HTTP header value.
        bearer_token = tokenFile.read().strip()
    context = create_default_context(cafile=_CA_FILE)
    es_client = Elasticsearch(
        [esService],
        timeout=connectTimeout,
        max_retries=5,
        retry_on_timeout=True,
        headers={"authorization": f"Bearer {bearer_token}"},
        ssl_context=context,
    )
    return es_client


def getAlias(alias):
    """Return the alias definition for ``alias`` as a JSON string.

    Returns an empty string if the lookup (or client setup) fails.
    """
    try:
        es_client = getEsClient()
        return json.dumps(es_client.indices.get_alias(name=alias))
    except Exception:
        return ""


def getIndicesAgeForAlias(alias):
    """Return the ``index.creation_date`` settings for ``alias`` as JSON.

    Returns an empty string if the lookup (or client setup) fails.
    """
    try:
        es_client = getEsClient()
        return json.dumps(
            es_client.indices.get_settings(index=alias, name="index.creation_date")
        )
    except Exception:
        return ""


def deleteIndices(index):
    """Delete ``index``; return True on success.

    On failure the exception text is written to the response file and
    False is returned.
    """
    try:
        es_client = getEsClient()
        es_client.indices.delete(index=index)
        return True
    except Exception as e:
        _writeResponseFile(e)
        return False


def removeAsWriteIndexForAlias(index, alias):
    """Mark ``index`` as a non-write index of ``alias``.

    Returns the ``acknowledged`` field of the update_aliases response.
    Errors are not caught here and propagate to the caller.
    """
    es_client = getEsClient()
    response = es_client.indices.update_aliases({
        "actions": [
            {"add": {"index": f"{index}", "alias": f"{alias}", "is_write_index": False}}
        ]
    })
    return response['acknowledged']


def catWriteAliases(policy):
    """Return the distinct alias names matching ``<policy>*-write``.

    The names are sorted and joined with single spaces. Splitting on
    newlines is kept as-is, so a blank line in the cat output yields an
    empty entry (a leading space in the result). Returns an empty string
    on any failure.
    """
    try:
        es_client = getEsClient()
        alias_name = f"{policy}*-write"
        response = es_client.cat.aliases(name=alias_name, h="alias")
        return " ".join(sorted(set(response.split("\n"))))
    except Exception:
        return ""


def rolloverForPolicy(alias, decoded):
    """Roll over ``alias`` using ``decoded`` as the request body.

    On success the JSON-encoded response is written to the response file
    and True is returned; any failure returns False.
    """
    try:
        es_client = getEsClient()
        response = es_client.indices.rollover(alias=alias, body=decoded)
        _writeResponseFile(json.dumps(response))
        return True
    except Exception:
        return False


def checkIndexExists(index):
    """Return whether ``index`` exists.

    On failure the exception text is written to the response file and
    False is returned.
    """
    try:
        es_client = getEsClient()
        return es_client.indices.exists(index=index)
    except Exception as e:
        _writeResponseFile(e)
        return False


def updateWriteIndex(currentIndex, nextIndex, alias):
    """Move the write index of ``alias`` from ``currentIndex`` to ``nextIndex``.

    Returns the ``acknowledged`` field of the update_aliases response. On
    failure the exception text is written to the response file.
    """
    try:
        es_client = getEsClient()
        response = es_client.indices.update_aliases({
            "actions": [
                {"add": {"index": f"{currentIndex}", "alias": f"{alias}", "is_write_index": False}},
                {"add": {"index": f"{nextIndex}", "alias": f"{alias}", "is_write_index": True}}
            ]
        })
        return response['acknowledged']
    except Exception as e:
        _writeResponseFile(e)
        # NOTE(review): the excerpt this code was recovered from is cut off
        # right here ("..."). Whatever followed - presumably `return False`,
        # as in deleteIndices/checkIndexExists - is not visible; confirm
        # against the complete file before relying on the failure value.
elastic.py
Source:elastic.py
...4from tests.functional.settings import settings5from tests.functional.testdata.data_for_es import data_for_elastic6from tests.functional.testdata.indexes import indexes7@pytest.fixture(scope='session')8async def es_client():9 client = AsyncElasticsearch(hosts=f'{settings.ES_HOST}:{settings.ES_PORT}')10 yield client11 await client.close()12@pytest.fixture(autouse=True)13async def create_es_data(es_client):14 for item in ['movies', 'genres', 'persons']:15 index = indexes.get(item)16 if not await es_client.indices.exists(index=item):17 await es_client.indices.create(index=item, body=index)18 await es_client.bulk(data_for_elastic())19 await asyncio.sleep(1)20 yield...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!