Best Python code snippet using localstack_python
plugins.py
Source:plugins.py
...102 ):103 self.plugin_name = name104 self.start_function = start105 self.listener = listener106 self.check_function = check if check is not _default else local_api_checker(name)107 self.default_active = active108 self.stop_function = stop109 self.lifecycle_hook = lifecycle_hook or ServiceLifecycleHook()110 call_safe(self.lifecycle_hook.on_after_init)111 def start(self, asynchronous):112 call_safe(self.lifecycle_hook.on_before_start)113 if not self.start_function:114 return115 if self.start_function is _default:116 # fallback start method that simply adds the listener function to the list of proxy listeners if it exists117 if not self.listener:118 return119 from localstack.services.infra import add_service_proxy_listener120 add_service_proxy_listener(self.plugin_name, self.listener)121 return122 kwargs = {"asynchronous": asynchronous}123 if self.listener:124 kwargs["update_listener"] = self.listener125 return self.start_function(**kwargs)126 def stop(self):127 call_safe(self.lifecycle_hook.on_before_stop)128 if not self.stop_function:129 return130 return self.stop_function()131 def check(self, expect_shutdown=False, print_error=False):132 if not self.check_function:133 return134 return self.check_function(expect_shutdown=expect_shutdown, print_error=print_error)135 def name(self):136 return self.plugin_name137 def is_enabled(self):138 if self.default_active:139 return True140 return is_api_enabled(self.name())141class ServiceState(Enum):142 UNKNOWN = "unknown"143 AVAILABLE = "available"144 DISABLED = "disabled"145 STARTING = "starting"146 RUNNING = "running"147 STOPPING = "stopping"148 STOPPED = "stopped"149 ERROR = "error"150class ServiceContainer:151 """152 Holds a service, its state, and exposes lifecycle methods of the service.153 """154 service: Service155 state: ServiceState156 lock: threading.RLock157 errors: List[Exception]158 def __init__(self, service: Service, state=ServiceState.UNKNOWN):159 self.service = service160 self.state = state161 self.lock = 
threading.RLock()162 self.errors = list()163 def get(self) -> Service:164 return self.service165 def start(self) -> bool:166 try:167 self.state = ServiceState.STARTING168 self.service.start(asynchronous=True)169 except Exception as e:170 self.state = ServiceState.ERROR171 self.errors.append(e)172 LOG.error("error while starting service %s: %s", self.service.name(), e)173 return False174 return self.check()175 def check(self) -> bool:176 try:177 self.service.check(print_error=True)178 self.state = ServiceState.RUNNING179 return True180 except Exception as e:181 self.state = ServiceState.ERROR182 self.errors.append(e)183 LOG.error("error while checking service %s: %s", self.service.name(), e)184 return False185 def stop(self):186 try:187 self.state = ServiceState.STOPPING188 self.service.stop()189 self.state = ServiceState.STOPPED190 except Exception as e:191 self.state = ServiceState.ERROR192 self.errors.append(e)193class ServiceManager:194 def __init__(self) -> None:195 super().__init__()196 self._services = dict()197 self._mutex = threading.RLock()198 def get_service_container(self, name: str) -> Optional[ServiceContainer]:199 return self._services.get(name)200 def get_service(self, name: str) -> Optional[Service]:201 container = self.get_service_container(name)202 return container.service if container else None203 def add_service(self, service: Service) -> bool:204 state = ServiceState.AVAILABLE if service.is_enabled() else ServiceState.DISABLED205 self._services[service.name()] = ServiceContainer(service, state)206 return True207 def list_available(self) -> List[str]:208 return list(self._services.keys())209 def exists(self, name: str) -> bool:210 return name in self._services211 def is_running(self, name: str) -> bool:212 return self.get_state(name) == ServiceState.RUNNING213 def check(self, name: str) -> bool:214 if self.get_state(name) in [ServiceState.RUNNING, ServiceState.ERROR]:215 return self.get_service_container(name).check()216 def check_all(self):217 
return any([self.check(service_name) for service_name in self.list_available()])218 def get_state(self, name: str) -> Optional[ServiceState]:219 container = self.get_service_container(name)220 return container.state if container else None221 def get_states(self) -> Dict[str, ServiceState]:222 return {name: self.get_state(name) for name in self.list_available()}223 @log_duration()224 def require(self, name: str) -> Service:225 """226 High level function that always returns a running service, or raises an error. If the service is in a state227 that it could be transitioned into a running state, then invoking this function will attempt that transition,228 e.g., by starting the service if it is available.229 """230 container = self.get_service_container(name)231 if not container:232 raise ValueError("no such service %s" % name)233 if container.state == ServiceState.STARTING:234 if not poll_condition(lambda: container.state != ServiceState.STARTING, timeout=30):235 raise TimeoutError("gave up waiting for service %s to start" % name)236 with container.lock:237 if container.state == ServiceState.DISABLED:238 raise ServiceDisabled("service %s is disabled" % name)239 if container.state == ServiceState.RUNNING:240 return container.service241 if container.state == ServiceState.ERROR:242 # raise any capture error243 raise container.errors[-1]244 if container.state == ServiceState.AVAILABLE:245 if container.start():246 return container.service247 else:248 raise container.errors[-1]249 raise ServiceStateException(250 "service %s is not ready (%s) and could not be started" % (name, container.state)251 )252 # legacy map compatibility253 def items(self):254 return {255 container.service.name(): container.service for container in self._services.values()256 }.items()257 def keys(self):258 return self._services.keys()259 def values(self):260 return [container.service for container in self._services.values()]261 def get(self, key):262 return self.get_service(key)263 def 
__iter__(self):264 return self._services265class ServicePlugin(Plugin):266 service: Service267 api: str268 @abc.abstractmethod269 def create_service(self) -> Service:270 raise NotImplementedError271 def load(self):272 self.service = self.create_service()273 return self.service274class ServicePluginAdapter(ServicePlugin):275 def __init__(276 self,277 api: str,278 create_service: Callable[[], Service],279 should_load: Callable[[], bool] = None,280 ) -> None:281 super().__init__()282 self.api = api283 self._create_service = create_service284 self._should_load = should_load285 def should_load(self) -> bool:286 if self._should_load:287 return self._should_load()288 return True289 def create_service(self) -> Service:290 return self._create_service()291def aws_provider(api: str = None, name="default", should_load: Callable[[], bool] = None):292 """293 Decorator for marking methods that create a Service instance as a ServicePlugin. Methods marked with this294 decorator are discoverable as a PluginSpec within the namespace "localstack.aws.provider", with the name295 "<api>:<name>". 
If api is not explicitly specified, then the method name is used as api name.296 """297 def wrapper(fn):298 # sugar for being able to name the function like the api299 _api = api or fn.__name__300 # this causes the plugin framework into pointing the entrypoint to the original function rather than the301 # nested factory function302 @functools.wraps(fn)303 def factory() -> ServicePluginAdapter:304 return ServicePluginAdapter(api=_api, should_load=should_load, create_service=fn)305 return PluginSpec(PLUGIN_NAMESPACE, f"{_api}:{name}", factory=factory)306 return wrapper307class ServicePluginErrorCollector(PluginLifecycleListener):308 """309 A PluginLifecycleListener that collects errors related to service plugins.310 """311 errors: Dict[Tuple[str, str], Exception] # keys are: (api, provider)312 def __init__(self, errors: Dict[str, Exception] = None) -> None:313 super().__init__()314 self.errors = errors or dict()315 def get_key(self, plugin_name) -> Tuple[str, str]:316 # the convention is <api>:<provider>, currently we don't really expose the provider317 # TODO: faulty plugin names would break this318 return tuple(plugin_name.split(":", maxsplit=1))319 def on_resolve_exception(self, namespace: str, entrypoint, exception: Exception):320 self.errors[self.get_key(entrypoint.name)] = exception321 def on_init_exception(self, plugin_spec: PluginSpec, exception: Exception):322 self.errors[self.get_key(plugin_spec.name)] = exception323 def on_load_exception(self, plugin_spec: PluginSpec, plugin: Plugin, exception: Exception):324 self.errors[self.get_key(plugin_spec.name)] = exception325 def has_errors(self, api: str, provider: str = None) -> bool:326 for e_api, e_provider in self.errors.keys():327 if api == e_api:328 if not provider:329 return True330 else:331 return e_provider == provider332 return False333class ServicePluginManager(ServiceManager):334 plugin_manager: PluginManager[ServicePlugin]335 plugin_errors: ServicePluginErrorCollector336 def __init__(337 self,338 
plugin_manager: PluginManager[ServicePlugin] = None,339 provider_config: ServiceProviderConfig = None,340 ) -> None:341 super().__init__()342 self.plugin_errors = ServicePluginErrorCollector()343 self.plugin_manager = plugin_manager or PluginManager(344 PLUGIN_NAMESPACE, listener=self.plugin_errors345 )346 self._api_provider_specs = None347 self.provider_config = provider_config or config.SERVICE_PROVIDER_CONFIG348 def get_active_provider(self, service: str) -> str:349 return self.provider_config.get_provider(service)350 # TODO make the abstraction clearer, to provide better information if service is available versus discoverable351 # especially important when considering pro services352 def list_available(self) -> List[str]:353 """354 List all available services, which have an available, configured provider355 :return: List of service names356 """357 return [358 service359 for service, providers in self.api_provider_specs.items()360 if self.get_active_provider(service) in providers361 ]362 def exists(self, name: str) -> bool:363 return name in self.list_available()364 def get_state(self, name: str) -> Optional[ServiceState]:365 if name in self._services:366 # ServiceContainer exists, which means the plugin has been loaded367 return super().get_state(name)368 if not self.exists(name):369 # there's definitely no service with this name370 return None371 # if a PluginSpec exists, then we can get the container and check whether there was an error loading the plugin372 provider = self.get_active_provider(name)373 if self.plugin_errors.has_errors(name, provider):374 return ServiceState.ERROR375 return ServiceState.AVAILABLE376 def get_service_container(self, name: str) -> Optional[ServiceContainer]:377 container = super().get_service_container(name)378 if container:379 return container380 if not self.exists(name):381 return None382 # this is where we start lazy loading. 
we now know the PluginSpec for the API exists,383 # but the ServiceContainer has not been created384 plugin = self._load_service_plugin(name)385 if not plugin or not plugin.service:386 return None387 with self._mutex:388 if plugin.service not in self._services:389 super().add_service(plugin.service)390 return super().get_service_container(name)391 @property392 def api_provider_specs(self) -> Dict[str, List[str]]:393 """394 Returns all provider names within the service plugin namespace and parses their name according to the convention,395 that is "<api>:<provider>". The result is a dictionary that maps api => List[str (name of a provider)].396 """397 if self._api_provider_specs is not None:398 return self._api_provider_specs399 with self._mutex:400 if self._api_provider_specs is None:401 self._api_provider_specs = self._resolve_api_provider_specs()402 return self._api_provider_specs403 @log_duration()404 def _load_service_plugin(self, name: str) -> Optional[ServicePlugin]:405 providers = self.api_provider_specs.get(name)406 if not providers:407 # no providers for this api408 return None409 preferred_provider = self.get_active_provider(name)410 if preferred_provider in providers:411 provider = preferred_provider412 else:413 LOG.warning(414 "Configured provider (%s) does not exist for service (%s). 
Available options are: %s",415 preferred_provider,416 name,417 providers,418 )419 return None420 plugin_name = f"{name}:{provider}"421 plugin = self.plugin_manager.load(plugin_name)422 plugin.name = plugin_name423 return plugin424 @log_duration()425 def _resolve_api_provider_specs(self) -> Dict[str, List[str]]:426 result = defaultdict(list)427 for spec in self.plugin_manager.list_plugin_specs():428 api, provider = spec.name.split(429 ":"430 ) # TODO: error handling, faulty plugins could break the runtime431 result[api].append(provider)432 return result433 def apis_with_provider(self, provider: str) -> List[str]:434 """435 Lists all apis where a given provider exists for.436 :param provider: Name of the provider437 :return: List of apis the given provider provides438 """439 apis = list()440 for api, providers in self.api_provider_specs.items():441 if provider in providers:442 apis.append(api)443 return apis444 def stop_services(self, services: List[str] = None):445 """446 Stops services for this service manager, if they are currently active.447 Will not stop services not already started or in and error state.448 :param services: Service names to stop. 
If not provided, all services for this manager will be stopped.449 """450 for service_name in services:451 if self.get_state(service_name) in [ServiceState.STARTING, ServiceState.RUNNING]:452 service_container = self.get_service_container(service_name)453 service_container.stop()454 def stop_all_services(self) -> None:455 """456 Stops all services for this service manager, if they are currently active.457 Will not stop services not already started or in and error state.458 """459 services = self.list_available()460 self.stop_services(services)461# map of service plugins, mapping from service name to plugin details462SERVICE_PLUGINS: ServicePluginManager = ServicePluginManager()463# -----------------------------464# INFRASTRUCTURE HEALTH CHECKS465# -----------------------------466def wait_for_infra_shutdown():467 apis = get_enabled_apis()468 names = [name for name, plugin in SERVICE_PLUGINS.items() if name in apis]469 def check(name):470 check_service_health(api=name, expect_shutdown=True)471 LOG.debug("[shutdown] api %s has shut down", name)472 # no special significance to 10 workers, seems like a reasonable number given the number of services we have473 with ThreadPoolExecutor(max_workers=10) as executor:474 executor.map(check, names)475def check_service_health(api, expect_shutdown=False):476 status = SERVICE_PLUGINS.check(api)477 if status == expect_shutdown:478 if not expect_shutdown:479 LOG.warning('Service "%s" not yet available, retrying...', api)480 else:481 LOG.warning('Service "%s" still shutting down, retrying...', api)482 raise Exception("Service check failed for api: %s" % api)483def local_api_checker(service: str) -> Callable:484 """485 Creates a health check method for the given service that works under the assumption that the real backend service486 ports are locatable through the PROXY_LISTENER global.487 """488 from localstack.services.infra import PROXY_LISTENERS489 if config.EAGER_SERVICE_LOADING:490 # most services don't have a real health 
check, and if they would, that would dramatically increase the491 # startup time, since health checks are done sequentially at startup. however, the health checks are needed492 # for the lazy-loading cold start.493 return lambda *args, **kwargs: None494 def _check(expect_shutdown=False, print_error=False):495 try:496 if service not in PROXY_LISTENERS:497 LOG.debug("cannot find backend port for service %s", service)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!