Best Python code snippet using localstack_python
provider.py
Source:provider.py
...84 if version.startswith("Elasticsearch_"):85 return version.split("_")[1]86 else:87 return version88def _instancetype_to_opensearch(instance_type: Optional[str]) -> Optional[str]:89 if instance_type is not None:90 return instance_type.replace("elasticsearch", "search")91def _instancetype_from_opensearch(instance_type: Optional[str]) -> Optional[str]:92 if instance_type is not None:93 return instance_type.replace("search", "elasticsearch")94def _clusterconfig_from_opensearch(95 cluster_config: Optional[ClusterConfig],96) -> Optional[ElasticsearchClusterConfig]:97 if cluster_config is not None:98 # Just take the whole typed dict and typecast it to our target type99 result = cast(ElasticsearchClusterConfig, cluster_config)100 # Adjust the instance type names101 result["InstanceType"] = _instancetype_from_opensearch(cluster_config.get("InstanceType"))102 result["DedicatedMasterType"] = _instancetype_from_opensearch(103 cluster_config.get("DedicatedMasterType")104 )105 result["WarmType"] = _instancetype_from_opensearch(cluster_config.get("WarmType"))106 return result107def _domainstatus_from_opensearch(108 domain_status: Optional[DomainStatus],109) -> Optional[ElasticsearchDomainStatus]:110 if domain_status is not None:111 # Just take the whole typed dict and typecast it to our target type112 result = cast(ElasticsearchDomainStatus, domain_status)113 # Only specifically handle keys which are named differently or their values differ (version and clusterconfig)114 result["ElasticsearchVersion"] = _version_from_opensearch(115 domain_status.get("EngineVersion")116 )117 result["ElasticsearchClusterConfig"] = _clusterconfig_from_opensearch(118 domain_status.get("ClusterConfig")119 )120 result.pop("EngineVersion", None)121 result.pop("ClusterConfig", None)122 return result123def _clusterconfig_to_opensearch(124 elasticsearch_cluster_config: Optional[ElasticsearchClusterConfig],125) -> Optional[ClusterConfig]:126 if elasticsearch_cluster_config is not None:127 result = cast(ClusterConfig, elasticsearch_cluster_config)128 result["InstanceType"] = _instancetype_to_opensearch(result.get("InstanceType"))129 result["DedicatedMasterType"] = _instancetype_to_opensearch(130 result.get("DedicatedMasterType")131 )132 result["WarmType"] = _instancetype_to_opensearch(result.get("WarmType"))133 return result134def _domainconfig_from_opensearch(135 domain_config: Optional[DomainConfig],136) -> Optional[ElasticsearchDomainConfig]:137 if domain_config is not None:138 result = cast(ElasticsearchDomainConfig, domain_config)139 engine_version = domain_config.get("EngineVersion", {})140 result["ElasticsearchVersion"] = ElasticsearchVersionStatus(141 Options=_version_from_opensearch(engine_version.get("Options")),142 Status=cast(OptionStatus, engine_version.get("Status")),143 )144 cluster_config = domain_config.get("ClusterConfig", {})145 result["ElasticsearchClusterConfig"] = ElasticsearchClusterConfigStatus(146 Options=_clusterconfig_from_opensearch(cluster_config.get("Options")),147 Status=cluster_config.get("Status"),148 )149 result.pop("EngineVersion", None)150 result.pop("ClusterConfig", None)151 return result152def _compatible_version_list_from_opensearch(153 compatible_version_list: Optional[CompatibleVersionsList],154) -> Optional[CompatibleElasticsearchVersionsList]:155 if compatible_version_list is not None:156 return [157 CompatibleVersionsMap(158 SourceVersion=_version_from_opensearch(version_map["SourceVersion"]),159 TargetVersions=[160 _version_from_opensearch(target_version)161 for target_version in version_map["TargetVersions"]162 ],163 )164 for version_map in compatible_version_list165 ]166@contextmanager167def exception_mapper():168 """Maps an exception thrown by the OpenSearch client to an exception thrown by the ElasticSearch API."""169 try:170 yield171 except ClientError as err:172 exception_types = {173 "AccessDeniedException": AccessDeniedException,174 "BaseException": EsBaseException,175 "ConflictException": ConflictException,176 "DisabledOperationException": DisabledOperationException,177 "InternalException": InternalException,178 "InvalidPaginationTokenException": InvalidPaginationTokenException,179 "InvalidTypeException": InvalidTypeException,180 "LimitExceededException": LimitExceededException,181 "ResourceAlreadyExistsException": ResourceAlreadyExistsException,182 "ResourceNotFoundException": ResourceNotFoundException,183 "ValidationException": ValidationException,184 }185 mapped_exception_type = exception_types.get(err.response["Error"]["Code"], EsBaseException)186 raise mapped_exception_type(err.response["Error"]["Message"])187class EsProvider(EsApi):188 def create_elasticsearch_domain(189 self,190 context: RequestContext,191 domain_name: DomainName,192 elasticsearch_version: ElasticsearchVersionString = None,193 elasticsearch_cluster_config: ElasticsearchClusterConfig = None,194 ebs_options: EBSOptions = None,195 access_policies: PolicyDocument = None,196 snapshot_options: SnapshotOptions = None,197 vpc_options: VPCOptions = None,198 cognito_options: CognitoOptions = None,199 encryption_at_rest_options: EncryptionAtRestOptions = None,200 node_to_node_encryption_options: NodeToNodeEncryptionOptions = None,201 advanced_options: AdvancedOptions = None,202 log_publishing_options: LogPublishingOptions = None,203 domain_endpoint_options: DomainEndpointOptions = None,204 advanced_security_options: AdvancedSecurityOptionsInput = None,205 auto_tune_options: AutoTuneOptionsInput = None,206 tag_list: TagList = None,207 ) -> CreateElasticsearchDomainResponse:208 opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)209 # If no version is given, we set our default elasticsearch version210 engine_version = (211 _version_to_opensearch(elasticsearch_version)212 if elasticsearch_version213 else constants.ELASTICSEARCH_DEFAULT_VERSION214 )215 kwargs = {216 "DomainName": domain_name,217 "EngineVersion": engine_version,218 "ClusterConfig": _clusterconfig_to_opensearch(elasticsearch_cluster_config),219 "EBSOptions": ebs_options,220 "AccessPolicies": access_policies,221 "SnapshotOptions": snapshot_options,222 "VPCOptions": vpc_options,223 "CognitoOptions": cognito_options,224 "EncryptionAtRestOptions": encryption_at_rest_options,225 "NodeToNodeEncryptionOptions": node_to_node_encryption_options,226 "AdvancedOptions": advanced_options,227 "LogPublishingOptions": log_publishing_options,228 "DomainEndpointOptions": domain_endpoint_options,229 "AdvancedSecurityOptions": advanced_security_options,230 "AutoTuneOptions": auto_tune_options,231 "TagList": tag_list,232 }233 # Filter the kwargs to not set None values at all (boto doesn't like that)234 kwargs = {key: value for key, value in kwargs.items() if value is not None}235 with exception_mapper():236 domain_status = opensearch_client.create_domain(**kwargs)["DomainStatus"]237 # record event238 event_publisher.fire_event(239 event_publisher.EVENT_ES_CREATE_DOMAIN,240 payload={"n": event_publisher.get_hash(domain_name)},241 )242 status = _domainstatus_from_opensearch(domain_status)243 return CreateElasticsearchDomainResponse(DomainStatus=status)244 def delete_elasticsearch_domain(245 self, context: RequestContext, domain_name: DomainName246 ) -> DeleteElasticsearchDomainResponse:247 opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)248 with exception_mapper():249 domain_status = opensearch_client.delete_domain(250 DomainName=domain_name,251 )["DomainStatus"]252 # record event253 event_publisher.fire_event(254 event_publisher.EVENT_ES_DELETE_DOMAIN,255 payload={"n": event_publisher.get_hash(domain_name)},256 )257 status = _domainstatus_from_opensearch(domain_status)258 return DeleteElasticsearchDomainResponse(DomainStatus=status)259 def describe_elasticsearch_domain(260 self, context: RequestContext, domain_name: DomainName261 ) -> DescribeElasticsearchDomainResponse:262 opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)263 with exception_mapper():264 opensearch_status = opensearch_client.describe_domain(265 DomainName=domain_name,266 )["DomainStatus"]267 status = _domainstatus_from_opensearch(opensearch_status)268 return DescribeElasticsearchDomainResponse(DomainStatus=status)269 @handler("UpdateElasticsearchDomainConfig", expand=False)270 def update_elasticsearch_domain_config(271 self, context: RequestContext, payload: UpdateElasticsearchDomainConfigRequest272 ) -> UpdateElasticsearchDomainConfigResponse:273 opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)274 payload: Dict275 if "ElasticsearchClusterConfig" in payload:276 payload["ClusterConfig"] = payload["ElasticsearchClusterConfig"]277 payload["ClusterConfig"]["InstanceType"] = _instancetype_to_opensearch(278 payload["ClusterConfig"]["InstanceType"]279 )280 payload.pop("ElasticsearchClusterConfig")281 with exception_mapper():282 opensearch_config = opensearch_client.update_domain_config(**payload)["DomainConfig"]283 config = _domainconfig_from_opensearch(opensearch_config)284 return UpdateElasticsearchDomainConfigResponse(DomainConfig=config)285 def describe_elasticsearch_domains(286 self, context: RequestContext, domain_names: DomainNameList287 ) -> DescribeElasticsearchDomainsResponse:288 opensearch_client = aws_stack.connect_to_service("opensearch", region_name=context.region)289 with exception_mapper():290 opensearch_status_list = opensearch_client.describe_domains(291 DomainNames=domain_names,...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!