Best Python code snippet using tempest_python
test_elasticsearch_dbi.py
Source:test_elasticsearch_dbi.py
1"""2ElasticsearchDBI test case.3"""4import unittest5from util import config6from util.elasticsearch.elasticsearch_dbi import ElasticsearchDBI7class TestElasticsearchDBI(unittest.TestCase):8 """9 Unit test case for Elasticsearch Database Interface - ElasticsearchDBI class.10 """11 def setUp(self) -> None:12 """13 Create ElasticsearchDBI object + setup test index name.14 @return: None15 """16 self.es_dbi = ElasticsearchDBI.get_instance(config.ELASTICSEARCH_HOST, config.ELASTICSEARCH_PORT)17 self.test_index_name = "test_index"18 def tearDown(self) -> None:19 """20 Delete test index.21 @return: None22 """23 if self.es_dbi.index_exists(self.test_index_name):24 self.es_dbi.delete_index(self.test_index_name)25 self.es_dbi = None26 def test_create_index(self) -> None:27 """28 Test create test index.29 @return: None30 """31 if self.es_dbi.index_exists(self.test_index_name):32 self.es_dbi.delete_index(self.test_index_name)33 self.assertTrue(self.es_dbi.create_index(self.test_index_name))34 def test_refresh_index(self) -> None:35 """36 Test refresh index.37 @return: None38 """39 self.test_create_index()40 self.assertTrue(self.es_dbi.refresh_index(self.test_index_name))41 def test_get_indices(self) -> None:42 """43 Test get dictionary with all index names.44 @return: None45 """46 indices = self.es_dbi.get_indices()47 self.assertTrue(indices)48 self.assertTrue(isinstance(indices, dict))49 def test_index_exists(self) -> None:50 """51 Test index existence check.52 @return: None53 """54 indices = self.es_dbi.get_indices()55 self.assertTrue(indices)56 existing_index = list(indices.keys())[0]57 self.assertTrue(self.es_dbi.index_exists(existing_index))58 self.assertFalse(self.es_dbi.index_exists("inexistent_index_name"))59 def test_delete_index(self) -> None:60 """61 Test delete index.62 @return: None63 """64 if not self.es_dbi.index_exists(self.test_index_name):65 self.es_dbi.create_index(self.test_index_name)66 self.assertTrue(self.es_dbi.delete_index(self.test_index_name))67 def test_add_index_settings(self) -> None:68 """69 Test add index settings -> refresh_interval 1 second.70 @return: None71 """72 settings = {73 "settings": {74 "refresh_interval": "1s"75 }76 }77 if not self.es_dbi.index_exists(self.test_index_name):78 self.es_dbi.create_index(self.test_index_name)79 self.es_dbi.add_index_settings(index=self.test_index_name, settings=settings)80 def test_put_mapping(self) -> None:81 """82 Test put index mapping. Mappings must be specified when index is created.83 @return: None84 """85 temp_test_index = "test_index2"86 mapping = {87 "mappings": {88 "properties": {89 "some_string": {90 "type": "text"91 },92 "some_bool": {93 "type": "boolean"94 }95 }96 }97 }98 # create temp index99 self.es_dbi.create_index(temp_test_index)100 # put mapping101 self.es_dbi.put_mapping(index=temp_test_index, mapping=mapping)102 # insert valid document103 self.assertTrue(self.es_dbi.create_document(index=temp_test_index, _id=1, document={"some_string": "Hello",104 "some_bool": True}))105 # try insert malformed document106 self.assertFalse(self.es_dbi.create_document(index=temp_test_index, _id=1, document={"some_string": 100,107 "some_bool": "not_bool"}))108 # delete temp index109 self.es_dbi.delete_index(temp_test_index)110 def test_create_document(self) -> None:111 """112 Test create elasticsearch document in test index.113 @return: None114 """115 if not self.es_dbi.index_exists(self.test_index_name):116 self.es_dbi.create_index(self.test_index_name)117 document_id = self.es_dbi.create_document(index=self.test_index_name, _id="test_id", document={"test": "value"})118 self.assertEqual(document_id, "test_id")119 def test_get_by_id(self) -> None:120 """121 Test document retrieval by id.122 @return: None123 """124 document_id = self.es_dbi.create_document(index=self.test_index_name, _id="test_id", document={"test": "value"})125 es_document = self.es_dbi.get_document_by_id(index=self.test_index_name, _id=document_id)126 self.assertTrue(es_document)127 self.assertEqual(es_document.get("_id", None), document_id)128 def test_update_document(self) -> None:129 """130 Test update document.131 @return: None132 """133 document_id = self.es_dbi.create_document(index=self.test_index_name, _id="test_id", document={"test": "value"})134 updated = self.es_dbi.update_document(index=self.test_index_name, _id=document_id,135 document={"test": "new_value"})136 self.assertTrue(updated)137 updated_document = self.es_dbi.get_document_by_id(index=self.test_index_name, _id=document_id)138 self.assertTrue(updated_document)139 self.assertEqual(updated_document.get("_source", {}).get("test", None), "new_value")140 def test_delete_document(self) -> None:141 """142 Test delete document.143 @return: None144 """145 document_id = self.es_dbi.create_document(index=self.test_index_name, _id="test_id", document={"test": "value"})146 self.assertTrue(document_id)147 deleted = self.es_dbi.delete_document(index=self.test_index_name, _id=document_id)148 self.assertTrue(deleted)149 deleted_document = self.es_dbi.get_document_by_id(index=self.test_index_name, _id=document_id)150 self.assertFalse(deleted_document)151 def test_mget_by_id(self) -> None:152 """153 Test mget documents by ids batch.154 @return: None155 """156 document_id_1 = self.es_dbi.create_document(index=self.test_index_name, _id="test_id",157 document={"test": "value"})158 document_id_2 = self.es_dbi.create_document(index=self.test_index_name, _id="test_id2",159 document={"test": "value2"})160 documents = self.es_dbi.mget_documents_by_id(index=self.test_index_name, ids=[document_id_1, document_id_2])161 self.assertTrue(documents)162 for document in documents:163 self.assertTrue(document["found"])164 def test_search_documents(self) -> None:165 """166 Test documents search.167 @return: None168 """169 self.assertTrue(self.es_dbi.create_document(index=self.test_index_name,170 _id="test_id",171 document={"score": 100},172 refresh=True))173 self.assertTrue(self.es_dbi.create_document(index=self.test_index_name,174 _id="test_id2",175 document={"score": 50},176 refresh=True))177 # existing document178 searched_documents = self.es_dbi.search_documents(index=self.test_index_name, query_body={179 "query": {"bool": {"must": [{"term": {"score": 100}}]}}180 })181 self.assertTrue(searched_documents)182 self.assertTrue(searched_documents["hits"]["hits"])183 for searched_document in searched_documents["hits"]["hits"]:184 self.assertEqual(searched_document["_source"]["score"], 100)185 # non-existing document186 searched_documents = self.es_dbi.search_documents(index=self.test_index_name, query_body={187 "query": {"bool": {"must": [{"term": {"score": 75}}]}}188 })189 self.assertTrue(searched_documents)190 self.assertFalse(searched_documents["hits"]["hits"])191 def test_scroll_search_documents(self) -> None:192 """193 Test scroll search using generator.194 @return: None195 """196 at_least_one_doc = False197 self.assertTrue(self.es_dbi.create_document(index=self.test_index_name,198 _id="test_id",199 document={"test": "value"},200 refresh=True))201 for es_doc in self.es_dbi.scroll_search_documents_generator(index=self.test_index_name):202 at_least_one_doc = True203 self.assertTrue(es_doc)204 self.assertTrue(es_doc["_id"])205 self.assertTrue(es_doc["_source"])206 self.assertTrue(at_least_one_doc)207 def test_bulk(self) -> None:208 """209 Test bulk operations: insert, update, delete.210 @return: None211 """212 # bulk insert213 actions = [214 {"_index": self.test_index_name,215 "_id": "document_id_1",216 "_source": {"test": "value"}},217 {"_index": self.test_index_name,218 "_id": "document_id_2",219 "_source": {"test": "value"}},220 ]221 success, failed = self.es_dbi.bulk(actions=actions, chunk_size=len(actions))222 self.assertEqual(success, 2)223 self.assertFalse(failed)224 # bulk update225 actions = [226 {"_op_type": "update",227 "_index": self.test_index_name,228 "_id": "document_id_1",229 "doc": {"test": "value2"}},230 {"_op_type": "update",231 "_index": self.test_index_name,232 "_id": "document_id_2",233 "doc": {"test": "value3"}},234 ]235 success, failed = self.es_dbi.bulk(actions=actions, chunk_size=len(actions))236 self.assertEqual(success, 2)237 self.assertFalse(failed)238 # bulk delete239 actions = [240 {"_op_type": "delete",241 "_index": self.test_index_name,242 "_id": "document_id_1"},243 {"_op_type": "delete",244 "_index": self.test_index_name,245 "_id": "document_id_2"},246 ]247 success, failed = self.es_dbi.bulk(actions=actions, chunk_size=len(actions))248 self.assertEqual(success, 2)249 self.assertFalse(failed)250if __name__ == "__main__":...
opensearch_test.py
Source:opensearch_test.py
1# -*- coding: utf-8 -*-2# Copyright 2020 Google LLC3#4# Licensed under the Apache License, Version 2.0 (the "License");5# you may not use this file except in compliance with the License.6# You may obtain a copy of the License at7#8# https://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS,12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13# See the License for the specific language governing permissions and14# limitations under the License.15"""Tests for opensearch datastore."""16import unittest17import mock18from opensearchpy import exceptions19from dfdewey.datastore.opensearch import OpenSearchDataStore20TEST_INDEX_NAME = ''.join(('es', 'd41d8cd98f00b204e9800998ecf8427e'))21class OpenSearchTest(unittest.TestCase):22 """Tests for OpenSearch datastore."""23 def _get_datastore(self):24 """Get a mock opensearch datastore.25 Returns:26 Mock opensearch datastore.27 """28 es = OpenSearchDataStore()29 return es30 def test_build_query(self):31 """Test build query method."""32 es = self._get_datastore()33 query_string = 'test'34 query = es.build_query(query_string)35 query_dsl = {36 'query': {37 'bool': {38 'must': [{39 'query_string': {40 'query': 'test'41 }42 }]43 }44 }45 }46 self.assertEqual(query, query_dsl)47 @mock.patch('opensearchpy.client.IndicesClient.create')48 @mock.patch('opensearchpy.client.IndicesClient.exists')49 def test_create_index(self, mock_exists, mock_create):50 """Test create index method."""51 es = self._get_datastore()52 mock_exists.return_value = False53 result = es.create_index(TEST_INDEX_NAME)54 self.assertEqual(result, TEST_INDEX_NAME)55 mock_create.side_effect = exceptions.ConnectionError56 with self.assertRaises(RuntimeError):57 result = es.create_index(TEST_INDEX_NAME)58 @mock.patch('opensearchpy.client.IndicesClient.delete')59 @mock.patch('opensearchpy.client.IndicesClient.exists')60 def test_delete_index(self, mock_exists, mock_delete):61 """Test delete index method."""62 es = self._get_datastore()63 mock_exists.return_value = True64 es.delete_index(TEST_INDEX_NAME)65 mock_delete.assert_called_once_with(index=TEST_INDEX_NAME)66 mock_delete.side_effect = exceptions.ConnectionError67 with self.assertRaises(RuntimeError):68 es.delete_index(TEST_INDEX_NAME)69 def test_import_event(self):70 """Test import event method."""71 es = self._get_datastore()72 with mock.patch.object(es.client, 'bulk') as mock_bulk:73 result = es.import_event(TEST_INDEX_NAME)74 self.assertEqual(result, 0)75 mock_bulk.assert_not_called()76 es.import_events = [{77 'index': {78 '_index': 'esd41d8cd98f00b204e9800998ecf8427e'79 }80 }, {81 'image': 'd41d8cd98f00b204e9800998ecf8427e',82 'offset': 1048579,83 'file_offset': None,84 'data': 'NTFS \n'85 }, {86 'index': {87 '_index': 'esd41d8cd98f00b204e9800998ecf8427e'88 }89 }, {90 'index': {91 '_index': 'esd41d8cd98f00b204e9800998ecf8427e'92 }93 }, {94 'image': 'd41d8cd98f00b204e9800998ecf8427e',95 'offset': 1048755,96 'file_offset': None,97 'data': 'press any key to try again ... \n'98 }]99 result = es.import_event(TEST_INDEX_NAME)100 self.assertEqual(result, 0)101 mock_bulk.assert_called_once()102 test_event = {103 'image': 'd41d8cd98f00b204e9800998ecf8427e',104 'offset': 1048579,105 'file_offset': None,106 'data': 'NTFS \n'107 }108 result = es.import_event(TEST_INDEX_NAME, test_event, flush_interval=1)109 self.assertEqual(result, 1)110 @mock.patch('opensearchpy.client.IndicesClient.exists')111 def test_index_exists(self, mock_exists):112 """Test index exists method."""113 es = self._get_datastore()114 es.index_exists(TEST_INDEX_NAME)115 mock_exists.assert_called_once_with(TEST_INDEX_NAME)116 @mock.patch('opensearchpy.OpenSearch.search')117 @mock.patch('opensearchpy.client.IndicesClient.exists')118 def test_search(self, mock_exists, mock_search):119 """Test search method."""120 es = self._get_datastore()121 mock_exists.return_value = True122 search_results = {123 'took': 24,124 'timed_out': False,125 '_shards': {126 'total': 1,127 'successful': 1,128 'skipped': 0,129 'failed': 0130 },131 'hits': {132 'total': {133 'value': 2,134 'relation': 'eq'135 },136 'max_score':137 13.436,138 'hits': [{139 '_index': 'esd41d8cd98f00b204e9800998ecf8427e',140 '_type': '_doc',141 '_id': '7gST1HUBuaTSqxk-XzDA',142 '_score': 13.436,143 '_source': {144 'image': 'd41d8cd98f00b204e9800998ecf8427e',145 'offset': 1048755,146 'file_offset': None,147 'data': 'press any key to try again ... \n'148 }149 }, {150 '_index': 'esd41d8cd98f00b204e9800998ecf8427e',151 '_type': '_doc',152 '_id': 'oAST1HUBuaTSqxk-XzLD',153 '_score': 13.436,154 '_source': {155 'image': 'd41d8cd98f00b204e9800998ecf8427e',156 'offset': 10485427,157 'file_offset': None,158 'data': 'press any key to try again ... \n'159 }160 }]161 }162 }163 mock_search.return_value = search_results164 results = es.search(TEST_INDEX_NAME, '"any key"')165 self.assertEqual(results, search_results)166if __name__ == '__main__':...
test_change_index.py
Source:test_change_index.py
1import pytest2from chpip import exception3from chpip.core import DEFAULT_INDEX_NAME, DEFAULT_INDEX_URL4class TestChangeIndex(object):5 TEST_INDEX_NAME = 'test'6 TEST_INDEX_URL = 'https://test.com'7 def test_change_no_available_indexes(self, chpip_manager):8 with pytest.raises(exception.NoAvailableIndex):9 chpip_manager.change_index()10 def test_change_index_not_found(self, chpip_manager):11 chpip_manager.set_index(self.TEST_INDEX_NAME, self.TEST_INDEX_URL)12 with pytest.raises(exception.IndexNameNotFound):13 chpip_manager.change_index('not_found')14 def test_change_index_without_name(self, chpip_manager):15 chpip_manager.set_index(self.TEST_INDEX_NAME, self.TEST_INDEX_URL)16 name = chpip_manager.change_index()17 assert name == self.TEST_INDEX_NAME18 pip_conf = chpip_manager.get_pip_config()19 assert pip_conf['global']['index-url'] == self.TEST_INDEX_URL20 name = chpip_manager.change_index()21 assert name == DEFAULT_INDEX_NAME22 pip_conf = chpip_manager.get_pip_config()23 assert pip_conf['global']['index-url'] == DEFAULT_INDEX_URL24 name = chpip_manager.change_index()25 assert name == self.TEST_INDEX_NAME26 pip_conf = chpip_manager.get_pip_config()27 assert pip_conf['global']['index-url'] == self.TEST_INDEX_URL28 def test_change_index_with_name(self, chpip_manager):29 chpip_manager.set_index(self.TEST_INDEX_NAME, self.TEST_INDEX_URL)30 name = chpip_manager.change_index(self.TEST_INDEX_NAME)31 assert name == self.TEST_INDEX_NAME32 pip_conf = chpip_manager.get_pip_config()33 assert pip_conf['global']['index-url'] == self.TEST_INDEX_URL34 name = chpip_manager.change_index(DEFAULT_INDEX_NAME)35 assert name == DEFAULT_INDEX_NAME36 pip_conf = chpip_manager.get_pip_config()37 assert pip_conf['global']['index-url'] == DEFAULT_INDEX_URL38 name = chpip_manager.change_index(DEFAULT_INDEX_NAME)39 assert name == DEFAULT_INDEX_NAME40 pip_conf = chpip_manager.get_pip_config()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!