Best Python code snippet using autotest_python
em_migration_controller.py
Source:em_migration_controller.py
import em_path_inject  # noqa
import os
import sys
import traceback
from future.moves.urllib.parse import quote
import tarfile
import time
import em_common
import em_constants
import em_utils
from em_migration import migration_handlers_dict, BaseMigrationHandler
from em_migration.em_model_migration_metadata import MigrationMetadata
from em_migration.process_control import (
    NonMigrationDataInputsController,
    SavedsearchController,
    ProcessControlInternalException
)
from modinput_wrapper.base_modularinput import BaseModularInput
from rest_handler.session import session
from service_manager.splunkd.conf import ConfManager
from splunk import getDefault
from splunk.clilib.bundle_paths import make_splunkhome_path
from splunklib.client import Service
from logging_utils import log

# Name of the conf file that holds the app's launcher/version stanza.
APP_CONF_FILE = 'app'
# Directory (inside the app folder) where backup tarballs are written.
BACKUP_DIR_NAME = 'migration_backup'


class KnownMigrationException(Exception):
    """Migration error whose message is posted to the user as-is."""

    def __init__(self, msg):
        super(KnownMigrationException, self).__init__(msg)


class EMMigrationController(BaseModularInput):
    """
    Modular input that migrates SAI from the last migrated version to the
    version currently installed, running the registered migration handlers
    for every version in between.
    """

    app = em_constants.APP_NAME
    name = 'sai_migration_controller'
    title = 'Splunk App for Infrastructure - Version Migration Controller'
    description = 'Modular input to handle SAI migrations from the previous ' + \
                  'version to the current version'
    use_external_validation = False
    use_kvstore_checkpointer = False
    use_hec_event_writer = False
    use_single_instance = True

    def __init__(self):
        super(EMMigrationController, self).__init__()

    def do_additional_setup(self):
        """
        Build the logger, the splunkd clients and the version bookkeeping
        used by do_execute.
        """
        log_level = self.inputs.get('job').get('log_level', 'INFO')
        self.logger = log.getLogger(logger_name=self.name,
                                    log_level=log_level)
        self.session_key = session['authtoken']
        # The service must exist before the KV store check: when the check
        # fails it reports through post_message, which needs splunkd_service.
        self.splunkd_service = Service(
            port=getDefault('port'),
            token=self.session_key,
            owner='nobody',
            app=em_constants.APP_NAME,
        )
        self.check_kvstore_readiness()
        self.dry_run = self.inputs.get('job').get('dry_run', '').lower() in ('1', 'true')
        server_uri = em_common.get_server_uri()
        self.app_conf_manager = ConfManager(
            conf_file=APP_CONF_FILE,
            server_uri=server_uri,
            session_key=self.session_key,
            app=em_constants.APP_NAME
        )
        self.migration_metadata = MigrationMetadata.get()
        # Goes through the property setter below, which also saves the metadata.
        self.current_version = self.migration_metadata.latest_migrated_version
        self.new_version = self.app_conf_manager.get_stanza('launcher')['entry'][0]['content']['version']
        self.data_inputs_controller = NonMigrationDataInputsController()
        self.savedsearch_controller = SavedsearchController()

    @property
    def current_version(self):
        """Latest version the app has been migrated to."""
        return self._current_version

    @current_version.setter
    def current_version(self, version):
        # Persist first so the stored metadata never lags behind the
        # in-memory value.
        self.migration_metadata.latest_migrated_version = version
        self.migration_metadata.save()
        self._current_version = version

    def do_execute(self):
        """
        Run all pending migrations, then post a success or failure message.

        Non-migration processes disabled for the migration are re-enabled
        and the running flag is cleared whether or not the migration
        succeeded.
        """
        migration_started = False
        try:
            # Only run on the search head captain in a search head cluster
            if not em_common.modular_input_should_run(self.session_key):
                self.logger.info('SAI migration skipped on non-search head captain in search head cluster')
                return
            # initialize latest migrated version
            if self.current_version == '?.0.0' and self.is_fresh_installation():
                self.current_version = self.new_version
            if self.current_version != self.new_version:
                migration_started = True
                self.set_migration_running_status(running=True)
                self.backup_app()
                self.disable_non_migration_processes()
                for version in migration_handlers_dict:
                    if self.current_version == '?.0.0' or em_utils.is_lower_version(self.current_version, version):
                        self.execute_migration(version)
                        self.current_version = version
                # ensure that the latest migrated version is the latest app version even
                # when there's no migration needed for new version
                self.current_version = self.new_version
                self.post_migration_success_message()
        except KnownMigrationException as e:
            self.logger.error(e)
            self.post_migration_failure_message(e)
        # Unknown migration errors
        except Exception as e:
            self.logger.error('Failed to migrate to version %s - Error: %s' % (self.new_version, e))
            self.logger.debug(traceback.format_exc())
            query = quote('index=_internal source="*%s.log*"' % self.name)
            search_link = 'Check [[/app/splunk_app_infrastructure/search?q=%s|SAI migration logs]] for details' % query
            self.post_migration_failure_message(search_link)
        finally:
            if migration_started:
                try:
                    self.set_migration_running_status(running=False)
                finally:
                    # Re-enable even if clearing the running flag failed, so
                    # inputs and saved searches are not left disabled.
                    self.enable_non_migration_processes()

    def check_kvstore_readiness(self):
        """
        Exit the process with status 1 (after logging and posting an error
        message) when the KV store is not ready.
        """
        try:
            em_common.check_kvstore_readiness(session['authtoken'])
        except em_common.KVStoreNotReadyException as e:
            self.logger.error('Migration failed because KVStore is not ready - Error: %s' % e)
            query = quote('index=_internal source="*%s.log*"' % self.name)
            search_link = 'Check [[/app/splunk_app_infrastructure/search?q=%s|SAI migration logs]] for details' % query
            fail_message = 'SAI migration failed because KVStore is not ready. %s' % search_link
            self.post_message('error', fail_message)
            sys.exit(1)

    def is_fresh_installation(self):
        """
        Return True when the entity, group and entity-cache KV store
        collections are all empty.
        """
        entity_store = self.splunkd_service.kvstore['em_entities'].data
        group_store = self.splunkd_service.kvstore[em_constants.STORE_GROUPS].data
        entities = entity_store.query(limit=1)
        groups = group_store.query(limit=1)
        de_entity_store = self.splunkd_service.kvstore[em_constants.STORE_ENTITY_CACHE].data
        de_entities = de_entity_store.query(limit=1)
        # if there's no entities and no groups, there should be no alerts
        return len(entities) == 0 and len(groups) == 0 and len(de_entities) == 0

    def set_migration_running_status(self, running):
        """
        Update migration status that indicates if migration is ongoing
        :param running - boolean indicating if migration is running
        :type boolean
        :raises KnownMigrationException if running is not a bool
        """
        if not isinstance(running, bool):
            raise KnownMigrationException('Invalid migration status set')
        self.migration_metadata.is_running = int(running)
        self.migration_metadata.save()

    def backup_app(self):
        """
        Create a backup tarball of the app's local configurations

        The tarball contains the app's ``local`` directory and
        ``metadata/local.meta``. A path that does not exist is skipped
        instead of failing the whole migration.
        """
        self.logger.info('Start creating backup of existing app installation...')
        sai_app_path = make_splunkhome_path(['etc', 'apps', em_constants.APP_NAME])
        backup_folder = os.path.join(sai_app_path, BACKUP_DIR_NAME)
        backup_file_path = os.path.join(backup_folder, 'sai_app_backup_%d.tgz' % time.time())
        if not os.path.exists(backup_folder):
            os.makedirs(backup_folder)
        # (path on disk, path inside the archive)
        backup_targets = (
            (
                os.path.join(sai_app_path, 'local'),
                os.path.join(em_constants.APP_NAME, 'local'),
            ),
            (
                os.path.join(sai_app_path, 'metadata', 'local.meta'),
                os.path.join(em_constants.APP_NAME, 'metadata', 'local.meta'),
            ),
        )
        with tarfile.open(backup_file_path, 'w:gz') as tar:
            for source_path, archive_name in backup_targets:
                if not os.path.exists(source_path):
                    # tar.add raises on a missing path; nothing to back up here.
                    self.logger.info('Skipping backup of %s because it does not exist.' % source_path)
                    continue
                tar.add(source_path, recursive=True, arcname=archive_name)
        self.post_message(
            'info',
            'SAI is migrating to version %s. Saved backup of current version at %s.' % (
                self.new_version,
                os.path.join('$SPLUNK_HOME', 'etc', 'apps', em_constants.APP_NAME, BACKUP_DIR_NAME)
            )
        )
        self.logger.info('Created backup file of existing app installation at %s.' % backup_file_path)

    def disable_non_migration_processes(self):
        """Disable the data inputs and saved searches for the migration."""
        self.data_inputs_controller.disable()
        self.savedsearch_controller.disable()

    def enable_non_migration_processes(self):
        """
        Re-enable the data inputs and saved searches; on a
        ProcessControlInternalException log it and post a warning instead
        of raising.
        """
        try:
            self.data_inputs_controller.enable()
            self.savedsearch_controller.enable()
        except ProcessControlInternalException as e:
            self.logger.error('Failed to re-enable non-migration processes - Error: %s' % e)
            self.post_message(
                'warn',
                'Failed to enable processes that were disabled during migration. Please re-enable them manually.'
            )

    def execute_migration(self, migrate_to_version):
        """
        Executes migration handlers for the input target version
        :param migrate_to_version - target version to migrate to
        :type string
        """
        migration_dict = migration_handlers_dict.get(migrate_to_version, {})
        # default minimal version to 1.0.0, versions before that are not supported
        minimal_version = migration_dict.get('minimal_version', '1.0.0')
        migration_handlers = migration_dict.get('handlers', [])
        if not self.should_execute(migrate_to_version, minimal_version, migration_handlers):
            self.logger.info('No migration needs to be applied from version %s to %s, skipping...' % (
                self.current_version,
                migrate_to_version
            ))
            return
        self.logger.info('Starting SAI migration from version %s to %s...' % (self.current_version, migrate_to_version))
        for handler_cls in migration_handlers:
            migration_handler_obj = handler_cls(self.logger, self.session_key)
            migration_handler_obj.execute(dry_run=self.dry_run)
        if self.dry_run:
            self.logger.info('SAI tested migration from version %s to %s. No changes have been applied.' % (
                self.current_version,
                migrate_to_version
            ))
        else:
            self.logger.info('SAI migration from version %s to %s succeeded' % (
                self.current_version,
                migrate_to_version
            ))

    def should_execute(self, migrate_to_version, minimal_version, migration_handlers):
        """
        Decide whether the handlers for migrate_to_version need to run.
        :param migrate_to_version - target version of this migration step
        :param minimal_version - lowest version this step can migrate from
        :param migration_handlers - handler classes registered for the step
        :return False when there are no handlers or the app is already at
                migrate_to_version, True otherwise
        :raises KnownMigrationException when the current version is unknown
                or below minimal_version, or a handler is not a
                BaseMigrationHandler subclass
        """
        # Check if there was an error in loading versions
        if not self.current_version:
            raise KnownMigrationException('Migration unable to identify app version')
        # check if version satisfies minimal version
        if self.current_version != '?.0.0' and em_utils.is_lower_version(self.current_version, minimal_version):
            raise KnownMigrationException('Migration cannot migrate from this base version')
        if len(migration_handlers) == 0:
            return False
        # Check that each migration in the migration handler is supported by this version
        for handler_cls in migration_handlers:
            if not issubclass(handler_cls, BaseMigrationHandler):
                raise KnownMigrationException('Migration handler of invalid type found')
        return self.current_version != migrate_to_version

    def post_migration_success_message(self):
        """Post an info message announcing the migrated-to version."""
        success_msg = 'SAI successfully migrated to version %s' % self.new_version
        self.post_message('info', success_msg)

    def post_migration_failure_message(self, complementary_message):
        """
        Post an error message for a failed migration.
        :param complementary_message - detail interpolated into the message
        """
        backup_dir_path = os.path.join('$SPLUNK_HOME', 'etc', 'apps', em_constants.APP_NAME, BACKUP_DIR_NAME)
        fail_message = 'SAI failed to migrate to version %s. %s. Saved backup of old version at %s.' % (
            self.new_version,
            complementary_message,
            backup_dir_path
        )
        self.post_message('error', fail_message)

    def post_message(self, severity, message):
        """
        Create a '[SAI migration]' message through the splunkd service.
        :param severity - message severity, e.g. 'info', 'warn' or 'error'
        :param message - text of the message
        """
        self.splunkd_service.messages.create(
            '[SAI migration]',
            severity=severity,
            value=message
        )


if __name__ == '__main__':
    instance = EMMigrationController()
    # NOTE(review): the snippet is truncated after this statement; the call
    # that actually runs the modular input is not visible here.
compat.py
Source:compat.py
"""Provides a simple migration layer for pipeline jsons.

Given a pipeline version, a mapping will be used to establish if the
pipeline should be migrated, that is, modified in place, until it is
brought to the latest version used in Orchest.

If you are to implement a new migration add another _migrate_...
function following the given convention: 1.0.0 -> _migrate_1_0_0. Then
add a mapping to the "_version_to_migration_function" dictionary to
map the current version to the migration function and the version to
which it will migrate.

Note that the system only supports forward migrations.
"""
from _orchest.internals import utils as _utils


def _migrate_1_0_0(pipeline: dict) -> None:
    """Pre-k8s migrations.

    Adds a missing "parameters" entry, rewrites service environment
    variable names so they are valid, and renames the "ir" kernel to "r".
    The pipeline is modified in place.
    """

    # Take care of old pipelines with no defined params.
    if "parameters" not in pipeline:
        pipeline["parameters"] = {}

    # Pipelines had no constraints related to env var names.
    services = pipeline.get("services", {})
    for service in services.values():
        env_variables = service.get("env_variables", {})

        if not _utils.are_environment_variables_valid(env_variables):
            tmp = {}
            for key, value in env_variables.items():
                valid_key = _utils.make_env_var_name_valid(key)

                # Do not lose variables to collisions.
                i = 2
                while valid_key in tmp:
                    valid_key = f"{valid_key}_{i}"
                    i += 1
                tmp[valid_key] = value
            service["env_variables"] = tmp

        env_vars_inherit = service.get("env_variables_inherit", [])
        tmp = set()
        for var in env_vars_inherit:
            valid_var = _utils.make_env_var_name_valid(var)
            i = 2
            while valid_var in tmp:
                valid_var = f"{valid_var}_{i}"
                i += 1
            tmp.add(valid_var)
        service["env_variables_inherit"] = list(tmp)

    for step in pipeline["steps"].values():
        if (
            "kernel" in step
            and "name" in step["kernel"]
            and step["kernel"]["name"] == "ir"
        ):
            step["kernel"]["name"] = "r"


def _migrate_1_1_0(pipeline: dict) -> None:
    """Migrate from pre-k8s to k8s.

    For every service: the old "entrypoint" becomes "command" (the old
    "command", if any, becomes "args"), and "ports", "exposed" and
    "requires_authentication" get defaults when unset. The pipeline is
    modified in place.
    """
    services = pipeline.get("services", {})
    for service in services.values():
        # Can't use a get() with a default value because it would imply
        # a different thing. Migrate from pre-k8s to k8s.
        if "entrypoint" in service:
            if "command" in service:
                service["args"] = service["command"]
                del service["command"]
            service["command"] = service["entrypoint"]
            del service["entrypoint"]

        if not service.get("ports", []):
            service["ports"] = [8080]

        service["exposed"] = service.get("exposed", False)
        service["requires_authentication"] = service.get(
            "requires_authentication", True
        )


# version: (migration function, version to which it's migrated)
_version_to_migration_function = {
    "1.0.0": (_migrate_1_0_0, "1.1.0"),
    "1.1.0": (_migrate_1_1_0, "1.2.0"),
}

# Make sure no forward version is repeated.
# Iterate the values: each one is a (migration function, to-version) pair.
# Iterating items() would compare the dictionary keys instead, which are
# unique by construction and would make these checks vacuous.
__to_versions = {
    to_version for _, to_version in _version_to_migration_function.values()
}
assert len(_version_to_migration_function) == len(__to_versions)

# Make sure no migration function is repeated.
__migration_functions = {
    migration_func for migration_func, _ in _version_to_migration_function.values()
}
assert len(_version_to_migration_function) == len(__migration_functions)


def migrate_pipeline(pipeline: dict):
    """Migrates a pipeline in place to the latest version.

    A pipeline without a (truthy) "version" is treated as "1.0.0".
    Migrations are applied one after another for as long as the current
    version has an entry in ``_version_to_migration_function``.
    """

    if not pipeline.get("version", ""):
        pipeline["version"] = "1.0.0"

    while pipeline["version"] in _version_to_migration_function:
        migration_func, migrate_to_version = _version_to_migration_function[
            pipeline["version"]
        ]
        migration_func(pipeline)
        # NOTE(review): the snippet is truncated right after the call above.
        # Advancing the version is what lets this loop terminate and is the
        # only use of migrate_to_version; confirm against the full file.
        pipeline["version"] = migrate_to_version
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!