Best Python code snippet using localstack_python
install.py
Source:install.py
...24# set up logger25LOGGER = logging.getLogger(__name__)26def install_elasticsearch():27 if not os.path.exists(INSTALL_DIR_ES):28 log_install_msg('Elasticsearch')29 mkdir(INSTALL_DIR_INFRA)30 # download and extract archive31 tmp_archive = os.path.join(tempfile.gettempdir(), 'localstack.es.zip')32 download_and_extract_with_retry(ELASTICSEARCH_JAR_URL, tmp_archive, INSTALL_DIR_INFRA)33 elasticsearch_dir = glob.glob(os.path.join(INSTALL_DIR_INFRA, 'elasticsearch*'))34 if not elasticsearch_dir:35 raise Exception('Unable to find Elasticsearch folder in %s' % INSTALL_DIR_INFRA)36 shutil.move(elasticsearch_dir[0], INSTALL_DIR_ES)37 for dir_name in ('data', 'logs', 'modules', 'plugins', 'config/scripts'):38 dir_path = '%s/%s' % (INSTALL_DIR_ES, dir_name)39 mkdir(dir_path)40 chmod_r(dir_path, 0o777)41 # install default plugins42 for plugin in ELASTICSEARCH_PLUGIN_LIST:43 if is_alpine():44 # https://github.com/pires/docker-elasticsearch/issues/5645 os.environ['ES_TMPDIR'] = '/tmp'46 plugin_binary = os.path.join(INSTALL_DIR_ES, 'bin', 'elasticsearch-plugin')47 run('%s install %s' % (plugin_binary, plugin))48def install_elasticmq():49 if not os.path.exists(INSTALL_DIR_ELASTICMQ):50 log_install_msg('ElasticMQ')51 mkdir(INSTALL_DIR_ELASTICMQ)52 # download archive53 tmp_archive = os.path.join(tempfile.gettempdir(), 'elasticmq-server.jar')54 if not os.path.exists(tmp_archive):55 download(ELASTICMQ_JAR_URL, tmp_archive)56 shutil.copy(tmp_archive, INSTALL_DIR_ELASTICMQ)57def install_kinesalite():58 target_dir = '%s/kinesalite' % INSTALL_DIR_NPM59 if not os.path.exists(target_dir):60 log_install_msg('Kinesis')61 run('cd "%s" && npm install' % ROOT_PATH)62def install_stepfunctions_local():63 if not os.path.exists(INSTALL_DIR_STEPFUNCTIONS):64 log_install_msg('Step Functions')65 tmp_archive = os.path.join(tempfile.gettempdir(), 'stepfunctions.zip')66 download_and_extract_with_retry(67 STEPFUNCTIONS_ZIP_URL, tmp_archive, INSTALL_DIR_STEPFUNCTIONS)68def install_dynamodb_local():69 if not os.path.exists(INSTALL_DIR_DDB):70 log_install_msg('DynamoDB')71 # download and extract archive72 tmp_archive = os.path.join(tempfile.gettempdir(), 'localstack.ddb.zip')73 download_and_extract_with_retry(DYNAMODB_JAR_URL, tmp_archive, INSTALL_DIR_DDB)74 # fix for Alpine, otherwise DynamoDBLocal fails with:75 # DynamoDBLocal_lib/libsqlite4java-linux-amd64.so: __memcpy_chk: symbol not found76 if is_alpine():77 ddb_libs_dir = '%s/DynamoDBLocal_lib' % INSTALL_DIR_DDB78 patched_marker = '%s/alpine_fix_applied' % ddb_libs_dir79 if not os.path.exists(patched_marker):80 patched_lib = ('https://rawgit.com/bhuisgen/docker-alpine/master/alpine-dynamodb/' +81 'rootfs/usr/local/dynamodb/DynamoDBLocal_lib/libsqlite4java-linux-amd64.so')82 patched_jar = ('https://rawgit.com/bhuisgen/docker-alpine/master/alpine-dynamodb/' +83 'rootfs/usr/local/dynamodb/DynamoDBLocal_lib/sqlite4java.jar')84 run("curl -L -o %s/libsqlite4java-linux-amd64.so '%s'" % (ddb_libs_dir, patched_lib))85 run("curl -L -o %s/sqlite4java.jar '%s'" % (ddb_libs_dir, patched_jar))86 save_file(patched_marker, '')87 # fix logging configuration for DynamoDBLocal88 log4j2_config = """<Configuration status="WARN">89 <Appenders>90 <Console name="Console" target="SYSTEM_OUT">91 <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>92 </Console>93 </Appenders>94 <Loggers>95 <Root level="WARN"><AppenderRef ref="Console"/></Root>96 </Loggers>97 </Configuration>"""98 log4j2_file = os.path.join(INSTALL_DIR_DDB, 'log4j2.xml')99 save_file(log4j2_file, log4j2_config)100 run('cd "%s" && zip -u DynamoDBLocal.jar log4j2.xml || true' % INSTALL_DIR_DDB)101def install_amazon_kinesis_client_libs():102 # install KCL/STS JAR files103 if not os.path.exists(INSTALL_DIR_KCL):104 mkdir(INSTALL_DIR_KCL)105 tmp_archive = os.path.join(tempfile.gettempdir(), 'aws-java-sdk-sts.jar')106 if not os.path.exists(tmp_archive):107 download(STS_JAR_URL, tmp_archive)108 shutil.copy(tmp_archive, INSTALL_DIR_KCL)109 # Compile Java files110 from localstack.utils.kinesis import kclipy_helper111 classpath = kclipy_helper.get_kcl_classpath()112 java_files = '%s/utils/kinesis/java/com/atlassian/*.java' % ROOT_PATH113 class_files = '%s/utils/kinesis/java/com/atlassian/*.class' % ROOT_PATH114 if not glob.glob(class_files):115 run('javac -cp "%s" %s' % (classpath, java_files))116def install_lambda_java_libs():117 # install LocalStack "fat" JAR file (contains all dependencies)118 if not os.path.exists(INSTALL_PATH_LOCALSTACK_FAT_JAR):119 log_install_msg('LocalStack Java libraries', verbatim=True)120 download(URL_LOCALSTACK_FAT_JAR, INSTALL_PATH_LOCALSTACK_FAT_JAR)121def install_component(name):122 installers = {123 'kinesis': install_kinesalite,124 'dynamodb': install_dynamodb_local,125 'es': install_elasticsearch,126 'sqs': install_elasticmq,127 'stepfunctions': install_stepfunctions_local128 }129 installer = installers.get(name)130 if installer:131 installer()132def install_components(names):133 parallelize(install_component, names)134 install_lambda_java_libs()135def install_all_components():136 install_components(DEFAULT_SERVICE_PORTS.keys())137# -----------------138# HELPER FUNCTIONS139# -----------------140def log_install_msg(component, verbatim=False):141 component = component if verbatim else 'local %s server' % component142 LOGGER.info('Downloading and installing %s. This may take some time.' % component)143def is_alpine():144 try:145 run('cat /etc/issue | grep Alpine', print_error=False)146 return True147 except Exception:148 return False149def download_and_extract_with_retry(archive_url, tmp_archive, target_dir):150 mkdir(target_dir)151 def download_and_extract():152 if not os.path.exists(tmp_archive):153 download(archive_url, tmp_archive)154 unzip(tmp_archive, target_dir)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!