Best Python code snippet using localstack_python
provider.py
Source:provider.py
...161 def __init__(self):162 self.opensearch_domains = {}163def get_domain_config(domain_key) -> DomainConfig:164 status = get_domain_status(domain_key)165 return _status_to_config(status)166def _status_to_config(status: DomainStatus) -> DomainConfig:167 cluster_cfg = status.get("ClusterConfig") or {}168 default_cfg = DEFAULT_OPENSEARCH_CLUSTER_CONFIG169 config_status = get_domain_config_status()170 return DomainConfig(171 AccessPolicies=AccessPoliciesStatus(172 Options=PolicyDocument(""),173 Status=config_status,174 ),175 AdvancedOptions=AdvancedOptionsStatus(176 Options={177 "override_main_response_version": "false",178 "rest.action.multi.allow_explicit_index": "true",179 },180 Status=config_status,181 ),182 EBSOptions=EBSOptionsStatus(183 Options=EBSOptions(184 EBSEnabled=True,185 VolumeSize=100,186 VolumeType=VolumeType.gp2,187 ),188 Status=config_status,189 ),190 ClusterConfig=ClusterConfigStatus(191 Options=ClusterConfig(192 DedicatedMasterCount=cluster_cfg.get(193 "DedicatedMasterCount", default_cfg["DedicatedMasterCount"]194 ),195 DedicatedMasterEnabled=cluster_cfg.get(196 "DedicatedMasterEnabled", default_cfg["DedicatedMasterEnabled"]197 ),198 DedicatedMasterType=cluster_cfg.get(199 "DedicatedMasterType", default_cfg["DedicatedMasterType"]200 ),201 InstanceCount=cluster_cfg.get("InstanceCount", default_cfg["InstanceCount"]),202 InstanceType=cluster_cfg.get("InstanceType", default_cfg["InstanceType"]),203 ZoneAwarenessEnabled=cluster_cfg.get(204 "ZoneAwarenessEnabled", default_cfg["ZoneAwarenessEnabled"]205 ),206 ),207 Status=config_status,208 ),209 CognitoOptions=CognitoOptionsStatus(210 Options=CognitoOptions(Enabled=False), Status=config_status211 ),212 EngineVersion=VersionStatus(Options=status.get("EngineVersion"), Status=config_status),213 EncryptionAtRestOptions=EncryptionAtRestOptionsStatus(214 Options=EncryptionAtRestOptions(Enabled=False),215 Status=config_status,216 ),217 LogPublishingOptions=LogPublishingOptionsStatus(218 Options={},219 Status=config_status,220 ),221 SnapshotOptions=SnapshotOptionsStatus(222 Options=SnapshotOptions(AutomatedSnapshotStartHour=randint(0, 23)),223 Status=config_status,224 ),225 VPCOptions=VPCDerivedInfoStatus(226 Options={},227 Status=config_status,228 ),229 DomainEndpointOptions=DomainEndpointOptionsStatus(230 Options=status.get("DomainEndpointOptions", {}),231 Status=config_status,232 ),233 NodeToNodeEncryptionOptions=NodeToNodeEncryptionOptionsStatus(234 Options=NodeToNodeEncryptionOptions(Enabled=False),235 Status=config_status,236 ),237 AdvancedSecurityOptions=AdvancedSecurityOptionsStatus(238 Options=status.get("AdvancedSecurityOptions", {}), Status=config_status239 ),240 AutoTuneOptions=AutoTuneOptionsStatus(241 Options=AutoTuneOptions(242 DesiredState=AutoTuneDesiredState.ENABLED,243 RollbackOnDisable=RollbackOnDisable.NO_ROLLBACK,244 MaintenanceSchedules=[],245 ),246 Status=AutoTuneStatus(247 CreationDate=config_status.get("CreationDate"),248 UpdateDate=config_status.get("UpdateDate"),249 UpdateVersion=config_status.get("UpdateVersion"),250 State=AutoTuneState.ENABLED,251 PendingDeletion=config_status.get("PendingDeletion"),252 ),253 ),254 )255def get_domain_config_status() -> OptionStatus:256 return OptionStatus(257 CreationDate=datetime.now(),258 PendingDeletion=False,259 State=OptionState.Active,260 UpdateDate=datetime.now(),261 UpdateVersion=randint(1, 100),262 )263def get_domain_status(domain_key: DomainKey, deleted=False) -> DomainStatus:264 region = OpenSearchServiceBackend.get(domain_key.region)265 stored_status: DomainStatus = (266 region.opensearch_domains.get(domain_key.domain_name) or DomainStatus()267 )268 cluster_cfg = stored_status.get("ClusterConfig") or {}269 default_cfg = DEFAULT_OPENSEARCH_CLUSTER_CONFIG270 new_status = DomainStatus(271 ARN=domain_key.arn,272 Created=True,273 Deleted=deleted,274 Processing=stored_status.get("Processing", True),275 DomainId=f"{domain_key.account}/{domain_key.domain_name}",276 DomainName=domain_key.domain_name,277 ClusterConfig=ClusterConfig(278 DedicatedMasterCount=cluster_cfg.get(279 "DedicatedMasterCount", default_cfg["DedicatedMasterCount"]280 ),281 DedicatedMasterEnabled=cluster_cfg.get(282 "DedicatedMasterEnabled", default_cfg["DedicatedMasterEnabled"]283 ),284 DedicatedMasterType=cluster_cfg.get(285 "DedicatedMasterType", default_cfg["DedicatedMasterType"]286 ),287 InstanceCount=cluster_cfg.get("InstanceCount", default_cfg["InstanceCount"]),288 InstanceType=cluster_cfg.get("InstanceType", default_cfg["InstanceType"]),289 ZoneAwarenessEnabled=cluster_cfg.get(290 "ZoneAwarenessEnabled", default_cfg["ZoneAwarenessEnabled"]291 ),292 WarmEnabled=False,293 ColdStorageOptions=ColdStorageOptions(Enabled=False),294 ),295 EngineVersion=stored_status.get("EngineVersion") or OPENSEARCH_DEFAULT_VERSION,296 Endpoint=stored_status.get("Endpoint", None),297 EBSOptions=EBSOptions(EBSEnabled=True, VolumeType=VolumeType.gp2, VolumeSize=10, Iops=0),298 CognitoOptions=CognitoOptions(Enabled=False),299 UpgradeProcessing=False,300 AccessPolicies="",301 SnapshotOptions=SnapshotOptions(AutomatedSnapshotStartHour=0),302 EncryptionAtRestOptions=EncryptionAtRestOptions(Enabled=False),303 NodeToNodeEncryptionOptions=NodeToNodeEncryptionOptions(Enabled=False),304 AdvancedOptions={305 "override_main_response_version": "false",306 "rest.action.multi.allow_explicit_index": "true",307 },308 ServiceSoftwareOptions=ServiceSoftwareOptions(309 CurrentVersion="",310 NewVersion="",311 UpdateAvailable=False,312 Cancellable=False,313 UpdateStatus=DeploymentStatus.COMPLETED,314 Description="There is no software update available for this domain.",315 AutomatedUpdateDate=datetime.fromtimestamp(0, tz=timezone.utc),316 OptionalDeployment=True,317 ),318 DomainEndpointOptions=DomainEndpointOptions(319 EnforceHTTPS=False,320 TLSSecurityPolicy=TLSSecurityPolicy.Policy_Min_TLS_1_0_2019_07,321 CustomEndpointEnabled=False,322 ),323 AdvancedSecurityOptions=AdvancedSecurityOptions(324 Enabled=False, InternalUserDatabaseEnabled=False325 ),326 AutoTuneOptions=AutoTuneOptionsOutput(State=AutoTuneState.ENABLE_IN_PROGRESS),327 )328 if stored_status.get("Endpoint"):329 new_status["Endpoint"] = new_status.get("Endpoint")330 return new_status331def _ensure_domain_exists(arn: ARN) -> None:332 """333 Checks if the domain for the given ARN exists. Otherwise, a ValidationException is raised.334 :param arn: ARN string to lookup the domain for335 :return: None if the domain exists, otherwise raises an exception336 :raises: ValidationException if the domain for the given ARN cannot be found337 """338 domain_key = DomainKey.from_arn(arn)339 region = OpenSearchServiceBackend.get(domain_key.region)340 domain_status = region.opensearch_domains.get(domain_key.domain_name)341 if domain_status is None:342 raise ValidationException("Invalid ARN. Domain not found.")343def _update_domain_config_request_to_status(request: UpdateDomainConfigRequest) -> DomainStatus:344 request: Dict345 request.pop("DryRun", None)346 request.pop("DomainName", None)347 return request348class OpensearchProvider(OpensearchApi):349 def create_domain(350 self,351 context: RequestContext,352 domain_name: DomainName,353 engine_version: VersionString = None,354 cluster_config: ClusterConfig = None,355 ebs_options: EBSOptions = None,356 access_policies: PolicyDocument = None,357 snapshot_options: SnapshotOptions = None,358 vpc_options: VPCOptions = None,359 cognito_options: CognitoOptions = None,360 encryption_at_rest_options: EncryptionAtRestOptions = None,361 node_to_node_encryption_options: NodeToNodeEncryptionOptions = None,362 advanced_options: AdvancedOptions = None,363 log_publishing_options: LogPublishingOptions = None,364 domain_endpoint_options: DomainEndpointOptions = None,365 advanced_security_options: AdvancedSecurityOptionsInput = None,366 tag_list: TagList = None,367 auto_tune_options: AutoTuneOptionsInput = None,368 ) -> CreateDomainResponse:369 region = OpenSearchServiceBackend.get()370 with _domain_mutex:371 if domain_name in region.opensearch_domains:372 raise ResourceAlreadyExistsException(373 f"domain {domain_name} already exists in region {region.name}"374 )375 domain_key = DomainKey(376 domain_name=domain_name,377 region=context.region,378 account=context.account_id,379 )380 # "create" domain data381 region.opensearch_domains[domain_name] = get_domain_status(domain_key)382 # lazy-init the cluster (sets the Endpoint and Processing flag of the domain status)383 # TODO handle additional parameters (cluster config,...)384 create_cluster(domain_key, engine_version, domain_endpoint_options)385 # set the tags386 self.add_tags(context, domain_key.arn, tag_list)387 # get the (updated) status388 status = get_domain_status(domain_key)389 # record event390 event_publisher.fire_event(391 event_publisher.EVENT_OPENSEARCH_CREATE_DOMAIN,392 payload={"n": event_publisher.get_hash(domain_name)},393 )394 return CreateDomainResponse(DomainStatus=status)395 def delete_domain(396 self, context: RequestContext, domain_name: DomainName397 ) -> DeleteDomainResponse:398 domain_key = DomainKey(399 domain_name=domain_name,400 region=context.region,401 account=context.account_id,402 )403 region = OpenSearchServiceBackend.get(domain_key.region)404 with _domain_mutex:405 if domain_name not in region.opensearch_domains:406 raise ResourceNotFoundException(f"Domain not found: {domain_name}")407 status = get_domain_status(domain_key, deleted=True)408 _remove_cluster(domain_key)409 # record event410 event_publisher.fire_event(411 event_publisher.EVENT_OPENSEARCH_DELETE_DOMAIN,412 payload={"n": event_publisher.get_hash(domain_name)},413 )414 return DeleteDomainResponse(DomainStatus=status)415 def describe_domain(416 self, context: RequestContext, domain_name: DomainName417 ) -> DescribeDomainResponse:418 domain_key = DomainKey(419 domain_name=domain_name,420 region=context.region,421 account=context.account_id,422 )423 region = OpenSearchServiceBackend.get(domain_key.region)424 with _domain_mutex:425 if domain_name not in region.opensearch_domains:426 raise ResourceNotFoundException(f"Domain not found: {domain_name}")427 status = get_domain_status(domain_key)428 return DescribeDomainResponse(DomainStatus=status)429 @handler("UpdateDomainConfig", expand=False)430 def update_domain_config(431 self, context: RequestContext, payload: UpdateDomainConfigRequest432 ) -> UpdateDomainConfigResponse:433 domain_key = DomainKey(434 domain_name=payload["DomainName"],435 region=context.region,436 account=context.account_id,437 )438 region = OpenSearchServiceBackend.get(domain_key.region)439 with _domain_mutex:440 domain_status = region.opensearch_domains.get(domain_key.domain_name, None)441 if domain_status is None:442 raise ResourceNotFoundException(f"Domain not found: {domain_key.domain_name}")443 status_update: Dict = _update_domain_config_request_to_status(payload)444 domain_status.update(status_update)445 return UpdateDomainConfigResponse(DomainConfig=_status_to_config(domain_status))446 def describe_domains(447 self, context: RequestContext, domain_names: DomainNameList448 ) -> DescribeDomainsResponse:449 status_list = []450 with _domain_mutex:451 for domain_name in domain_names:452 try:453 domain_status = self.describe_domain(context, domain_name)["DomainStatus"]454 status_list.append(domain_status)455 except ResourceNotFoundException:456 # ResourceNotFoundExceptions are ignored, we just look for the next domain.457 # If no domain can be found, the result will just be empty.458 pass459 return DescribeDomainsResponse(DomainStatusList=status_list)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!