Best Python code snippet using lisa_python
main.py
Source:main.py
# -*- coding: utf-8 -*-
import json
import re
from json import JSONDecodeError
from typing import List, Dict, Union
from docker import errors as docker_errors
from datetime import datetime
from DictObject import DictObject
from luckydonaldUtils.logger import logging
from luckydonaldUtils.encoding import to_native as n
from auto_proxy.docker_utils.signal import parse_trigger_config, trigger_containers, parse_signals
from auto_proxy.secrets import SIGNALS
from .jinja2_utils import get_template
from .docker_utils import get_current_container_id
from .docker_utils.env import extract_container_envs
from .classes import DockerInfo, GitInfo
from .watcher import Watcher

__author__ = 'luckydonald'

logger = logging.getLogger(__name__)

DEFAULT_HOST = "example.com"
INPUT_FILENAME = "nginx.conf.jinja2"
OUTPUT_FILENAME = "/output/nginx.conf"


class Status(object):
    """
    Describes one docker event status: the lookup key, a human readable text and an emoji for log output.
    """
    def __init__(self, key, text, emoji):
        self.key = key
        self.text = text
        self.emoji = emoji
    # end def
# end class


VAR_REPLACEMENT_REGEX = re.compile('§{(?P<var>[A-Z_][A-Z0-9_]*)}')
""" basically `§{VARNAME}` """

TRIGGER_EVENTS = ['die', 'stop', 'start']
HEALTH_STATUS = {"health_status: healthy": True, "health_status: unhealthy": False}

# NOTE(review): the emoji literals below (and in NUMBERS) are mojibake in this listing -- they look like
# UTF-8 bytes rendered as Latin-1, with some bytes unprintable. They are carried over as they appear here;
# restore the real emoji from the upstream file before relying on them.
statussses = {
    Status("attach", "attached", "ð"),
    Status("commit", "committed", "ð¦"),
    Status("copy", "copied", "ð"),
    Status("create", "created", "âï¸"),
    Status("destroy", "destroyed", "ð¥"),
    Status("detach", "detached", "ð"),
    Status("die", "died", "ð"),
    Status("exec_create", "exec created", "ðð"),
    Status("exec_detach", "exec detached", "ðð"),
    Status("exec_start", "exec started", "ðâ¶ï¸"),
    Status("export", "was exported", "ð¤"),
    Status("exit", "exited", "ð¾ï¸"),  # this is a variant of 'die', with exitCode == 0.
    Status("health_status: healthy", "is now healthy", "ð"),
    Status("health_status: unhealthy", "is unhealthy", "ð"),
    Status("import", "was imported", "ð¥"),
    Status("kill", "got killed", "ð«"),
    Status("oom", "got oom killed", "ð£"),
    Status("rename", "renamed", "ð"),
    Status("resize", "resized", "âï¸"),
    Status("restart", "restarted", "ð"),
    Status("start", "started", "â¶ï¸"),
    Status("stop", "was stopped", "ð´"),
    Status("top", "top", "ð"),
    Status("pause", "paused", "â¸ï¸"),
    Status("unpause", "unpaused", "â¶ï¸"),  # text was a copy-pasted "top"
    Status("update", "updated", "ð¨"),  # text was a copy-pasted "top"
    Status(None, " â we don't really know what it did?", "â ï¸"),
}

NUMBERS = {
    "0": "0ï¸â£",
    "1": "1ï¸â£",
    "2": "2ï¸â£",
    "3": "3ï¸â£",
    "4": "4ï¸â£",
    "5": "5ï¸â£",
    "6": "6ï¸â£",
    "7": "7ï¸â£",
    "8": "8ï¸â£",
    "9": "9ï¸â£",
    True: "ð",
    False: "ð",
}

STATUS = {s.key: s for s in statussses}
GIT_INFO = GitInfo.from_gitinfo_values()


def main():
    """
    Renders the nginx config once at startup and then again for every watched docker event.

    Reads the previously written output file (if any), so an unchanged render does not trigger a reload,
    loads the global signal configuration from `SIGNALS` and then loops over the events of the `Watcher`.
    This function only returns when the event iterator of the watcher ends.
    """
    logger.success("Will watch for status changes.")
    logger.info("listening to the events: {!r}".format(TRIGGER_EVENTS))

    # get previous config file, to see if we need to trigger a nginx reload.
    old_file = None
    try:
        with open(OUTPUT_FILENAME, 'r') as f:
            old_file = n(f.read())
        # end with
    except OSError:
        # report the path we actually tried to read
        logger.info(f"Not yet existent (or not readable): {OUTPUT_FILENAME}")
    # end try

    global_reload_signals = []
    if SIGNALS:
        global_reload_signals = parse_trigger_config(SIGNALS)
    # end if
    logger.debug(f'loaded global signal configuration: {global_reload_signals!r}')

    # prepare template
    template = get_template(INPUT_FILENAME)

    # prepare docker
    filters = {"event": TRIGGER_EVENTS}
    w = Watcher(filters=filters)
    client = w.client
    docker_version = client.version()

    logger.success("Running initial run.")
    # Keep the rendered content of the initial run: otherwise the first event would compare against the
    # stale pre-start file. Because that comparison now works, the global signals must already be sent
    # here, or a change detected at startup would never reach the globally configured containers.
    _, old_file = inspect_and_template(
        client, docker_version, old_file, template,
        global_container_signals=global_reload_signals,
    )

    # listen to incoming events
    for event in w.run():
        logger.debug("New event:\n{!r}".format(event))
        if not hasattr(event, "status"):
            continue
        # end if

        attributes = event.Actor.Attributes
        # docker compose projects: only use service name, don't use image name.
        if 'com.docker.compose.service' in attributes:
            container = attributes['com.docker.compose.service']
            image = None
        else:
            container = attributes['name']
            image = attributes['image']
        # end if

        # make sure we don't get updates we don't want.
        if event.status not in TRIGGER_EVENTS:
            logger.debug("Skipping event {e!r} of container {c!r}.".format(e=event.status, c=container))
            continue
        # end if

        # getting the right STATUS
        # the part before an optional ":" is the event type, e.g. "health_status: healthy" -> "health_status"
        event_type = event.status.split(":", maxsplit=1)[0].strip()
        if (  # special case 'die' with exitCode == 0 --> 'exit'
            event.status == "die" and
            'exitCode' in attributes and
            str(attributes['exitCode']) == "0"
        ):
            status = STATUS["exit"]
        elif event.status in STATUS:
            status = STATUS[event.status]
        elif event_type in STATUS:
            status = STATUS[event_type]
        else:
            status = STATUS[None]
        # end if

        message = "{emoji} Container {container}: {image} {text}".format(
            image=" ({})".format(image) if image else "",
            emoji=status.emoji, container=container,
            text=status.text,
        )
        logger.success(message)

        # now re-inspect all containers and render the template again
        _, old_file = inspect_and_template(
            client, docker_version, old_file, template,
            global_container_signals=global_reload_signals,
        )
    # end for
# end def


def inspect_and_template(client, docker_version, old_file, template, global_container_signals=None):
    """
    Collects the `auto_proxy.*` label configuration of all listed containers and renders the template.

    Containers sharing a service name are grouped as scaled instances of one service and must have
    identical service settings. If the rendered file changed, the global and the per-container
    signal targets are passed to `trigger_containers`.

    :param client: The docker client (provides `containers.list()` and is passed on to the helpers).
    :param docker_version: The result of `client.version()`.
    :param old_file: The content of the previously rendered file, or `None`.
    :param template: The template to render.
    :param global_container_signals: Signal targets to trigger additionally to the ones configured
                                     via the `auto_proxy.signal` labels. Default: None (empty list).
    :return: Tuple `(did_change, old_file)` as returned by `run_templating`.
    :raises AssertionError: If a label has a disallowed value or scaled instances disagree in their settings.
    """
    if global_container_signals is None:
        global_container_signals = []
    # end if
    local_container_signals = []

    # retry listing until it succeeds
    while True:
        try:
            containers = client.containers.list(sparse=False)
            break
        except docker_errors.NotFound:  # probably a container which died just the right moment.
            logger.warning("Exception occured when listing, retrying...", exc_info=True)
        # end try
    # end while

    instances_by_name = {}
    for container in containers:
        if all(label in container.labels for label in (
            'com.docker.compose.project',
            'com.docker.compose.service',
            'com.docker.compose.container-number'
        )):
            service_name = container.labels['com.docker.compose.project'] + "_" \
                           + container.labels['com.docker.compose.service']  # + "_" \
                           # + ['com.docker.compose.container-number']
            service_name_short = container.labels['com.docker.compose.service']
        else:
            service_name = container.name
            service_name_short = service_name
        # end if
        container_environment_vars = extract_container_envs(container)

        def env_replace_func(match):
            """
            Used in :py:`get_label` as function to resolve the `§{VARNAME}` replacements.
            See the regex :py:`VAR_REPLACEMENT_REGEX` above.

            :param match: The regex match, having a group called `var`.
            :return: The value of that environment variable of the container.
            :raises KeyError: If the container has no such environment variable.
            """
            var = match.group('var')
            return container_environment_vars[var]
        # end def

        def get_label(
            name, default,
            valid_values=None,
            replace_variables_label=True, replace_variables_default=False, replace_variables=None,
        ):
            """
            Gets a label from the container label section.
            Optionally allows to pull in environment variables of the form `§{VARNAME}`.

            :param name: name of the label
            :type  name: str

            :param default: The default value, if label was not found
            :type  default: str

            :param valid_values: A list of valid values
            :type  valid_values: List[str]|None

            :param replace_variables_label: If we should replace `§{VARNAME}` environment variables in the loaded label value.
            :type  replace_variables_label: bool

            :param replace_variables_default: If we should replace `§{VARNAME}` environment variables in the fallback default value.
            :type  replace_variables_default: bool

            :param replace_variables: If we should replace `§{VARNAME}` environment variables
                                      in the loaded label value and fallback default value.
                                      This overwrites the settings of `replace_variables_label` and `replace_variables_default`.
                                      Default: No action (`None`)
            :type  replace_variables: None|bool

            :return: The label value or the default value.
            :rtype: str

            :raises AssertionError: If `valid_values` is given and the resulting value is not one of them.
            """
            if isinstance(replace_variables, bool):
                # `replace_variables` overwrites `replace_variables_label` and `replace_variables_default`.
                replace_variables_label = replace_variables
                replace_variables_default = replace_variables
            # end if
            label = container.labels.get(name, None)
            # TODO: https://stackoverflow.com/a/11819111/3423324#match-character-which-is-not-escaped
            if label is None:
                # use default
                label = default
                if replace_variables_default:
                    label = VAR_REPLACEMENT_REGEX.sub(env_replace_func, label)
                # end if
            elif replace_variables_label:
                label = VAR_REPLACEMENT_REGEX.sub(env_replace_func, label)
            # end if
            if valid_values is not None and label not in valid_values:
                raise AssertionError(f'Invalid value for label {name}: Got {label!r}, allowed are {valid_values!r}')
            # end if
            return label
        # end def

        def get_label_array(
            name, default=None,
            replace_variables_label=True, replace_variables_default=False, replace_variables=None,
        ):
            """
            Gets a label from the container label section.
            Expects multiple sub-keys like "name.xxx", "name.yyy", "name.zzz".
            If the label "name" itself exists, it is parsed as json object and used as base for the sub-keys.
            Optionally allows to pull in environment variables of the form `§{VARNAME}`.

            :param name: name of the label
            :type  name: str

            :param default: The default value, if label was not found. If not set uses an empty array `{}`.
                            It is copied, never modified.
            :type  default: dict

            :param replace_variables_label: If we should replace `§{VARNAME}` environment variables in the loaded label value.
            :type  replace_variables_label: bool

            :param replace_variables_default: If we should replace `§{VARNAME}` environment variables in the fallback default value.
            :type  replace_variables_default: bool

            :param replace_variables: If we should replace `§{VARNAME}` environment variables
                                      in the loaded label value and fallback default value.
                                      This overwrites the settings of `replace_variables_label` and `replace_variables_default`.
                                      Default: No action (`None`)
            :type  replace_variables: None|bool

            :return: The array of labels grouped together with a common prefix.
            :rtype: Dict[str, str]
            """
            name = name.rstrip(".")  # don't end with a dot.
            if default is None:
                default = {}
            # end if
            if isinstance(replace_variables, bool):
                # `replace_variables` overwrites `replace_variables_label` and `replace_variables_default`.
                replace_variables_label = replace_variables
                replace_variables_default = replace_variables
            # end if

            # work on a copy, so adding the sub-labels below never modifies the caller's default.
            data_array: dict = dict(default)

            raw_value: Union[str, None] = get_label(
                name, default="", valid_values=None,
                replace_variables_label=replace_variables_label, replace_variables_default=replace_variables_default,
                replace_variables=replace_variables
            )
            # check if we get the key itself. If we do, assume string containing encoded json.
            if raw_value:
                logger.debug(f"Found exact label {name!r} of container {service_name!r} ({container.id!r}), parsing as json to use as base for adding sub-labels: {raw_value!r}")
                try:
                    parsed = json.loads(raw_value)
                except JSONDecodeError:
                    logger.warning(f"The data for the array at {name!r} of container {service_name!r} ({container.id!r}) could not be parsed. Using default {default!r}.")
                else:
                    if isinstance(parsed, dict):
                        data_array = parsed
                    else:
                        # valid json, but e.g. a list or a number: we could not add the sub-labels to that.
                        logger.warning(f"The data for the array at {name!r} of container {service_name!r} ({container.id!r}) is not a json object. Using default {default!r}.")
                    # end if
                # end try
            # end if

            name_prefix = name + "."
            name_prefix_len = len(name_prefix)
            for label in container.labels.keys():
                if not label.startswith(name_prefix):
                    # not an label of our array
                    continue
                # end if
                key = label[name_prefix_len:]  # strip the "name." prefix
                value = get_label(
                    label, default="", valid_values=None,
                    replace_variables_label=replace_variables_label, replace_variables_default=replace_variables_default,
                    replace_variables=replace_variables
                )
                data_array[key] = value
            # end for
            return data_array
        # end def

        # check auto_proxy.signal before the auto_proxy.enable check continue
        signal = get_label('auto_proxy.signal', '', replace_variables=False)
        if signal:
            container_signals = parse_signals(container.id, signal)
            local_container_signals.extend(container_signals)
            logger.debug(f'loaded signal configuration for container {container.name} {container.id}:\n{container_signals!r}')
        # end if

        if get_label('auto_proxy.enable', '0', valid_values=["0", "1"], replace_variables=False) != '1':
            logger.debug(f'skipping disabled container: {container.id} {service_name}')
            continue
        # end if

        hosts = get_label('auto_proxy.hosts', get_label('auto_proxy.host', DEFAULT_HOST)).split(",")
        service_data = {
            "name": service_name,
            "short_name": service_name_short,
            "mount_point": get_label('auto_proxy.mount_point', f"{service_name_short}"),
            "strip_mount_point": get_label('auto_proxy.strip_mount_point', "0", valid_values=["0", "1"]) == "1",
            "buffer": get_label('auto_proxy.buffer', "1", valid_values=["0", "1"]) == "1",
            "enforce_https": get_label('auto_proxy.enforce_https', "1", valid_values=["0", "1", "client", "proxy"]),
            "hosts": hosts,
            "access": get_label('auto_proxy.access', "net", valid_values=['net', 'socket']),
            "protocol": get_label('auto_proxy.protocol', "http", valid_values=['http', 'uwsgi']),
            "port": int(get_label('auto_proxy.port', "80")),
            "socket_name": get_label('auto_proxy.socket_name', f"{service_name_short}.sock"),
            "environment": container_environment_vars,
            "directives": {
                "nginx": get_label_array('auto_proxy.directives.nginx'),
            },
        }
        instance_data = {
            "id": container.id,
            "name": container.name,
        }
        if service_name not in instances_by_name:
            instance = {
                "scaled_instances": [instance_data]
            }
            instance.update(service_data)
            instances_by_name[service_name] = instance
        else:
            # another instance of an already known service: all settings have to be the same.
            instance = instances_by_name[service_name]
            for k, v in service_data.items():
                if instance[k] != v:
                    raise AssertionError("instance[k] != v", k, service_data, instance)
                # end if
            # end for
            instance['scaled_instances'].append(instance_data)
        # end if
    # end for

    services_by_host = {}
    logger.debug(f"instances_by_name: {instances_by_name}")
    for instance in instances_by_name.values():
        logger.debug(f"instance: {instance}")
        for host in instance['hosts']:
            services_by_host.setdefault(host, []).append(instance)
        # end for
    # end for

    did_change, old_file = run_templating(
        client, docker_version, old_file, template,
        services_by_host=services_by_host,
        services_by_name=instances_by_name,
        services=instances_by_name.values(),
    )
    if did_change:
        trigger_containers(client, containers=global_container_signals + local_container_signals)
    # end if
    return did_change, old_file
# end def


def run_templating(
    client, docker_version, old_file, template,
    services, services_by_host, services_by_name,
):
    """
    Renders the template and writes it to `OUTPUT_FILENAME` if the result differs from `old_file`.

    :param client: The docker client, `client.info()` is used for the `docker` template variable.
    :param docker_version: The result of `client.version()`.
    :param old_file: The previously rendered content to compare with, or `None`.
    :param template: The template, must provide `render(...)`.
    :param services: The services, passed to the template.
    :param services_by_host: The services grouped by host, passed to the template.
    :param services_by_name: The services by name, passed to the template.
    :return: `(True, new_content)` if the file was written, `(False, old_file)` if it is unchanged.
    """
    d = client.info()
    docker = DockerInfo(
        name=d.get("Name"),
        container_count=int(d.get("Containers")),
        image_count=int(d.get("Images")),
        version=docker_version.get("Version"),
        api_version=docker_version.get("ApiVersion"),
        go_version=docker_version.get("GoVersion"),
        operating_system=docker_version.get("Os"),
        architecture=docker_version.get("Arch"),
        current_container_id=get_current_container_id(),
        datetime=datetime.now(),
        git_info=GIT_INFO,
    )
    new_file = template.render(
        docker=docker,
        services=services,
        services_by_host=services_by_host,
        services_by_name=services_by_name
    )
    if new_file != old_file:
        with open(OUTPUT_FILENAME, 'w') as f:
            f.write(new_file)
        # end with
        logger.success(f"File written to {OUTPUT_FILENAME}.")
        return True, new_file
    # end if
    logger.info("File would be unchanged. Not written.")
    return False, old_file
# end def


if __name__ == '__main__':
    logging.add_colored_handler("root", level=logging.DEBUG)
    main()
# end if
# ... (elision marker from the source listing)
test_replace_vars.py
Source:test_replace_vars.py
#################################################################################
# The Institute for the Design of Advanced Energy Systems Integrated Platform
# Framework (IDAES IP) was produced under the DOE Institute for the
# Design of Advanced Energy Systems (IDAES), and is copyright (c) 2018-2021
# by the software owners: The Regents of the University of California, through
# Lawrence Berkeley National Laboratory, National Technology & Engineering
# Solutions of Sandia, LLC, Carnegie Mellon University, West Virginia University
# Research Corporation, et al.  All rights reserved.
#
# Please see the files COPYRIGHT.md and LICENSE.md for full copyright and
# license information.
#################################################################################
"""
This module contains tests for the variable replace transformation.
"""
import pytest
import pyomo.environ as pyo
import idaes.core.plugins

__author__ = "John Eslick"


@pytest.mark.unit
def test_1():
    # Test scalar variables
    rp = pyo.TransformationFactory("replace_variables")
    m = pyo.ConcreteModel()
    m.x = pyo.Var(initialize=2)
    m.y = pyo.Var(initialize=3)
    m.z = pyo.Var(initialize=0)
    m.c1 = pyo.Constraint(expr=m.z == m.x + m.y)
    m.e1 = pyo.Expression(expr=m.x**m.y)
    m.o1 = pyo.Objective(expr=m.y - m.x)
    assert m.c1.body() == -5  # hope constraint arrangement is deterministic
    assert pyo.value(m.e1) == 8
    assert pyo.value(m.o1) == 1
    rp.apply_to(m, substitute=[(m.y, m.x)])
    assert m.c1.body() == -4
    assert pyo.value(m.e1) == 4
    assert pyo.value(m.o1) == 0


@pytest.mark.unit
def test_2():
    # Test vector variables and sums
    rp = pyo.TransformationFactory("replace_variables")
    m = pyo.ConcreteModel()
    m.x = pyo.Var(["a", "b", "c"], initialize=2)
    m.y = pyo.Var(initialize=3)
    m.z = pyo.Var(initialize=0)
    m.c1 = pyo.Constraint(expr=m.z == m.x["a"] + m.x["b"] + m.x["c"])
    m.e1 = pyo.Expression(expr=sum(m.x[i] for i in m.x))
    assert m.c1.body() == -6  # hope constraint arrangement is deterministic
    assert pyo.value(m.e1) == 6
    rp.apply_to(m, substitute=[(m.x["c"], m.y)])
    assert m.c1.body() == -7
    assert pyo.value(m.e1) == 7


@pytest.mark.unit
def test_3():
    # Test expression in constraint
    rp = pyo.TransformationFactory("replace_variables")
    m = pyo.ConcreteModel()
    m.x = pyo.Var(["a", "b", "c"], initialize=2)
    m.y = pyo.Var(initialize=3)
    m.z = pyo.Var(initialize=0)
    m.e1 = pyo.Expression(expr=sum(m.x[i] for i in m.x))
    m.c1 = pyo.Constraint(expr=m.z == m.e1)
    assert m.c1.body() == -6  # hope constraint arrangement is deterministic
    rp.apply_to(m, substitute=[(m.x["c"], m.y)])
    assert m.c1.body() == -7


@pytest.mark.unit
def test_4():
    # Test expression in objective
    rp = pyo.TransformationFactory("replace_variables")
    m = pyo.ConcreteModel()
    m.x = pyo.Var(["a", "b", "c"], initialize=2)
    m.y = pyo.Var(initialize=3)
    m.z = pyo.Var(initialize=0)
    m.e1 = pyo.Expression(expr=sum(m.x[i] for i in m.x))
    m.c1 = pyo.Constraint(expr=m.z == m.e1)
    m.o1 = pyo.Objective(expr=m.e1)
    assert pyo.value(m.o1) == 6
    rp.apply_to(m, substitute=[(m.x["c"], m.y)])
    assert pyo.value(m.o1) == 7


# This test used to be a second `test_4`, which rebound the name so the
# test above was never collected; it has its own name now so both run.
@pytest.mark.unit
def test_4_hierarchical():
    # Test in a hierarchical model
    rp = pyo.TransformationFactory("replace_variables")
    m = pyo.ConcreteModel()
    m.b1 = pyo.Block()
    m.b1.b2 = pyo.Block()
    x = m.b1.b2.x = pyo.Var(["a", "b", "c"], initialize=2)
    m.y = pyo.Var(initialize=3)
    m.z = pyo.Var(initialize=0)
    m.e1 = pyo.Expression(expr=sum(x[i] for i in x))
    m.b1.c1 = pyo.Constraint(expr=m.z == m.e1)
    m.o1 = pyo.Objective(expr=m.e1)
    assert pyo.value(m.o1) == 6
    assert m.b1.c1.body() == -6
    assert pyo.value(m.e1) == 6
    rp.apply_to(m, substitute=[(x["c"], m.y)])
    assert pyo.value(m.o1) == 7
    assert m.b1.c1.body() == -7
    assert pyo.value(m.e1) == 7


@pytest.mark.unit
def test_5():
    # Test indexed var replace
    rp = pyo.TransformationFactory("replace_variables")
    m = pyo.ConcreteModel()
    m.b1 = pyo.Block()
    m.b1.b2 = pyo.Block()
    x = m.b1.b2.x = pyo.Var(["a", "b", "c"], [1, 2, 3], initialize=2)
    m.y = pyo.Var(["a", "b", "c", "d"], [1, 2, 3], initialize=3)
    m.z = pyo.Var(initialize=0)
    m.e1 = pyo.Expression(expr=sum(x[i] for i in x))
    m.b1.c1 = pyo.Constraint(expr=m.z == m.e1)
    m.o1 = pyo.Objective(expr=m.e1)
    assert pyo.value(m.o1) == 18
    assert m.b1.c1.body() == -18
    assert pyo.value(m.e1) == 18
    rp.apply_to(m, substitute=[(x, m.y)])
    assert pyo.value(m.o1) == 27
    assert m.b1.c1.body() == -27
    assert pyo.value(m.e1) == 27


@pytest.mark.unit
def test_6():
    # Test non-variable exception
    rp = pyo.TransformationFactory("replace_variables")
    m = pyo.ConcreteModel()
    m.b1 = pyo.Block()
    m.b1.b2 = pyo.Block()
    x = m.b1.b2.x = pyo.Var(["a", "b", "c"], [1, 2, 3], initialize=2)
    m.y = pyo.Var(["a", "b", "c", "d"], [1, 2, 3], initialize=3)
    m.z = pyo.Var(initialize=0)
    with pytest.raises(TypeError):
        rp.apply_to(m, substitute=[(x, m.b1)])
    with pytest.raises(TypeError):
        rp.apply_to(m, substitute=[(m.b1, x)])


@pytest.mark.unit
def test_7():
    # Test replace indexed by non-indexed
    rp = pyo.TransformationFactory("replace_variables")
    m = pyo.ConcreteModel()
    m.b1 = pyo.Block()
    m.b1.b2 = pyo.Block()
    x = m.b1.b2.x = pyo.Var(["a", "b", "c"], [1, 2, 3], initialize=2)
    m.y = pyo.Var(["a", "b", "c", "d"], [1, 2, 3], initialize=3)
    m.z = pyo.Var(initialize=0)
    assert x.is_indexed()
    assert not m.z.is_indexed()
    with pytest.raises(TypeError):
        rp.apply_to(m, substitute=[(x, m.z)])


@pytest.mark.unit
def test_8():
    # Test replace indexed by indexed var that doesn't have enough/right indexes
    rp = pyo.TransformationFactory("replace_variables")
    m = pyo.ConcreteModel()
    m.b1 = pyo.Block()
    m.b1.b2 = pyo.Block()
    x = m.b1.b2.x = pyo.Var(["a", "b", "c"], [1, 2, 3], initialize=2)
    m.y = pyo.Var(["a", "b", "d"], [1, 2, 3], initialize=3)
    with pytest.raises(ValueError):
        rp.apply_to(m, substitute=[(x, m.y)])


@pytest.mark.unit
def test_9():
    # test with references the way we handle time indexing a lot in IDAES
    rp = pyo.TransformationFactory("replace_variables")
    block_set = {1, 2, 3}
    m = pyo.ConcreteModel()
    m.b1 = pyo.Block(block_set)
    for i in block_set:
        m.b1[i].x = pyo.Var(initialize=2)
    m.y = pyo.Var([1, 2, 3], initialize=3)
    m.xx = pyo.Reference(m.b1[:].x)
    m.display()
    m.e1 = pyo.Expression(expr=sum(m.xx[i] for i in m.xx))
    m.e2 = pyo.Expression(expr=sum(m.b1[i].x for i in m.b1))
    assert pyo.value(m.e1) == 6
    assert pyo.value(m.e2) == 6
    rp.apply_to(m, substitute=[(m.xx, m.y)])
    assert pyo.value(m.e1) == 9
    assert pyo.value(m.e2) == 9


@pytest.mark.unit
def test_10():
    # test with more than one variable in the list
    rp = pyo.TransformationFactory("replace_variables")
    m = pyo.ConcreteModel()
    m.x = pyo.Var(["a", "b", "c"], initialize=2)
    m.a = pyo.Var(initialize=5)
    m.b = pyo.Var(initialize=6)
    m.c = pyo.Var(initialize=7)
    m.e1 = pyo.Expression(expr=sum(m.x[i] for i in m.x))
    assert pyo.value(m.e1) == 6
    rp.apply_to(m, substitute=[(m.x["a"], m.a), (m.x["b"], m.b), (m.x["c"], m.c)])
    # ... (the source listing ends here with an elision marker; any
    # assertions that follow in the real file are not visible)
bbRobotLibrary.py
Source:bbRobotLibrary.py
...24 self._rawoutput = ''25 process = subprocess.Popen(["sudo","-v"], universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)26 self.sudooutput = process.communicate()27 def get_error_root(self):28 hostlist = BuiltIn().replace_variables('${HOSTLIST}')29 if hostlist == '':30 return self.jout31 if self.jout['rc'] == 0:32 return self.jout33 rank = self.jout['error']['firstFailRank']34 return self.jout[rank]35 def failing_rank_is(self):36 hostlist = BuiltIn().replace_variables('${HOSTLIST}')37 if hostlist == '':38 if self.jout['rc'] != 0:39 return 040 try:41 if self.jout['rc'] != 0:42 return self.jout['error']['firstFailRank']43 return "none"44 except ValueError as e:45 return "none"46 def failing_details(self):47 try:48 err = self.get_error_root()49 rank = self.failing_rank_is()50 return "Rank %s with status %s" % (rank, err['error']['text'])51 except ValueError as e:52 return "(failing_details_exception)"53 def status_should_be(self, expected_status):54 if expected_status != self.jout['rc']:55 if self.jout['rc'] != '0':56 raise AssertionError("Expected status to be '%s' but was '%s'. Details '%s'"57 % (expected_status, self.jout['rc'], self.failing_details()))58 else:59 raise AssertionError("Expected status to be '%s' but was '%s'." % (expected_status, self.jout['rc']))60 def status_should_not_be(self, expected_status):61 if expected_status == self.jout['rc']:62 if self.jout['rc'] != '0':63 raise AssertionError("Expected status to not be '%s' but was '%s'. 
Details '%s'"64 % (expected_status, self.jout['rc'], self.failing_details()))65 else:66 raise AssertionError("Expected status to not SUCCESSFULL")67 def read_device_data(self, field, rank):68 hostlist = BuiltIn().replace_variables('${HOSTLIST}')69 if hostlist == '':70 return self.jout['out'][field]71 return self.jout[rank]['out'][field]72 def read_transfer_handle(self, rank):73 hostlist = BuiltIn().replace_variables('${HOSTLIST}')74 if hostlist == '':75 return self.jout['out']['transferHandle']76 return self.jout[rank]['out']['transferHandle']77 def read_transfer_handle_list(self, rank):78 hostlist = BuiltIn().replace_variables('${HOSTLIST}')79 if hostlist == '':80 return self.jout['out']['handles']81 return self.jout[rank]['out']['handles']82 def read_transfer_status(self, rank):83 hostlist = BuiltIn().replace_variables('${HOSTLIST}')84 if hostlist == '':85 return self.jout['out']['status']86 return self.jout[rank]['out']['status']87 def bbcmd(self, command, *args):88 iamroot = BuiltIn().replace_variables('${IAMROOT}')89 suiteisroot= BuiltIn().replace_variables('${SUITEISROOT}')90 ranklist = BuiltIn().replace_variables('${RANKLIST}')91 hostlist = BuiltIn().replace_variables('${HOSTLIST}')92 jobid = BuiltIn().replace_variables('${JOBID}')93 jobstepid = BuiltIn().replace_variables('${JOBSTEPID}')94 command = [self._sut_path, command, '--config', self._config_file] + list(args)95 if hostlist != '':96 command.insert(2, "--hostlist=" + hostlist)97 command.insert(2, "--target=" + ranklist)98 else:99 command.insert(2, "--contribid=0")100 orighostlist = BuiltIn().replace_variables('${ORIG_HOSTLIST}')101 if hostlist == '':102 return103 104 command.insert(2, "--jobid=" + str(jobid))105 command.insert(2, "--jobstepid=" + str(jobstepid))106 if iamroot == '1' or suiteisroot == '1':107 command.insert(0, 'sudo')108 print("Command: '%s'" % ' '.join(command))109 process = subprocess.Popen(command, universal_newlines=True, stdout=subprocess.PIPE,110 stderr=subprocess.STDOUT)111 
self._output = process.communicate()[0].strip()112 try:113 self.jout = json.loads(self._output)114 print(json.dumps(self.jout, indent=4, sort_keys=True))...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!