Best Python code snippet using localstack_python
cluster.py
Source: cluster.py
# ... (snippet truncated above; the four annotations below are the tail of a
# field list whose header is not shown -- presumably the `Directories`
# structure used throughout this snippet; TODO confirm against the full file)
#     tmp: str
#     mods: str
#     data: str
#     backup: str


def get_cluster_health_status(url: str) -> Optional[str]:
    """
    Queries the health endpoint of OpenSearch/Elasticsearch and returns either the status ('green', 'yellow',
    ...) or None if the request was not successful.

    :param url: base URL of the cluster ("/_cluster/health" is appended to it)
    :returns: the value of the "status" field of the health response, or None if the response is not ok
    """
    resp = requests.get(url + "/_cluster/health")
    if not (resp and resp.ok):
        return None
    return resp.json()["status"]


def init_directories(dirs: Directories):
    """
    Makes sure the directories exist and have the necessary permissions.

    The tmp directory is always recreated. Previous cluster data is removed unless it is located below the
    configured data directory (config.dirs.data). Finally, leftover lock files in the data directory are removed.

    :param dirs: the directories of the cluster to initialize
    """
    LOG.debug("initializing cluster directories %s", dirs)
    chmod_r(dirs.install, 0o777)

    # only clear previous data if it's not in DATA_DIR. config.dirs.data may be unset (resolve_directories falls
    # back to config.dirs.tmp in that case), so it must be checked before it is used as a prefix: None would
    # raise a TypeError and an empty string would match every path.
    persistent_root = config.dirs.data
    if not (persistent_root and dirs.data.startswith(persistent_root)):
        rm_rf(dirs.data)

    rm_rf(dirs.tmp)
    for path in (dirs.tmp, dirs.data, dirs.backup):
        mkdir(path)
        chmod_r(path, 0o777)

    # clear potentially existing lock files (which cause problems since ES 7.10)
    # (the loop variables deliberately do not reuse the name of the `dirs` parameter)
    for root, _subdirs, files in os.walk(dirs.data, True):
        for name in files:
            if name.endswith(".lock"):
                rm_rf(os.path.join(root, name))


def resolve_directories(version: str, cluster_path: str, data_root: Optional[str] = None) -> Directories:
    """
    Determines directories to find the opensearch binary as well as where to store the instance data.

    :param version: the full OpenSearch/Elasticsearch version (to resolve the install dir)
    :param cluster_path: the path between data_root and the actual data directories
    :param data_root: the root of the data dir (will be resolved to TMP_PATH or DATA_DIR by default)
    :returns: a Directories data structure
    """
    # where to find cluster binary and the modules
    engine_type, install_version = versions.get_install_type_and_version(version)
    is_opensearch = engine_type == EngineType.OpenSearch
    if is_opensearch:
        install_dir = install.get_opensearch_install_dir(install_version)
    else:
        # Elasticsearch version
        install_dir = install.get_elasticsearch_install_dir(install_version)
    modules_dir = os.path.join(install_dir, "modules")

    if not data_root:
        data_root = config.dirs.data or config.dirs.tmp

    # the data of both engines lives side by side below the data root, separated by engine name
    engine_dir = "opensearch" if is_opensearch else "elasticsearch"
    data_path = os.path.join(data_root, engine_dir, cluster_path)

    tmp_dir = os.path.join(data_path, "tmp")
    data_dir = os.path.join(data_path, "data")
    backup_dir = os.path.join(data_path, "backup")

    return Directories(install_dir, tmp_dir, modules_dir, data_dir, backup_dir)


def build_cluster_run_command(cluster_bin: str, settings: CommandSettings) -> List[str]:
    """
    Takes the command settings dict and builds the actual command (which can then be executed as a shell command).

    :param cluster_bin: path to the OpenSearch/Elasticsearch binary (including the binary)
    :param settings: dictionary where each item will be set as a command arguments
    :return: list of strings for the command with the settings to be executed as a shell command
    """
    # each setting becomes a single "-E key=value" element; the caller joins the list with spaces
    cmd_settings = [f"-E {k}={v}" for k, v in settings.items()]
    return [cluster_bin] + cmd_settings


class OpensearchCluster(Server):
    """Manages an OpenSearch cluster which is installed and operated by LocalStack."""

    # TODO: legacy default port should be removed here
    def __init__(
        self,
        port=4571,
        host="localhost",
        version: Optional[str] = None,
        directories: Optional[Directories] = None,
    ) -> None:
        """
        :param port: the HTTP port of the cluster
        :param host: the host the cluster binds to
        :param version: the version of the cluster (falls back to default_version)
        :param directories: the directories to use (resolved from the version if not given)
        """
        super().__init__(port, host)
        self._version = version or self.default_version

        # additional settings which are merged over the base settings when the run command is created
        self.command_settings = {}
        self.directories = directories or self._resolve_directories()

    @property
    def default_version(self) -> str:
        return constants.OPENSEARCH_DEFAULT_VERSION

    @property
    def version(self) -> str:
        return self._version

    @property
    def install_version(self) -> str:
        _, install_version = versions.get_install_type_and_version(self._version)
        return install_version

    @property
    def bin_name(self) -> str:
        return "opensearch"

    @property
    def os_user(self):
        return constants.OS_USER_OPENSEARCH

    def health(self) -> Optional[str]:
        """Returns the cluster health status reported by the cluster's health endpoint (or None)."""
        return get_cluster_health_status(self.url)

    def do_start_thread(self) -> FuncThread:
        """
        Installs the cluster binary (if necessary), initializes the directories and starts the cluster process
        as a shell command in a separate thread.

        :returns: the started thread running the cluster process
        """
        import shlex  # local import: only needed to quote the command for `su -c`

        self._ensure_installed()
        self._init_directories()

        cmd = self._create_run_command(additional_settings=self.command_settings)
        cmd = " ".join(cmd)

        if is_root() and self.os_user:
            # run the opensearch process as a non-root user (when running in docker)
            # shlex.quote wraps the command in single quotes and escapes any single quote it contains, such
            # that e.g. a data path containing a quote cannot break out of the argument to `-c`
            cmd = f"su {self.os_user} -c {shlex.quote(cmd)}"

        env_vars = self._create_env_vars()

        LOG.info("starting %s: %s with env %s", self.bin_name, cmd, env_vars)
        t = ShellCommandThread(
            cmd,
            env_vars=env_vars,
            strip_color=True,
            log_listener=self._log_listener,
        )
        t.start()
        return t

    def _resolve_directories(self) -> Directories:
        # the version is also used as the cluster path, i.e. the data is separated by version
        return resolve_directories(version=self.version, cluster_path=self.version)

    def _ensure_installed(self):
        install.install_opensearch(self.version)

    def _init_directories(self):
        init_directories(self.directories)

    def _base_settings(self, dirs: Directories) -> CommandSettings:
        """
        Creates the settings every cluster is started with.

        :param dirs: the directories of the cluster (used for the data and backup paths and the module lookup)
        :returns: the settings, keyed by the setting name
        """
        settings = {
            "http.port": self.port,
            "http.publish_port": self.port,
            "transport.port": "0",
            "network.host": self.host,
            "http.compression": "false",
            # the paths are wrapped in double quotes since the settings end up in a shell command
            "path.data": f'"{dirs.data}"',
            "path.repo": f'"{dirs.backup}"',
            "plugins.security.disabled": "true",
        }

        # only set the ML setting if the corresponding module is part of the installation
        if os.path.exists(os.path.join(dirs.mods, "x-pack-ml")):
            settings["xpack.ml.enabled"] = "false"

        return settings

    def _create_run_command(
        self, additional_settings: Optional[CommandSettings] = None
    ) -> List[str]:
        """
        Builds the command to start the cluster.

        :param additional_settings: settings which are added to (and take precedence over) the base settings
        :returns: the command as a list of strings
        """
        dirs = self.directories
        bin_path = os.path.join(dirs.install, "bin", self.bin_name)

        settings = self._base_settings(dirs)
        if additional_settings:
            settings.update(additional_settings)

        return build_cluster_run_command(bin_path, settings)

    def _create_env_vars(self) -> Dict:
        """Returns the environment variables the cluster process is started with."""
        return {
            # the JVM options can be overridden by the environment of the current process
            "OPENSEARCH_JAVA_OPTS": os.environ.get("OPENSEARCH_JAVA_OPTS", "-Xms200m -Xmx600m"),
            "OPENSEARCH_TMPDIR": self.directories.tmp,
        }

    def _log_listener(self, line, **_kwargs):
        # logging the port before each line to be able to connect logs to specific instances
        LOG.info("[%s] %s", self.port, line.rstrip())


class CustomEndpoint:
    """
    Encapsulates a custom endpoint (combines CustomEndpoint and CustomEndpointEnabled within the DomainEndpointOptions
    of the cluster, i.e. combines two fields from the AWS OpenSearch service model).
    """

    enabled: bool
    endpoint: str

    def __init__(self, enabled: bool, endpoint: str) -> None:
        """
        :param enabled: true if the custom endpoint is enabled (refers to DomainEndpointOptions#CustomEndpointEnabled)
        :param endpoint: defines the endpoint (i.e. the URL - refers to DomainEndpointOptions#CustomEndpoint)
        """
        self.enabled = enabled
        self.endpoint = endpoint

        # the parsed endpoint, or None if no endpoint is set
        self.url = urlparse(endpoint) if self.endpoint else None


class EdgeProxiedOpensearchCluster(Server):
    """
    Opensearch-backed Server that can be routed through the edge proxy using an UrlMatchingForwarder to forward
    requests to the backend cluster.
    """

    def __init__(
        self, url: str, version=None, directories: Optional[Directories] = None
    ) -> None:
        """
        :param url: the URL the cluster is reachable at through the proxy (host and port are taken from it)
        :param version: the version of the backend cluster (falls back to default_version)
        :param directories: the directories passed on to the backend cluster
        """
        self._url = urlparse(url)

        super().__init__(
            host=self._url.hostname,
            port=self._url.port,
        )
        self._version = version or self.default_version

        # the backend cluster, its port and the proxy are only created once the server runs (see do_run)
        self.cluster = None
        self.cluster_port = None
        self.proxy = None
        self.directories = directories

    @property
    def version(self):
        return self._version

    @property
    def default_version(self):
        return constants.OPENSEARCH_DEFAULT_VERSION

    @property
    def url(self) -> str:
        return self._url.geturl()

    def is_up(self):
        # check service lifecycle
        if not self.cluster:
            return False

        if not self.cluster.is_up():
            return False

        return super().is_up()

    def health(self):
        """calls the health endpoint of cluster through the proxy, making sure implicitly that both are running"""
        return get_cluster_health_status(self.url)

    def _backend_cluster(self) -> OpensearchCluster:
        return OpensearchCluster(
            port=self.cluster_port,
            host=DEFAULT_BACKEND_HOST,
            version=self.version,
            directories=self.directories,
        )

    def do_run(self):
        # start the backend cluster on a free port, then register a proxy which forwards requests to it
        self.cluster_port = get_free_tcp_port()
        self.cluster = self._backend_cluster()
        self.cluster.start()

        self.proxy = EndpointProxy(self.url, self.cluster.url)
        LOG.info("registering an endpoint proxy for %s => %s", self.url, self.cluster.url)
        self.proxy.register()
        # ... (snippet truncated here; the remainder of this method is not shown)
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!