How to use _domainconfig_from_opensearch method in localstack

Best Python code snippet using localstack_python

provider.py

Source:provider.py Github

copy

Full Screen

...130 result.get("DedicatedMasterType")131 )132 result["WarmType"] = _instancetype_to_opensearch(result.get("WarmType"))133 return result134def _domainconfig_from_opensearch(135 domain_config: Optional[DomainConfig],136) -> Optional[ElasticsearchDomainConfig]:137 if domain_config is not None:138 result = cast(ElasticsearchDomainConfig, domain_config)139 engine_version = domain_config.get("EngineVersion", {})140 result["ElasticsearchVersion"] = ElasticsearchVersionStatus(141 Options=_version_from_opensearch(engine_version.get("Options")),142 Status=cast(OptionStatus, engine_version.get("Status")),143 )144 cluster_config = domain_config.get("ClusterConfig", {})145 result["ElasticsearchClusterConfig"] = ElasticsearchClusterConfigStatus(146 Options=_clusterconfig_from_opensearch(cluster_config.get("Options")),147 Status=cluster_config.get("Status"),148 )149 result.pop("EngineVersion", None)150 result.pop("ClusterConfig", None)151 return result152def _compatible_version_list_from_opensearch(153 compatible_version_list: Optional[CompatibleVersionsList],154) -> Optional[CompatibleElasticsearchVersionsList]:155 if compatible_version_list is not None:156 return [157 CompatibleVersionsMap(158 SourceVersion=_version_from_opensearch(version_map["SourceVersion"]),159 TargetVersions=[160 _version_from_opensearch(target_version)161 for target_version in version_map["TargetVersions"]162 ],163 )164 for version_map in compatible_version_list165 ]166@contextmanager167def exception_mapper():168 """Maps an exception thrown by the OpenSearch client to an exception thrown by the ElasticSearch API."""169 try:170 yield171 except ClientError as err:172 exception_types = {173 "AccessDeniedException": AccessDeniedException,174 "BaseException": EsBaseException,175 "ConflictException": ConflictException,176 "DisabledOperationException": DisabledOperationException,177 "InternalException": InternalException,178 "InvalidPaginationTokenException": InvalidPaginationTokenException,179 "InvalidTypeException": InvalidTypeException,180 "LimitExceededException": LimitExceededException,181 "ResourceAlreadyExistsException": ResourceAlreadyExistsException,182 "ResourceNotFoundException": ResourceNotFoundException,183 "ValidationException": ValidationException,184 }185 mapped_exception_type = exception_types.get(err.response["Error"]["Code"], EsBaseException)186 raise mapped_exception_type(err.response["Error"]["Message"])187class EsProvider(EsApi):188 def create_elasticsearch_domain(189 self,190 context: RequestContext,191 domain_name: DomainName,192 elasticsearch_version: ElasticsearchVersionString = None,193 elasticsearch_cluster_config: ElasticsearchClusterConfig = None,194 ebs_options: EBSOptions = None,195 access_policies: PolicyDocument = None,196 snapshot_options: SnapshotOptions = None,197 vpc_options: VPCOptions = None,198 cognito_options: CognitoOptions = None,199 encryption_at_rest_options: EncryptionAtRestOptions = None,200 node_to_node_encryption_options: NodeToNodeEncryptionOptions = None,201 advanced_options: AdvancedOptions = None,202 log_publishing_options: LogPublishingOptions = None,203 domain_endpoint_options: DomainEndpointOptions = None,204 advanced_security_options: AdvancedSecurityOptionsInput = None,205 auto_tune_options: AutoTuneOptionsInput = None,206 tag_list: TagList = None,207 ) -> CreateElasticsearchDomainResponse:208 opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)209 # If no version is given, we set our default elasticsearch version210 engine_version = (211 _version_to_opensearch(elasticsearch_version)212 if elasticsearch_version213 else constants.ELASTICSEARCH_DEFAULT_VERSION214 )215 kwargs = {216 "DomainName": domain_name,217 "EngineVersion": engine_version,218 "ClusterConfig": _clusterconfig_to_opensearch(elasticsearch_cluster_config),219 "EBSOptions": ebs_options,220 "AccessPolicies": access_policies,221 "SnapshotOptions": snapshot_options,222 "VPCOptions": vpc_options,223 "CognitoOptions": cognito_options,224 "EncryptionAtRestOptions": encryption_at_rest_options,225 "NodeToNodeEncryptionOptions": node_to_node_encryption_options,226 "AdvancedOptions": advanced_options,227 "LogPublishingOptions": log_publishing_options,228 "DomainEndpointOptions": domain_endpoint_options,229 "AdvancedSecurityOptions": advanced_security_options,230 "AutoTuneOptions": auto_tune_options,231 "TagList": tag_list,232 }233 # Filter the kwargs to not set None values at all (boto doesn't like that)234 kwargs = {key: value for key, value in kwargs.items() if value is not None}235 with exception_mapper():236 domain_status = opensearch_client.create_domain(**kwargs)["DomainStatus"]237 # record event238 event_publisher.fire_event(239 event_publisher.EVENT_ES_CREATE_DOMAIN,240 payload={"n": event_publisher.get_hash(domain_name)},241 )242 status = _domainstatus_from_opensearch(domain_status)243 return CreateElasticsearchDomainResponse(DomainStatus=status)244 def delete_elasticsearch_domain(245 self, context: RequestContext, domain_name: DomainName246 ) -> DeleteElasticsearchDomainResponse:247 opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)248 with exception_mapper():249 domain_status = opensearch_client.delete_domain(250 DomainName=domain_name,251 )["DomainStatus"]252 # record event253 event_publisher.fire_event(254 event_publisher.EVENT_ES_DELETE_DOMAIN,255 payload={"n": event_publisher.get_hash(domain_name)},256 )257 status = _domainstatus_from_opensearch(domain_status)258 return DeleteElasticsearchDomainResponse(DomainStatus=status)259 def describe_elasticsearch_domain(260 self, context: RequestContext, domain_name: DomainName261 ) -> DescribeElasticsearchDomainResponse:262 opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)263 with exception_mapper():264 opensearch_status = opensearch_client.describe_domain(265 DomainName=domain_name,266 )["DomainStatus"]267 status = _domainstatus_from_opensearch(opensearch_status)268 return DescribeElasticsearchDomainResponse(DomainStatus=status)269 @handler("UpdateElasticsearchDomainConfig", expand=False)270 def update_elasticsearch_domain_config(271 self, context: RequestContext, payload: UpdateElasticsearchDomainConfigRequest272 ) -> UpdateElasticsearchDomainConfigResponse:273 opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)274 payload: Dict275 if "ElasticsearchClusterConfig" in payload:276 payload["ClusterConfig"] = payload["ElasticsearchClusterConfig"]277 payload["ClusterConfig"]["InstanceType"] = _instancetype_to_opensearch(278 payload["ClusterConfig"]["InstanceType"]279 )280 payload.pop("ElasticsearchClusterConfig")281 with exception_mapper():282 opensearch_config = opensearch_client.update_domain_config(**payload)["DomainConfig"]283 config = _domainconfig_from_opensearch(opensearch_config)284 return UpdateElasticsearchDomainConfigResponse(DomainConfig=config)285 def describe_elasticsearch_domains(286 self, context: RequestContext, domain_names: DomainNameList287 ) -> DescribeElasticsearchDomainsResponse:288 opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)289 with exception_mapper():290 opensearch_status_list = opensearch_client.describe_domains(291 DomainNames=domain_names,292 )["DomainStatusList"]293 status_list = [_domainstatus_from_opensearch(s) for s in opensearch_status_list]294 return DescribeElasticsearchDomainsResponse(DomainStatusList=status_list)295 def list_domain_names(296 self, context: RequestContext, engine_type: EngineType = None297 ) -> ListDomainNamesResponse:298 opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)299 # Only hand the EngineType param to boto if it's set300 kwargs = {}301 if engine_type:302 kwargs["EngineType"] = engine_type303 with exception_mapper():304 domain_names = opensearch_client.list_domain_names(**kwargs)["DomainNames"]305 return ListDomainNamesResponse(DomainNames=cast(Optional[DomainInfoList], domain_names))306 def list_elasticsearch_versions(307 self,308 context: RequestContext,309 max_results: MaxResults = None,310 next_token: NextToken = None,311 ) -> ListElasticsearchVersionsResponse:312 opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)313 # Construct the arguments as kwargs to not set None values at all (boto doesn't like that)314 kwargs = {315 key: value316 for key, value in {"MaxResults": max_results, "NextToken": next_token}.items()317 if value is not None318 }319 with exception_mapper():320 versions = opensearch_client.list_versions(**kwargs)321 return ListElasticsearchVersionsResponse(322 ElasticsearchVersions=[323 _version_from_opensearch(version) for version in versions["Versions"]324 ],325 NextToken=versions.get(next_token),326 )327 def get_compatible_elasticsearch_versions(328 self, context: RequestContext, domain_name: DomainName = None329 ) -> GetCompatibleElasticsearchVersionsResponse:330 opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)331 # Only hand the DomainName param to boto if it's set332 kwargs = {}333 if domain_name:334 kwargs["DomainName"] = domain_name335 with exception_mapper():336 compatible_versions_response = opensearch_client.get_compatible_versions(**kwargs)337 compatible_versions = compatible_versions_response.get("CompatibleVersions")338 return GetCompatibleElasticsearchVersionsResponse(339 CompatibleElasticsearchVersions=_compatible_version_list_from_opensearch(340 compatible_versions341 )342 )343 def describe_elasticsearch_domain_config(344 self, context: RequestContext, domain_name: DomainName345 ) -> DescribeElasticsearchDomainConfigResponse:346 opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)347 with exception_mapper():348 domain_config = opensearch_client.describe_domain_config(DomainName=domain_name).get(349 "DomainConfig"350 )351 return DescribeElasticsearchDomainConfigResponse(352 DomainConfig=_domainconfig_from_opensearch(domain_config)353 )354 def add_tags(self, context: RequestContext, arn: ARN, tag_list: TagList) -> None:355 opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)356 with exception_mapper():357 opensearch_client.add_tags(ARN=arn, TagList=tag_list)358 def list_tags(self, context: RequestContext, arn: ARN) -> ListTagsResponse:359 opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)360 with exception_mapper():361 response = opensearch_client.list_tags(ARN=arn)362 return ListTagsResponse(TagList=response.get("TagList"))363 def remove_tags(self, context: RequestContext, arn: ARN, tag_keys: StringList) -> None:364 opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)365 with exception_mapper():366 opensearch_client.remove_tags(ARN=arn, TagKeys=tag_keys)

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful