How to use the config_file method in molecule

Best Python code snippet using molecule_python

base_tests.py

Source:base_tests.py Github

copy

Full Screen

1import os2import tempfile3import unittest4from tempfile import TemporaryDirectory5from mkdocs import exceptions6from mkdocs.config import base, defaults7from mkdocs.config.config_options import BaseConfigOption8class ConfigBaseTests(unittest.TestCase):9 def test_unrecognised_keys(self):10 c = base.Config(schema=defaults.DEFAULT_SCHEMA)11 c.load_dict({12 'not_a_valid_config_option': "test"13 })14 failed, warnings = c.validate()15 self.assertEqual(warnings, [16 ('not_a_valid_config_option',17 'Unrecognised configuration name: not_a_valid_config_option')18 ])19 def test_missing_required(self):20 c = base.Config(schema=defaults.DEFAULT_SCHEMA)21 errors, warnings = c.validate()22 self.assertEqual(len(errors), 1)23 self.assertEqual(errors[0][0], 'site_name')24 self.assertEqual(str(errors[0][1]), 'Required configuration not provided.')25 self.assertEqual(len(warnings), 0)26 def test_load_from_file(self):27 """28 Users can explicitly set the config file using the '--config' option.29 Allows users to specify a config other than the default `mkdocs.yml`.30 """31 temp_dir = TemporaryDirectory()32 config_file = open(os.path.join(temp_dir.name, 'mkdocs.yml'), 'w')33 os.mkdir(os.path.join(temp_dir.name, 'docs'))34 try:35 config_file.write("site_name: MkDocs Test\n")36 config_file.flush()37 config_file.close()38 cfg = base.load_config(config_file=config_file.name)39 self.assertTrue(isinstance(cfg, base.Config))40 self.assertEqual(cfg['site_name'], 'MkDocs Test')41 finally:42 os.remove(config_file.name)43 temp_dir.cleanup()44 def test_load_from_missing_file(self):45 self.assertRaises(exceptions.ConfigurationError,46 base.load_config, config_file='missing_file.yml')47 def test_load_from_open_file(self):48 """49 `load_config` can accept an open file descriptor.50 """51 temp_dir = TemporaryDirectory()52 temp_path = temp_dir.name53 config_fname = os.path.join(temp_path, 'mkdocs.yml')54 config_file = open(config_fname, 'w+')55 os.mkdir(os.path.join(temp_path, 'docs'))56 try:57 
config_file.write("site_name: MkDocs Test\n")58 config_file.flush()59 cfg = base.load_config(config_file=config_file)60 self.assertTrue(isinstance(cfg, base.Config))61 self.assertEqual(cfg['site_name'], 'MkDocs Test')62 # load_config will always close the file63 self.assertTrue(config_file.closed)64 finally:65 temp_dir.cleanup()66 def test_load_from_closed_file(self):67 """68 The `serve` command with auto-reload may pass in a closed file descriptor.69 Ensure `load_config` reloads the closed file.70 """71 temp_dir = TemporaryDirectory()72 config_file = open(os.path.join(temp_dir.name, 'mkdocs.yml'), 'w')73 os.mkdir(os.path.join(temp_dir.name, 'docs'))74 try:75 config_file.write("site_name: MkDocs Test\n")76 config_file.flush()77 config_file.close()78 cfg = base.load_config(config_file=config_file)79 self.assertTrue(isinstance(cfg, base.Config))80 self.assertEqual(cfg['site_name'], 'MkDocs Test')81 finally:82 temp_dir.cleanup()83 def test_load_from_deleted_file(self):84 """85 Deleting the config file could trigger a server reload.86 """87 config_file = tempfile.NamedTemporaryFile('w', delete=False)88 try:89 config_file.write("site_name: MkDocs Test\n")90 config_file.flush()91 config_file.close()92 finally:93 os.remove(config_file.name)94 self.assertRaises(exceptions.ConfigurationError,95 base.load_config, config_file=config_file)96 def test_load_missing_required(self):97 """98 `site_name` is a required setting.99 """100 config_file = tempfile.NamedTemporaryFile('w', delete=False)101 try:102 config_file.write(103 "site_dir: output\nsite_uri: https://www.mkdocs.org\n")104 config_file.flush()105 config_file.close()106 self.assertRaises(exceptions.ConfigurationError,107 base.load_config, config_file=config_file.name)108 finally:109 os.remove(config_file.name)110 def test_pre_validation_error(self):111 class InvalidConfigOption(BaseConfigOption):112 def pre_validation(self, config, key_name):113 raise base.ValidationError('pre_validation error')114 c = 
base.Config(schema=(('invalid_option', InvalidConfigOption()), ))115 errors, warnings = c.validate()116 self.assertEqual(len(errors), 1)117 self.assertEqual(errors[0][0], 'invalid_option')118 self.assertEqual(str(errors[0][1]), 'pre_validation error')119 self.assertTrue(isinstance(errors[0][1], base.ValidationError))120 self.assertEqual(len(warnings), 0)121 def test_run_validation_error(self):122 class InvalidConfigOption(BaseConfigOption):123 def run_validation(self, value):124 raise base.ValidationError('run_validation error')125 c = base.Config(schema=(('invalid_option', InvalidConfigOption()), ))126 errors, warnings = c.validate()127 self.assertEqual(len(errors), 1)128 self.assertEqual(errors[0][0], 'invalid_option')129 self.assertEqual(str(errors[0][1]), 'run_validation error')130 self.assertTrue(isinstance(errors[0][1], base.ValidationError))131 self.assertEqual(len(warnings), 0)132 def test_post_validation_error(self):133 class InvalidConfigOption(BaseConfigOption):134 def post_validation(self, config, key_name):135 raise base.ValidationError('post_validation error')136 c = base.Config(schema=(('invalid_option', InvalidConfigOption()), ))137 errors, warnings = c.validate()138 self.assertEqual(len(errors), 1)139 self.assertEqual(errors[0][0], 'invalid_option')140 self.assertEqual(str(errors[0][1]), 'post_validation error')141 self.assertTrue(isinstance(errors[0][1], base.ValidationError))142 self.assertEqual(len(warnings), 0)143 def test_pre_and_run_validation_errors(self):144 """ A pre_validation error does not stop run_validation from running. 
"""145 class InvalidConfigOption(BaseConfigOption):146 def pre_validation(self, config, key_name):147 raise base.ValidationError('pre_validation error')148 def run_validation(self, value):149 raise base.ValidationError('run_validation error')150 c = base.Config(schema=(('invalid_option', InvalidConfigOption()), ))151 errors, warnings = c.validate()152 self.assertEqual(len(errors), 2)153 self.assertEqual(errors[0][0], 'invalid_option')154 self.assertEqual(str(errors[0][1]), 'pre_validation error')155 self.assertTrue(isinstance(errors[0][1], base.ValidationError))156 self.assertEqual(errors[1][0], 'invalid_option')157 self.assertEqual(str(errors[1][1]), 'run_validation error')158 self.assertTrue(isinstance(errors[1][1], base.ValidationError))159 self.assertEqual(len(warnings), 0)160 def test_run_and_post_validation_errors(self):161 """ A run_validation error stops post_validation from running. """162 class InvalidConfigOption(BaseConfigOption):163 def run_validation(self, value):164 raise base.ValidationError('run_validation error')165 def post_validation(self, config, key_name):166 raise base.ValidationError('post_validation error')167 c = base.Config(schema=(('invalid_option', InvalidConfigOption()), ))168 errors, warnings = c.validate()169 self.assertEqual(len(errors), 1)170 self.assertEqual(errors[0][0], 'invalid_option')171 self.assertEqual(str(errors[0][1]), 'run_validation error')172 self.assertTrue(isinstance(errors[0][1], base.ValidationError))173 self.assertEqual(len(warnings), 0)174 def test_validation_warnings(self):175 class InvalidConfigOption(BaseConfigOption):176 def pre_validation(self, config, key_name):177 self.warnings.append('pre_validation warning')178 def run_validation(self, value):179 self.warnings.append('run_validation warning')180 def post_validation(self, config, key_name):181 self.warnings.append('post_validation warning')182 c = base.Config(schema=(('invalid_option', InvalidConfigOption()), ))183 errors, warnings = c.validate()184 
self.assertEqual(len(errors), 0)185 self.assertEqual(warnings, [186 ('invalid_option', 'pre_validation warning'),187 ('invalid_option', 'run_validation warning'),188 ('invalid_option', 'post_validation warning'),189 ])190 def test_load_from_file_with_relative_paths(self):191 """192 When explicitly setting a config file, paths should be relative to the193 config file, not the working directory.194 """195 config_dir = TemporaryDirectory()196 config_fname = os.path.join(config_dir.name, 'mkdocs.yml')197 docs_dir = os.path.join(config_dir.name, 'src')198 os.mkdir(docs_dir)199 config_file = open(config_fname, 'w')200 try:201 config_file.write("docs_dir: src\nsite_name: MkDocs Test\n")202 config_file.flush()203 config_file.close()204 cfg = base.load_config(config_file=config_file)205 self.assertTrue(isinstance(cfg, base.Config))206 self.assertEqual(cfg['site_name'], 'MkDocs Test')207 self.assertEqual(cfg['docs_dir'], docs_dir)208 self.assertEqual(cfg.config_file_path, config_fname)209 self.assertIsInstance(cfg.config_file_path, str)210 finally:...

Full Screen

Full Screen

ConfigFileHandler.py

Source:ConfigFileHandler.py Github

copy

Full Screen

1# -*- coding: UTF-8 -*-2import os3import arrow4from flask import request, current_app5from flask_restful import Resource6from werkzeug.exceptions import BadRequest7from SysManager.configs import SSHConfig8from SysManager.executor import Executor9from app import db10from app.models import ConfigFile, ConfigType, TradeSystem11from restful.errors import (DataNotJsonError,12 DataNotNullError,13 DataEnumValueError,14 ApiError)15from restful.protocol import RestProtocol16class ConfigFileListApi(Resource):17 def __init__(self):18 super(ConfigFileListApi, self).__init__()19 self.not_null_list = ['name', 'dir', 'file', 'config_type', 'sys_id']20 self.system_list = []21 def gather_systems(self, parent_sys):22 self.system_list.append(parent_sys)23 for child_sys in parent_sys.child_systems:24 if child_sys.disabled is False:25 self.gather_systems(child_sys)26 def get(self, **kwargs):27 system = TradeSystem.find(**kwargs)28 if system:29 self.gather_systems(system)30 ''' for sys in self.system_list:31 # config_files = ConfigFile.query.filter_by(sys_id=sys.id).all()32 self.config_file_list.extend(sys.config_files) '''33 conf_file_list = ConfigFile.query.filter(34 ConfigFile.sys_id.in_([sys.id for sys in self.system_list])35 ).all()36 return RestProtocol(conf_file_list)37 else:38 return RestProtocol(message='System not found', error_code=-1), 40439 def post(self):40 try:41 data = request.get_json(force=True)42 for param in self.not_null_list:43 if not data.get(param):44 raise DataNotNullError('Please input {}'.format(param))45 ConfigType[data.get('config_type')]46 except BadRequest:47 return RestProtocol(DataNotJsonError())48 except ApiError as e:49 return RestProtocol(e)50 except KeyError:51 return RestProtocol(DataEnumValueError())52 else:53 config_file = ConfigFile()54 config_file.name = data.get('name')55 config_file.sys_id = data.get('sys_id')56 config_file.config_type = ConfigType[data.get('config_type')]57 config_file.dir = data.get('dir')58 config_file.file = 
data.get('file')59 system = TradeSystem.find(id=data.get('sys_id'))60 if system:61 mod = {62 'name': 'md5',63 'args': {64 'dir': data.get('dir'),65 'file': data.get('file')66 }67 }68 conf = SSHConfig(system.ip,69 system.user,70 system.password)71 executor = Executor.Create(conf)72 result = executor.run(mod)73 if result.return_code == 0:74 config_file.hash_code = result.lines[0]75 config_file.timestamp = arrow.utcnow()76 # Make directory for config file77 if not os.path.exists(current_app.config['CONFIG_FILES']):78 os.mkdir(current_app.config['CONFIG_FILES'])79 os.chdir(os.path.join(current_app.config['CONFIG_FILES']))80 dir_name = system.name + '_' + system.ip81 if not os.path.exists(dir_name):82 os.mkdir(dir_name)83 config_file.storage = current_app.config['CONFIG_FILES'] + '/' + os.path.join(dir_name)84 os.chdir('../')85 db.session.add(config_file)86 db.session.commit()87 return RestProtocol(config_file)88class ConfigFileApi(Resource):89 def __init__(self):90 super(ConfigFileApi, self).__init__()91 def get(self, **kwargs):92 config_file = ConfigFile.find(**kwargs)93 if config_file:94 return RestProtocol(config_file)95 else:96 return RestProtocol(message='Config file not found', error_code=-1), 40497 def post(self, **kwargs):98 config_file = ConfigFile.find(**kwargs)99 if config_file:100 remote_config = SSHConfig(101 config_file.process.server.ip,102 config_file.process.system.user,103 config_file.process.system.password104 )105 exe = Executor.Create(remote_config)106 mod = {107 'name': 'md5',108 'args': {109 'dir': config_file.dir,110 'file': config_file.file111 }112 }113 result = exe.run(mod)114 if result.return_code == 0:115 config_file.pre_hash_code = config_file.hash_code116 config_file.pre_timestamp = config_file.timestamp117 config_file.hash_code = result.lines[0]118 config_file.timestamp = arrow.utcnow()119 return RestProtocol({120 'id': config_file.id,121 'uuid': config_file.uuid,122 'name': config_file.name,123 'type': config_file.config_type.name,124 
'dir': config_file.dir,125 'file': config_file.file,126 'hash': config_file.hash_code,127 'timestamp': config_file.timestamp and \128 config_file.timestamp.to(current_app.config['TIME_ZONE']).format('YYYY-MM-DD HH:mm:ss'),129 'hash_changed': False130 })131 else:132 return RestProtocol(message='Config file not found', error_code=-1), 404133class ConfigFileCheckApi(Resource):134 def __init__(self):135 super(ConfigFileCheckApi, self).__init__()136 self.system_list = []137 self.config_file_list = []138 self.executor_dict = {}139 self.check_result = {}140 def gather_systems(self, parent_sys):141 self.system_list.append(parent_sys)142 for child_sys in parent_sys.child_systems:143 if child_sys.disabled is False:144 self.gather_systems(child_sys)145 def get(self, **kwargs):146 system = TradeSystem.find(**kwargs)147 if system:148 self.gather_systems(system)149 for sys in self.system_list:150 self.config_file_list.extend(sys.config_files)151 conf = SSHConfig(sys.ip,152 sys.user,153 sys.password)154 executor = Executor.Create(conf)155 self.executor_dict[sys.id] = executor156 for config_file in self.config_file_list:157 mod = {158 'name': 'md5',159 'args': {160 'dir': config_file.dir,161 'file': config_file.file162 }163 }164 conf_sys = TradeSystem.find(id=config_file.sys_id)165 if conf_sys:166 result = self.executor_dict[conf_sys.id].run(mod)167 if result.return_code == 0:168 # 'True' means not change, 'False' means change169 if result.lines[0] == config_file.hash_code:170 self.check_result[config_file.name] = True, config_file.dir171 if result.lines[0] != config_file.hash_code:172 self.check_result[config_file.name] = False, config_file.dir173 return RestProtocol(self.check_result)174 else:...

Full Screen

Full Screen

confmodif.py

Source:confmodif.py Github

copy

Full Screen

1import os2def conf_file_modify(pop):3 config_file = open(os.path.join("utils", "config-feedforward.txt"), "w")4 config_file.write("[NEAT]\n")5 config_file.write("fitness_criterion = max\n")6 config_file.write("fitness_threshold = 10000\n")7 config_file.write("pop_size = " + str(pop) + "\n")8 config_file.write("reset_on_extinction = False\n\n")9 config_file.write("[DefaultGenome]\n")10 config_file.write("# node activation options\n")11 config_file.write("activation_default = softplus\n")12 config_file.write("activation_mutate_rate = 0.2\n")13 config_file.write("activation_options = tanh sigmoid square\n\n")14 config_file.write("# node aggregation options\n")15 config_file.write("aggregation_default = sum\n")16 config_file.write("aggregation_mutate_rate = 0.0\n")17 config_file.write("aggregation_options = sum\n\n")18 config_file.write("# node bias options\n")19 config_file.write("bias_init_mean = 0.0\n")20 config_file.write("bias_init_stdev = 1.0\n")21 config_file.write("bias_max_value = 30.0\n")22 config_file.write("bias_min_value = -30.0\n")23 config_file.write("bias_mutate_power = 0.5\n")24 config_file.write("bias_mutate_rate = 0.7\n")25 config_file.write("bias_replace_rate = 0.1\n\n")26 config_file.write("# genome compatibility options\n")27 config_file.write("compatibility_disjoint_coefficient = 1.0\n")28 config_file.write("compatibility_weight_coefficient = 0.5\n\n")29 config_file.write("# connection add/remove rates\n")30 config_file.write("conn_add_prob = 0.5\n")31 config_file.write("conn_delete_prob = 0.5\n\n")32 config_file.write("# connection enable options\n")33 config_file.write("enabled_default = True\n")34 config_file.write("enabled_mutate_rate = 0.01\n\n")35 config_file.write("feed_forward = True\n")36 config_file.write("initial_connection = full\n\n")37 config_file.write("# node add/remove rates\n")38 config_file.write("node_add_prob = 0.2\n")39 config_file.write("node_delete_prob = 0.2\n\n")40 config_file.write("# network parameters\n")41 
config_file.write("num_hidden = 0\n")42 config_file.write("num_inputs = 8\n")43 config_file.write("num_outputs = 4\n\n")44 config_file.write("# node response options\n")45 config_file.write("response_init_mean = 1.0\n")46 config_file.write("response_init_stdev = 0.0\n")47 config_file.write("response_max_value = 30.0\n")48 config_file.write("response_min_value = -30.0\n")49 config_file.write("response_mutate_power = 0.0\n")50 config_file.write("response_mutate_rate = 0.0\n")51 config_file.write("response_replace_rate = 0.0\n\n")52 config_file.write("# connection weight options\n")53 config_file.write("weight_init_mean = 0.0\n")54 config_file.write("weight_init_stdev = 1.0\n")55 config_file.write("weight_max_value = 30\n")56 config_file.write("weight_min_value = -30\n")57 config_file.write("weight_mutate_power = 0.5\n")58 config_file.write("weight_mutate_rate = 0.8\n")59 config_file.write("weight_replace_rate = 0.1\n\n")60 config_file.write("[DefaultSpeciesSet]\n")61 config_file.write("compatibility_threshold = 3.0\n\n")62 config_file.write("[DefaultStagnation]\n")63 config_file.write("species_fitness_func = max\n")64 config_file.write("max_stagnation = 10\n")65 config_file.write("species_elitism = 2\n\n")66 config_file.write("[DefaultReproduction]\n")67 config_file.write("elitism = 2\n")...

Full Screen

Full Screen

PreprocessingConfig.py

Source:PreprocessingConfig.py Github

copy

Full Screen

import yaml
from os.path import join

# Configuration file that is read when this module is first imported.
_DEFAULT_CONFIG_PATH = "config_preprocessing.yml"


def _load_config(path):
    """Parse the YAML file at *path* and return the resulting object.

    The file is opened in a ``with`` block so the handle is always closed,
    and ``yaml.safe_load`` is used because ``yaml.load`` without an explicit
    ``Loader`` is deprecated and can construct arbitrary Python objects.
    """
    with open(path) as handle:
        return yaml.safe_load(handle)


# Module-wide configuration; replaced by set_config_file().
CONFIG_FILE = _load_config(_DEFAULT_CONFIG_PATH)


def set_config_file(path_to_config):
    """Replace the module-wide configuration with the file at *path_to_config*.

    Args:
        path_to_config: Path of the YAML configuration file to load.
    """
    global CONFIG_FILE
    # The original reloaded the hard-coded default file and ignored the
    # argument, so callers could never actually switch configurations.
    CONFIG_FILE = _load_config(path_to_config)


# SOURCE DIR
def get_source_dir(dataset_name: str):
    """Return the configured source directory joined with the dataset's name.

    Args:
        dataset_name: Key under ``trimmomatic.dataset`` in the configuration.

    Raises:
        KeyError: If a required key is missing from the configuration.
    """
    dataset = CONFIG_FILE["trimmomatic"]["dataset"][dataset_name]
    return join(CONFIG_FILE["source_dir"], dataset["name"])


# DATASET NAMES
def get_dataset_names():
    """Return every key under ``trimmomatic.dataset`` except ``"default"``."""
    datasets = CONFIG_FILE["trimmomatic"]["dataset"]
    return [name for name in datasets if name != "default"]


# QC REPORT
def get_qcreport_fastq_thread_count():
    """Return the ``qc_report.threads`` value from the configuration."""
    return CONFIG_FILE["qc_report"]["threads"]


# NOTE: the listing is truncated at this point; the body of the following
# function is not shown in the excerpt, so the line is kept exactly as given.
def get_qcreport_source_path(dataset_name: str, trimmed: bool = False):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run molecule automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now!

Get 100 minutes of automation testing FREE!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful