Best Python code snippet using localstack_python
Project.py
Source:Project.py
# ... (excerpt begins inside the Pipeline base class; earlier lines are not shown)
# NOTE(review): the class header below is reconstructed from the subclasses'
# ``(Pipeline)`` base declarations -- confirm it (and any base classes or
# class docstring) against the full file.
class Pipeline:
    def get_snakefile_directory(self) -> Path:
        """Return the directory that holds the Snakefile (subclass hook)."""
        raise NotImplementedError("Should not be called!")

    def get_main_snakefile_path(self) -> Path:
        """Return the path to the main Snakefile (subclass hook)."""
        raise NotImplementedError("Should not be called!")

    def get_environment_variables(self) -> Dict[str, List[str]]:
        """Return a mapping of variable name to list of values (subclass hook)."""
        raise NotImplementedError("Should not be called!")

    def get_definition(self) -> str:
        '''
        Return the pipeline definition.

        A definition is a string of the form:
            "directory:<dir>" OR
            "package:<package>" OR
            "<dir>" which is equivalent to "directory:<dir>"
        '''
        raise NotImplementedError("Should not be called!")

    def build_object_graph(self, project: Project, args: List[str]) \
            -> ObjectGraph:
        """Build the object graph for ``project`` (subclass hook)."""
        raise NotImplementedError("not yet")


class DirectoryPipeline(Pipeline):
    """
    Subclass of the Pipeline class defined by a directory path.

    A snakeobject DirectoryPipeline attributes are:

    .. py:attribute:: definition_dir
       :type: str

    .. py:attribute:: snakefile_directory
       :type: str

    DirectoryPipeline useful functions are:

    .. py:attribute:: get_snakefile_directory()

       returns directory of Snakefile

    .. py:attribute:: get_main_snakefile_path()

       returns path to the Snakefile

    .. py:attribute:: get_environment_variables()

       returns dictionary with keys PATH, PYTHONPATH, etc.

    .. py:attribute:: build_object_graph(proj: Project, args: List[str])

       returns an instance of ObjectGraph
    """

    # Sub-directory of the pipeline -> environment variables it is added to.
    DIR_TO_ENV = {
        "bin": ["PATH"],
        "python": ["PATH", "PYTHONPATH"],
        "R": ["PATH"],
        "perl": ["PATH", "PERL5LIB"],
    }

    def __init__(self, definition_dir: str):
        self.definition_dir = definition_dir
        # strict resolve: raises if the directory does not exist
        self.snakefile_directory = Path(definition_dir).resolve(True)

    def get_snakefile_directory(self) -> Path:
        return self.snakefile_directory

    def get_main_snakefile_path(self) -> Path:
        return self.snakefile_directory / "Snakefile"

    def get_environment_variables(self) -> Dict[str, List[str]]:
        """Return the path-like variables contributed by this directory.

        ``PATH`` always starts with the pipeline directory itself; every
        existing sub-directory listed in ``DIR_TO_ENV`` is appended to the
        variables it maps to.
        """
        env: Dict[str, List[str]] = {"PATH": [str(self.snakefile_directory)]}
        for sub_dir, env_names in DirectoryPipeline.DIR_TO_ENV.items():
            sub_dir_path = self.snakefile_directory / sub_dir
            if not os.path.exists(sub_dir_path):
                continue
            for env_name in env_names:
                env.setdefault(env_name, []).append(str(sub_dir_path))
        return env

    def get_definition(self) -> str:
        return self.definition_dir

    def build_object_graph(self, project: Project, args: List[str]) \
            -> ObjectGraph:
        """Load ``build_object_graph.py`` from the pipeline directory and run it.

        The module's ``run(project, object_graph, *args)`` function is called
        with a fresh ObjectGraph, which is then returned.

        Raises:
            Exception: if the script is missing or cannot be loaded.
        """
        script = self.get_snakefile_directory() / "build_object_graph.py"
        if not os.path.isfile(script):
            raise Exception(f'ERROR: There is no {script}')
        spec = spec_from_file_location("build_object_graph", script)
        if spec is None:
            raise Exception("Could not build specs for build_object_graph")
        if spec.loader is None:
            raise Exception("Loader is empty??")
        module = module_from_spec(spec)
        spec.loader.exec_module(module)
        new_object_graph = ObjectGraph()
        module.run(project, new_object_graph, *args)
        return new_object_graph


class PackagePipeline(Pipeline):
    """
    Subclass of Pipeline class defined by a python package.

    A snakeobject PackagePipeline attributes are:

    .. py:attribute:: definition_package
       :type: str

    .. py:attribute:: snake_file_dir
       :type: str

    PackagePipeline useful functions are:

    .. py:attribute:: get_snakefile_directory()

       returns directory of Snakefile

    .. py:attribute:: get_main_snakefile_path()

       returns path to the Snakefile

    .. py:attribute:: get_environment_variables()

       returns dictionary with keys PATH, PYTHONPATH, etc.

    .. py:attribute:: build_object_graph(proj: Project, args: List[str])

       returns an instance of ObjectGraph
    """

    def __init__(self, definition_package: str):
        self.definition_package = definition_package
        self.pipeline_package = importlib.import_module(self.definition_package)
        package_file = self.pipeline_package.__file__
        if package_file is None:
            raise Exception(f"{definition_package} should be a python package 1")
        if not package_file.endswith('__init__.py'):
            raise Exception(f"{definition_package} should be a python package 2")
        self.snake_file_dir = Path(package_file).parent

    def get_snakefile_directory(self) -> Path:
        return self.snake_file_dir

    def get_main_snakefile_path(self) -> Path:
        return self.snake_file_dir / "Snakefile"

    def get_environment_variables(self) -> Dict[str, List[str]]:
        """Return the package's own ``get_environment_variables()`` result.

        Best effort: a package that does not define the function, or whose
        function fails, contributes no variables.
        """
        provider = getattr(self.pipeline_package,
                           "get_environment_variables", None)
        if provider is None:
            return {}
        try:
            return provider()
        except Exception:
            # Deliberately lenient, but no longer swallows KeyboardInterrupt
            # or SystemExit as the former bare ``except`` did.
            return {}

    def get_definition(self) -> str:
        # Must round-trip through build_pipeline(), which expects "package:".
        return f"package:{self.definition_package}"

    def build_object_graph(self, project: Project, args: List[str]) \
            -> ObjectGraph:
        """Delegate to the package's ``build_object_graph(project, graph, args)``."""
        new_object_graph = ObjectGraph()
        self.pipeline_package.build_object_graph(
            project, new_object_graph, args)
        return new_object_graph


def build_pipeline(pipeline_definition: str) -> Pipeline:
    """Create the Pipeline described by ``pipeline_definition``.

    Accepted forms are ``"directory:<dir>"``, ``"package:<package>"`` and a
    bare ``"<dir>"``. Only the first ``:`` separates the prefix, so directory
    paths that themselves contain ``:`` are kept intact.
    """
    kind, separator, target = pipeline_definition.partition(':')
    if separator and kind == 'directory':
        return DirectoryPipeline(target)
    if separator and kind == 'package':
        return PackagePipeline(target)
    return DirectoryPipeline(pipeline_definition)


class Project:
    """
    Each object of the Project class represents a snakeobject project.

    The project directory is given as the ``directory`` parameter. If
    ``directory`` is none,
    the :py:func:`find_so_project_directory` function is used to determine
    the project directory.

    A snakeobject project attributes are:

    .. py:attribute:: directory
       :type: str

       the project directory

    .. py:attribute:: parameters
       :type: dict[str,str]

       a key value dictionary for global project level parameters

    .. py:attribute:: parent_projects
       :type: dict[str,Project]

       a key value dictionary for parent project names

    .. py:attribute:: objectGraph
       :type: ObjectGraph

       the objectGraph for the project

    Project useful functions are:

    .. py:attribute:: get_object_directory(object_instance)

       returns directory of object

    .. py:attribute:: get_object_path(type_name, object_name)

       returns path to the object of type `type_name`, and object name `object_name`

    .. py:attribute:: get_pipeline_directory( )

       returns path to the project pipeline directory

    .. py:attribute:: get_parameter(name, parent_project_id=None)

       If `parent_project_id` is given returns the value of parameter in
       that project, otherwise, if `name` is in project parameters, returns
       its value. Finally, if `name` is not in the project parameters,
       returns the value of named parameters in the first project
       recursively following parent projects.
    """

    # [TYPE:name] or [TYPE:name:parent], optionally followed by word characters.
    _INTERPOLATION_RE = re.compile(r'(\[(\w+)\:([\w\/]+)(\:(\w+))?\])(\w*)')

    def __init__(self, directory=None):
        self.directory = directory if directory else find_so_project_directory()
        self._porojectFileName = self.directory + "/so_project.yaml"
        self._objectGraphFileName = self.directory + "/OG.json"

        if os.path.isfile(self._porojectFileName):
            self.parameters = load_yaml(self._porojectFileName)
            self.parameters['directory'] = self.directory
        else:
            self.parameters = {}

        self.parent_projects: Dict[str, Project] = {}
        self._run_parameter_interpolation()
        # Interpolation of [PP:...] may already have loaded the parents.
        if 'so_parent_projects' in self.parameters and not self.parent_projects:
            for p, d in self.parameters['so_parent_projects'].items():
                self.parent_projects[p] = Project(d)

        if os.path.isfile(self._objectGraphFileName):
            self.objectGraph = load_object_graph(self._objectGraphFileName)
        else:
            self.objectGraph = ObjectGraph()
        self.set_pipeline()

    def interpolate(self, obj, oo=None):
        """Expand ``[TYPE:name]`` / ``[TYPE:name:parent]`` markers in ``obj``.

        Strings are expanded; lists and dicts are processed recursively (the
        object ``oo`` is passed along so ``[P:...]`` works inside containers);
        every other value is returned unchanged.

        Supported types: ``P`` (parameter of object ``oo``), ``E``
        (environment variable), ``D`` (``project`` or ``pipeline`` directory)
        and ``PP`` (project parameter, optionally of a parent project).

        Raises:
            ProjectException: on unknown types or undefined names.
        """
        if isinstance(obj, str):
            return self._interpolate_string(obj, oo)
        if isinstance(obj, list):
            return [self.interpolate(x, oo) for x in obj]
        if isinstance(obj, dict):
            return {k: self.interpolate(v, oo) for k, v in obj.items()}
        return obj

    def _interpolate_string(self, text, oo):
        """Replace every interpolation marker found in ``text``."""
        for s in self._INTERPOLATION_RE.findall(text):
            marker, iType, name, prnt = s[0], s[1], s[2], s[4]
            if iType == 'P':
                if not oo:
                    raise ProjectException("object interpolation requires an object")
                if name not in oo.params:
                    raise ProjectException(
                        "parameter %s is not defined for the object %s" % (name, oo.k()))
                value = str(oo.params[name])
            elif iType == 'E':
                if name not in os.environ:
                    raise ProjectException("Varianble %s is not defined" % name)
                value = os.environ[name]
            elif iType == 'D':
                if name == "project":
                    value = self.directory
                elif name == "pipeline":
                    value = str(self.get_pipeline_directory())
                else:
                    raise ProjectException('The project property %s is unknonw.' % name)
            elif iType == 'PP':
                value = self._project_parameter_value(name, prnt)
            else:
                raise ProjectException(
                    'Interpolation type [%s: ...] is unknown; can be only E|P|PP|D.' % iType)
            text = text.replace(marker, value)
        return text

    def _project_parameter_value(self, name, prnt):
        """Return parameter ``name`` of this project or of parent ``prnt`` as text."""
        if 'so_parent_projects' in self.parameters and not self.parent_projects:
            for k, v in self.parameters['so_parent_projects'].items():
                self.parent_projects[k] = Project(v)

        if not prnt:
            if name not in self.parameters:
                raise ProjectException('Parameter %s is not defined' % name)
            return str(self.parameters[name])

        # NOTE(review): the marker regex only admits word characters in the
        # parent part, so a "a/b" chain cannot currently be produced by it;
        # the chain walk is kept for parity with get_parent_project().
        head, *rest = prnt.split('/')
        if head not in self.parent_projects:
            raise ProjectException(f'Project {head} is not in so_parent_projects')
        parent_dir = self.parent_projects[head].directory
        if not os.path.exists(parent_dir):
            raise ProjectException('The path to parent does not exist')
        # Reload the parent so its parameters reflect the files on disk.
        self.parent_projects[head] = Project(parent_dir)
        target = self.parent_projects[head]
        for key in rest:
            target = target.parent_projects[key]
        return str(target.parameters[name])

    def _run_parameter_interpolation(self):
        # NOTE(review): the key tests membership in the (key, value) tuple,
        # not in the value text, so it is almost always True and the sort
        # keeps insertion order. Left unchanged: re-ordering could make a
        # "[D:pipeline]" value be expanded before "so_pipeline" itself.
        for k, v in sorted(self.parameters.items(), key=lambda x: not '[D:' in x):
            self.parameters[k] = self.interpolate(v)

    def get_parent_project(self, parent_project_id):
        """Return the parent reached by following the ``/``-separated ids."""
        proj = self
        for part in parent_project_id.split("/"):
            proj = proj.parent_projects[part]
        return proj

    def get_parameter(self, name, parent_project_id=None):
        """Look up parameter ``name``; see the class docstring for the rules.

        Returns ``None`` when the parameter is found neither here nor in any
        parent. Falsy values defined in a parent (``0``, ``False``, ``""``)
        are returned rather than skipped.
        """
        if parent_project_id:
            return self.get_parent_project(parent_project_id).parameters[name]
        if name in self.parameters:
            return self.parameters[name]
        for proj in self.parent_projects.values():
            value = proj.get_parameter(name)
            if value is not None:
                return value
        return None

    def set_pipeline(self):
        """(Re)build ``self.pipeline``.

        Priority, lowest to highest: the project directory, the
        ``SO_PIPELINE`` environment variable, the ``so_pipeline`` parameter.
        """
        pipeline_definition = f"directory:{self.directory}"
        if "SO_PIPELINE" in os.environ:
            pipeline_definition = os.environ['SO_PIPELINE']
        if "so_pipeline" in self.parameters:
            pipeline_definition = self.parameters["so_pipeline"]
        self.pipeline = build_pipeline(pipeline_definition)

    def get_pipeline_directory(self) -> Path:
        self.set_pipeline()
        return self.pipeline.get_snakefile_directory()

    def get_environment_variables(self) -> Dict[str, List[str]]:
        """Collect the environment variables for this project.

        Order of contributions: ``so_environment_<NAME>_add`` parameters, the
        pipeline's variables, the parent projects' variables; finally each
        ``so_environment_<NAME>_set`` parameter replaces the collected value.
        """
        env = defaultdict(list)

        add_env_re = re.compile("so_environment_(.*)_add")
        for param_key, param_value in self.parameters.items():
            match = add_env_re.match(param_key)
            if not match:
                continue
            if isinstance(param_value, list):
                env[match.group(1)].extend(param_value)
            else:
                env[match.group(1)].append(param_value)

        sources = [self.pipeline.get_environment_variables()]
        sources.extend(pp.get_environment_variables()
                       for pp in self.parent_projects.values())
        for extra_vars in sources:
            for var, values in extra_vars.items():
                env[var].extend(values)

        set_env_re = re.compile("so_environment_(.*)_set")
        for param_key, param_value in self.parameters.items():
            match = set_env_re.match(param_key)
            if match:
                env[match.group(1)] = [param_value]
        return env

    def save_object_graph(self):
        self.objectGraph.save(self._objectGraphFileName)

    def ensure_object_type_snakefile_exists(self, ot):
        """Create a one-line ``<ot>.snakefile`` if missing; return its path."""
        sfile = self.get_pipeline_directory() / (ot + ".snakefile")
        if not os.path.exists(sfile):
            print(f'WARNING: creating dummy snakefile '
                  f'{str(sfile).split("/")[-1]}')
            with open(sfile, 'w') as f:
                f.write('add_targets()\n')
        return sfile

    @staticmethod
    def _read_snakefile_header(path):
        """Return ``(version, types)`` from the first two lines of ``path``.

        ``(None, None)`` is returned when the lines do not have the expected
        ``#snakeobjects <version>`` / ``#Snakeobjects types: <types>`` shape.
        """
        with open(path) as f:
            version_fields = f.readline().strip('\n\r').split(' ')
            types_fields = f.readline().strip('\n\r').split(' ')
        try:
            return version_fields[1], types_fields[2]
        except IndexError:
            return None, None

    def write_main_snakefile(self):
        """Write the main ``Snakefile`` of the pipeline directory.

        The file is left untouched when its header already records the
        current snakeobjects version and the same comma-separated list of
        object types. Object types come from the object graph and from the
        ``*.snakefile`` files present in the pipeline directory.
        """
        from glob import glob
        from snakeobjects import __version__

        pipeline_dir = self.get_pipeline_directory()
        mf = pipeline_dir / "Snakefile"
        suffix = ".snakefile"
        graph_types = set(self.objectGraph.get_object_types())
        file_types = {x.split('/')[-1][:-len(suffix)]
                      for x in glob(str(pipeline_dir / '*.snakefile'))}
        all_o_types = sorted(graph_types | file_types)
        # The header stores the types comma-joined so that it can be compared
        # verbatim on the next run.
        types_tag = ','.join(all_o_types)

        if os.path.exists(mf):
            old_version, old_o_types = self._read_snakefile_header(mf)
            if old_version == __version__ and old_o_types == types_tag:
                return

        header = importlib_resources.read_text(__package__, 'header.snakefile')
        # NOTE(review): runs of spaces inside the rule-body literals below were
        # collapsed in the excerpt this was recovered from; the 2/4-space
        # indentation is a reconstruction -- confirm against the full file.
        with open(mf, 'w') as f:
            f.write(f'#snakeobjects {__version__}\n')
            f.write(f'#Snakeobjects types: {types_tag}\n')
            f.write(header)
            for ot in all_o_types:
                self.ensure_object_type_snakefile_exists(ot)
                sfile = ot + ".snakefile"
                f.write(f'include: "{sfile}"\n')
                f.write(f'rule so_all_{ot}:\n')
                f.write('  input:\n')
                f.write(f'    expand("{{of}}", of=project.get_all_object_flags("{ot}"))\n\n')
                f.write(f'rule so_{ot}_obj:\n')
                f.write(f'  input: get_targets("{ot}")\n')
                f.write(f'  output: touch("{ot}/{{oid}}/obj.flag")\n\n')

    def get_object_flag(self, o):
        return f'{o.oType}/{o.oId}/obj.flag'

    def get_object_directory(self, o):
        return f'{self.directory}/{o.oType}/{o.oId}'

    def get_object_path(self, oType, oId):
        return Path(self.get_object_directory(self.objectGraph[oType, oId]))

    def create_symbolic_links(self):
        """Create a link in the object directory for every ``symlink.*`` parameter.

        The link is named after the part of the key following ``symlink.``
        and points at the parameter value.
        """
        prefix = "symlink."
        for tp in sorted(self.objectGraph.get_object_types()):
            for o in self.objectGraph[tp]:
                oDir = self.get_object_directory(o)
                for k, v in list(o.params.items()):
                    if not k.startswith(prefix):
                        continue
                    if not os.path.exists(oDir):
                        os.makedirs(oDir)
                    dst = oDir + "/" + k[len(prefix):]
                    src = v
                    # NOTE(review): shell command strings -- paths containing
                    # spaces or shell metacharacters will not work.
                    os.system("ln -sf %s %s" % (src, dst))
                    os.system("touch -h -r %s %s" % (src, dst))

    def get_all_object_flags(self, oType=None):
        """Return the flag paths of all objects, or only those of ``oType``."""
        OG = self.objectGraph
        if oType:
            return [self.get_object_flag(o) for o in OG[oType]]
        return [self.get_object_flag(o)
                for tp in OG.get_object_types() for o in OG[tp]]

    def set_environment(self, update_environment=True):
        """Export the project variables into ``os.environ`` and print them.

        ``PATH``, ``PYTHONPATH`` and ``PERL5LIB`` are prepended to an existing
        value; any other variable is overwritten.
        """
        # NOTE(review): indentation was lost in the excerpt this was recovered
        # from; the whole body is taken to be guarded by the flag -- confirm.
        if not update_environment:
            return
        definition = self.pipeline.get_definition()
        print("UPDATING ENVIRONMENT:")
        print("export SO_PROJECT=", self.directory, sep="")
        print("export SO_PIPELINE=", definition, sep="")
        os.environ['SO_PROJECT'] = self.directory
        os.environ['SO_PIPELINE'] = str(definition)

        for var, values in self.get_environment_variables().items():
            # Parameter values come from YAML and need not be strings.
            parts = [str(v) for v in values]
            my_val = os.pathsep.join(parts)
            if var in ['PATH', 'PYTHONPATH', 'PERL5LIB'] and var in os.environ:
                print(f"export {var}={my_val}{os.pathsep}${var}")
                parts.append(os.environ[var])
            else:
                print(f"export {var}={my_val}")
            os.environ[var] = os.pathsep.join(parts)

    def buildObjectGraph(self, args: List[str]):
        self.set_environment()
        self.objectGraph = self.pipeline.build_object_graph(self, args)


if __name__ == "__main__":
    print("hi")
# ... (excerpt ends here)
command_handler.py
Source:command_handler.py
...60 runner = DockerPlanRunner(project_compose=self.project_compose,61 working_directory=self.working_directory,62 repo_dir=self.repo_dir)63 if StateHolder.always_update and cmd in ('start', 'up', 'restart'): # Pull before start in developer mode64 runner.run(plan=plan, commands='pull', envs=self.get_environment_variables(plan=plan))65 for cmd in command_list['docker']:66 if cmd == 'pull' and StateHolder.offline: # Skip pull in offline mode67 continue68 runner.run(plan=plan, commands=cmd,69 envs=self.get_environment_variables(plan=plan))70 def run_kubernetes(self, cmd, command_list, plan):71 runner = KubernetesRunner(working_directory=self.working_directory, repo_dir=self.repo_dir) \72 if StateHolder.container_mode == 'Kubernetes' \73 else HelmRunner(working_directory=self.working_directory, repo_dir=self.repo_dir)74 if len(command_list[StateHolder.container_mode.lower()]) == 0:75 ColorPrint.exit_after_print_messages('Command: ' + cmd + ' not supported with' + StateHolder.container_mode)76 for cmd in command_list[StateHolder.container_mode.lower()]:77 runner.run(plan=plan, commands=cmd, envs=self.get_environment_variables(plan=plan))78 def check_command(self, cmd):79 if self.hierarchy is None or not isinstance(self.hierarchy, dict):80 ColorPrint.exit_after_print_messages("Command hierarchy config is missing")81 if cmd not in self.hierarchy:82 ColorPrint.exit_after_print_messages("Command not found in hierarchy: " + str(cmd))83 def pre_run(self, command_list, plan):84 if command_list.get('before', False):85 self.script_runner.run(plan=plan, script_type='before_script')86 self.run_method('premethods', command_list)87 def after_run(self, command_list, plan):88 self.run_method('postmethods', command_list)89 if command_list.get('after', False):90 self.script_runner.run(plan=plan, script_type='after_script')91 def run_method(self, type, command_list):92 if type in command_list and len(command_list[type]) > 0:93 for method in command_list[type]:94 getattr(self, 
method)()95 def parse_environment_dict(self, path, env):96 """Compose dictionary from environment variables."""97 file_name = FileUtils.get_compose_file_relative_path(repo_dir=self.repo_dir,98 working_directory=self.working_directory,99 file_name=path)100 env_file = ProjectUtils.get_file(file=file_name)101 if env_file is None:102 ColorPrint.exit_after_print_messages(103 message="Environment file (" + str(file_name) + ") not exists in repository: " + StateHolder.name)104 with open(env_file) as stream:105 for line in stream.readlines():106 if not line.startswith("#"):107 data = line.split("=", 1)108 if len(data) > 1:109 env[data[0].strip()] = data[1].strip()110 def get_environment_dict(self, envs):111 """Process environment files. Environment for selected plan will be override the defaults"""112 environment = dict()113 '''First the default, if exists'''114 if "environment" in self.project_compose:115 self.parse_environment_dict(path=self.project_compose["environment"]["include"], env=environment)116 for env in envs:117 self.parse_environment_dict(path=env, env=environment)118 return environment119 def get_environment_variables(self, plan):120 """Get all environment variables depends on selected plan"""121 envs = list()122 if isinstance(plan, dict):123 if 'environment' in plan and 'include' in plan['environment']:124 envs.extend(ProjectUtils.get_list_value(plan['environment']['include']))125 if 'docker-compose-dir' in plan:126 envs.extend(FileUtils.get_filtered_sorted_alter_from_base_dir(base_dir=self.repo_dir,127 actual_dir=self.working_directory,128 target_directories=ProjectUtils.129 get_list_value(130 plan['docker-compose-dir']),131 filter_ends='.env'))132 env_dict = self.get_environment_dict(envs=envs)133 env_copy = os.environ.copy()134 for key in env_dict.keys():135 env_copy[key] = env_dict[key]136 """Add host system to environment"""137 env_copy["HOST_SYSTEM"] = platform.system()138 return env_copy139 def pack(self):140 plan = 
self.project_compose['plan'][self.plan]141 envs = self.get_environment_variables(plan=plan)142 runner = DockerPlanRunner(project_compose=self.project_compose,143 working_directory=self.working_directory,144 repo_dir=self.repo_dir)...
config_pipeline.py
Source:config_pipeline.py
1"""2This is a boilerplate pipeline 'example'3generated using Kedro 0.18.04"""5from kedro.pipeline import Pipeline, node, pipeline6from .nodes import get_environment_variables7def create_pipeline(**kwargs) -> Pipeline:8 # TODO: cover by test case9 return pipeline(10 [11 node(12 func=get_environment_variables,13 inputs=None,14 outputs="02_intermediate__env_variables",15 name="config__env_vars_extraction",16 ),17 node(18 func=lambda x: x["task_id"],19 inputs="02_intermediate__env_variables",20 outputs="params:common.task_id",21 name="geocoding__config__extraction__task_id",22 ),23 ],...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!