Best Python code snippet using hypothesis
reached_setpoint.py
Source:reached_setpoint.py
from ...math.utils import compare_value
from ophyd import PVPositionerPC, Component as Cpt, Signal, Kind
from ophyd.utils import errors
from ophyd.status import SubscriptionStatus, Status, AndStatus
import logging

logger = logging.getLogger()


class OphydInvalidParameter(ValueError, errors.OpException):
    """Raised when a given parameter is invalid."""


class OphydMethodNotOverloaded(AssertionError, errors.OpException):
    """Raised by base-class hooks that a subclass has to overload."""


# t_super = Device
#: It has to be PVPositionerPC so that it will work
t_super = PVPositionerPC


class DoneBasedOnReadback(t_super):
    """Wait until readback is matching setpoint within requested precision

    The idea of this class is to mimic a proper done variable using
    setpoint and readback value. Then the done variable should be
    set when the readback value matches the setpoint value within
    the specified precision.

    See :class:`ReachedSetpointEPS` for an implementation of this class.

    The checking behaviour is implemented in :meth:`_positionReached`.

    The device has to provide two signal like variables:

        * setpoint
        * readback

    Warning:
        Code not yet checked

    Todo:
        Check __init__ handling timeout
    """
    #: If set, :meth:`set` proceeds even when the position is already valid
    always_set = Cpt(Signal, name='always_set', value=False, kind=Kind.config)
    #: Tune correction is made by calling method set, but without
    #: writing a value to it
    do_not_set = Cpt(Signal, name='do_not_set', value=False, kind=Kind.config)

    def __init__(self, *args, **kws):
        """
        Args:
            setting_parameters: handed to :meth:`_checkSettingParameters`;
                the value that hook returns is stored on the instance.
            timeout: timeout passed to the status objects created in
                :meth:`set`. Converted to float. The default of 0.0 is
                rejected by :meth:`_checkSetup`, so a positive value
                has to be given.

        All remaining arguments are forwarded to the super class.

        Raises:
            AssertionError: if :meth:`_checkSettingParameters` returns None
                or :meth:`_checkSetup` fails.
        """
        self._setting_parameters = None
        self._timeout = None

        setting_parameters = kws.pop("setting_parameters", None)
        timeout = float(kws.pop("timeout", 0.0))

        super().__init__(*args, **kws)

        setpar = self._checkSettingParameters(setting_parameters)
        if setpar is None:
            cls_name = self.__class__.__name__
            txt = (
                f'{cls_name}._checkSettingParameters must return'
                ' valid parameters (returned None)'
            )
            raise AssertionError(txt)

        self._setting_parameters = setpar
        self._timeout = timeout
        self._checkSetup()

        # Required to trace the status of the device
        self._moving = None

    def _checkSetup(self):
        """check that instance contains required variables

        The readback has to be readable, the setpoint has to be readable
        and settable, and the timeout has to be positive.
        """
        assert self.readback is not None
        assert callable(self.readback.get)
        assert self.setpoint is not None
        # The setpoint (not the readback) is what set() reads and writes.
        assert callable(self.setpoint.get)
        assert callable(self.setpoint.set)
        assert self._timeout > 0

    def _checkSettingParameters(self, setting_parameters):
        """Check and store setting Parameters

        Overload this function to check setting parameters

        Returns:
            valid setting parameters

        Raises:
            OphydMethodNotOverloaded: always, in this base implementation.
        """
        raise OphydMethodNotOverloaded("Overload this method")

    def _positionReached(self, *args, **kws):
        """check that the position has been reached

        Returns: flag(bool)
            Returns true if position was reached, false otherwise

        Raises:
            OphydMethodNotOverloaded: always, in this base implementation.
        """
        raise OphydMethodNotOverloaded("Overload this method")

    def set(self, value):
        """Request ``value`` and return a status tracking the request.

        If :meth:`_positionReached` already reports the position as valid
        for ``value`` and ``always_set`` is not set, a finished status is
        returned and nothing is written. Otherwise the value is written
        to the setpoint (unless ``do_not_set`` is set); if the position is
        not yet valid the readback is additionally watched until
        :meth:`_positionReached` reports success.

        Returns:
            a finished :class:`ophyd.status.Status`, the status of the
            setpoint write, or an :class:`ophyd.status.AndStatus` combining
            the readback :class:`ophyd.status.SubscriptionStatus` with the
            setpoint status.
        """
        def callback(*args, **kws):
            pos_valid = self._positionReached(*args, **kws)
            cls_name = self.__class__.__name__
            txt = (
                f'{cls_name}:set cb: args {args} kws {kws}:'
                f' self._moving {self._moving} pos_valid {pos_valid}'
            )
            self.log.info(txt)
            # Success is only reported on a call made after _moving was
            # flagged by an earlier call; every other call flags it.
            if self._moving and pos_valid:
                self._moving = False
                return True
            self._moving = True
            return False

        pos_valid = self._positionReached(check_set_value=value)
        always_set = self.always_set.get()
        cls_name = self.__class__.__name__
        name = self.name

        if pos_valid:
            txt = f"{cls_name}: no motion required for value {value} always set {always_set}"
            self.log.info(txt)

        if pos_valid and not always_set:
            status = Status()
            # status.success = 1 -> status.set_finished()
            status.set_finished()
            txt = f"{cls_name}.{name}: no motion required : always set {always_set} False. No motion"
            self.log.debug(txt)
            return status

        self.log.debug(f'{cls_name}.{name}settle time {self.settle_time}')

        stat_rbk = None
        if not pos_valid:
            # Only watch the readback when the position is not yet valid;
            # otherwise no response is expected.
            stat_rbk = SubscriptionStatus(self.readback, callback,
                                          timeout=self._timeout,
                                          settle_time=self.settle_time)

        txt = f"{cls_name}: setting to value {value}"
        self.log.debug(txt)

        if self.do_not_set.get():
            txt = (
                f'Not setting the value {value} position valid {pos_valid}'
                f' stat_rbk {stat_rbk}'
            )
            self.log.info(txt)
            stat_setp = Status()
            stat_setp.set_finished()
        else:
            stat_setp = self.setpoint.set(value, timeout=self._timeout)

        if stat_rbk is None:
            status = stat_setp
        else:
            status = AndStatus(stat_rbk, stat_setp)

        txt = f"{cls_name}:set cb: value {value} status = {status}: setp {stat_setp} {stat_rbk}"
        # print(txt)
        self.log.info(txt)
        return status


class ReachedSetpoint(DoneBasedOnReadback):
    """Setpoint within some absolute precision

    Obsolete class use :class:`ReachedSetpointEPS` instead
    """
    def __init__(self, *args, **kwargs):
        msg = 'Obsolete class use ReachedSetpointEPS instead'
        raise NotImplementedError(msg)


class ReachedSetpointEPS(DoneBasedOnReadback):
    """Setpoint within some absolute and relative precision
    """
    eps_rel = Cpt(Signal, name='eps_rel', value=1e-9)
    eps_abs = Cpt(Signal, name='eps_abs', value=1e-9)

    def _correctReadback(self, val):
        """Hook to adjust the readback before comparison (identity here)."""
        return val

    def _positionReached(self, *args, **kws):
        """position within given range?

        Args:
            check_set_value (keyword, optional): value to compare the
                readback against. If missing or None the current setpoint
                value is used.

        Returns:
            True if ``compare_value`` returns 0 for readback and target.
        """
        rbk = self.readback.get()
        rbk = self._correctReadback(rbk)

        check_set_value = kws.pop("check_set_value", None)
        if check_set_value is None:
            setp = self.setpoint.get()
        else:
            setp = check_set_value

        eps_abs = self.eps_abs.get()
        eps_rel = self.eps_rel.get()
        t_cmp = compare_value(rbk, setp, eps_abs=eps_abs, eps_rel=eps_rel)
        flag = t_cmp == 0

        c_name = str(self.__class__)
        name = self.name
        txt = (
            f'{c_name}:_positionReached: name {name}, set {setp} rbk {rbk} '
            f'eps: abs {eps_abs} rel {eps_rel} '
            f'comparison {t_cmp} position valid {flag}'
        )
        # print(txt)
        if flag:
            self.log.info(txt)
        else:
            self.log.debug(txt)
        return flag

    def _checkSettingParameters(self, unused):
        """Absolute value for setting parameter

        And thus just a float
        """
        # NOTE(review): a failing ``assert`` below raises AssertionError,
        # which the ``except ValueError`` clauses do not catch - confirm
        # whether OphydInvalidParameter was intended for that case too.
        eps_abs = self.eps_abs.get()
        try:
            t_range = float(eps_abs)
            assert(t_range > 0)
        except ValueError as des:
            msg = f"Expected eps_abs {eps_abs} >0 got: error {des}"
            raise OphydInvalidParameter(msg)

        eps_rel = self.eps_rel.get()
        try:
            t_range = float(eps_rel)
            assert(t_range > 0)
        except ValueError as des:
            msg = f"Expected eps_rel {eps_rel} >0 got: error {des}"
            raise OphydInvalidParameter(msg)
        # ... (snippet is truncated here in the source; the visible part
        # returns nothing although __init__ rejects a None return, so the
        # missing tail presumably returns the parameters - TODO confirm)
ReachedSetPoint.py
Source:ReachedSetPoint.py
from ....math.utils import compare_value
from ophyd import PVPositionerPC, Component as Cpt, Signal
from ophyd.utils import errors
from ophyd.status import SubscriptionStatus, Status, AndStatus
import logging

logger = logging.getLogger()


class OphydInvalidParameter(ValueError, errors.OpException):
    """Raised when a given parameter is invalid."""


class OphydMethodNotOverloaded(AssertionError, errors.OpException):
    """Raised by base-class hooks that a subclass has to overload."""


# t_super = Device
#: It has to be PVPositionerPC so that it will work
t_super = PVPositionerPC


class DoneBasedOnReadback(t_super):
    """Wait until readback is matching setpoint within requested precision

    The idea of this class is to mimic a proper done variable using
    setpoint and readback value. Then the done variable should be
    set when the readback value matches the setpoint value within
    the specified precision.

    See :class:`ReachedSetpointEPS` for an implementation of this class.

    The checking behaviour is implemented in :meth:`_positionReached`.

    The device has to provide two signal like variables:

        * setpoint
        * readback

    Warning:
        Code not yet checked

    Todo:
        Check __init__ handling timeout
    """
    def __init__(self, *args, **kws):
        """
        Args:
            setting_parameters: handed to :meth:`_checkSettingParameters`;
                the value that hook returns is stored on the instance.
            timeout: timeout passed to the status objects created in
                :meth:`set`. Converted to float. The default of 0.0 is
                rejected by :meth:`_checkSetup`, so a positive value
                has to be given.

        All remaining arguments are forwarded to the super class.

        Raises:
            AssertionError: if :meth:`_checkSettingParameters` returns None
                or :meth:`_checkSetup` fails.
        """
        self._setting_parameters = None
        self._timeout = None

        setting_parameters = kws.pop("setting_parameters", None)
        timeout = float(kws.pop("timeout", 0.0))

        super().__init__(*args, **kws)

        setpar = self._checkSettingParameters(setting_parameters)
        if setpar is None:
            cls_name = self.__class__.__name__
            txt = (
                f'{cls_name}._checkSettingParameters must return'
                ' valid parameters (returned None)'
            )
            raise AssertionError(txt)

        self._setting_parameters = setpar
        self._timeout = timeout
        self._checkSetup()

        # Required to trace the status of the device
        self._moving = None

    def _checkSetup(self):
        """check that instance contains required variables
        """
        assert(self.readback is not None)
        # Bare attribute accesses: they fail here if the attribute is
        # missing (or, for ``value``, cannot be read).
        self.readback.value
        assert(self.setpoint is not None)
        self.setpoint.value
        self.setpoint.set
        assert(self._timeout > 0)

    def _checkSettingParameters(self, setting_parameters):
        """Check and store setting Parameters

        Overload this function to check setting parameters

        Returns:
            valid setting parameters

        Raises:
            OphydMethodNotOverloaded: always, in this base implementation.
        """
        raise OphydMethodNotOverloaded("Overload this method")

    def _positionReached(self, *args, **kws):
        """check that the position has been reached

        Returns: flag(bool)
            Returns true if position was reached, false otherwise

        Raises:
            OphydMethodNotOverloaded: always, in this base implementation.
        """
        raise OphydMethodNotOverloaded("Overload this method")

    def set(self, value):
        """Request ``value`` and return a status tracking the request.

        If :meth:`_positionReached` already reports the position as valid
        for ``value`` a finished status is returned and nothing is
        written. Otherwise the value is written to the setpoint and the
        readback is watched until :meth:`_positionReached` reports success.

        Returns:
            a finished :class:`ophyd.status.Status`, or an
            :class:`ophyd.status.AndStatus` combining the readback
            :class:`ophyd.status.SubscriptionStatus` with the setpoint
            status.
        """
        def callback(*args, **kws):
            pos_valid = self._positionReached(*args, **kws)
            cls_name = self.__class__.__name__
            txt = (
                f'{cls_name}:set cb: args {args} kws {kws}:'
                f' self._moving {self._moving} pos_valid {pos_valid}'
            )
            self.log.debug(txt)
            # Success is only reported on a call made after _moving was
            # flagged by an earlier call; every other call flags it.
            if self._moving and pos_valid:
                self._moving = False
                return True
            self._moving = True
            return False

        cls_name = self.__class__.__name__

        pos_valid = self._positionReached(check_set_value=value)
        if pos_valid:
            status = Status()
            # Mark the status as successfully finished through the status
            # API instead of assigning ``done`` / ``success`` directly.
            status.set_finished()
            txt = f"{cls_name}: no motion required for value {value}"
            self.log.info(txt)
            return status

        self.log.info(f'settle time {self.settle_time}')
        stat_rbk = SubscriptionStatus(self.readback, callback,
                                      timeout=self._timeout,
                                      settle_time=self.settle_time)
        stat_setp = self.setpoint.set(value, timeout=self._timeout)
        status = AndStatus(stat_rbk, stat_setp)

        txt = f"{cls_name}:set cb: value {value} status = {status}"
        # print(txt)
        logger.debug(txt)
        return status


class ReachedSetpoint(DoneBasedOnReadback):
    """Setpoint within some absolute precision

    Obsolete class use :class:`ReachedSetpointEPS` instead
    """
    def __init__(self, *args, **kwargs):
        msg = 'Obsolete class use ReachedSetpointEPS instead'
        raise NotImplementedError(msg)


class ReachedSetpointEPS(DoneBasedOnReadback):
    """Setpoint within some absolute and relative precision
    """
    eps_rel = Cpt(Signal, name='eps_rel', value=1e-9)
    eps_abs = Cpt(Signal, name='eps_abs', value=1e-9)

    def _correctReadback(self, val):
        """Hook to adjust the readback before comparison (identity here)."""
        return val

    def _positionReached(self, *args, **kws):
        """position within given range?

        Args:
            check_set_value (keyword, optional): value to compare the
                readback against. If missing or None the current setpoint
                value is used.

        Returns:
            True if ``compare_value`` returns 0 for readback and target.
        """
        rbk = self.readback.value
        rbk = self._correctReadback(rbk)

        check_set_value = kws.pop("check_set_value", None)
        if check_set_value is None:
            setp = self.setpoint.value
        else:
            setp = check_set_value

        eps_abs = self.eps_abs.value
        eps_rel = self.eps_rel.value
        t_cmp = compare_value(rbk, setp, eps_abs=eps_abs, eps_rel=eps_rel)
        flag = t_cmp == 0

        c_name = str(self.__class__)
        txt = (
            f'{c_name}:_positionReached: set {setp} rbk {rbk} '
            f'eps: abs {eps_abs} rel {eps_rel} '
            f'comparison {t_cmp} position valid {flag}'
        )
        # print(txt)
        self.log.info(txt)
        return flag

    def _checkSettingParameters(self, unused):
        """Absolute value for setting parameter

        And thus just a float
        """
        # NOTE(review): a failing ``assert`` below raises AssertionError,
        # which the ``except ValueError`` clauses do not catch - confirm
        # whether OphydInvalidParameter was intended for that case too.
        eps_abs = self.eps_abs.value
        try:
            t_range = float(eps_abs)
            assert(t_range > 0)
        except ValueError as des:
            msg = f"Expected eps_abs {eps_abs} >0 got: error {des}"
            raise OphydInvalidParameter(msg)

        eps_rel = self.eps_rel.value
        try:
            t_range = float(eps_rel)
            assert(t_range > 0)
        except ValueError as des:
            msg = f"Expected eps_rel {eps_rel} >0 got: error {des}"
            raise OphydInvalidParameter(msg)
        # ... (snippet is truncated here in the source; the visible part
        # returns nothing although __init__ rejects a None return, so the
        # missing tail presumably returns the parameters - TODO confirm)
check_set_value.py
Source:check_set_value.py
#!/usr/bin/python3
# check_set_value.py: Wrapper to demonstrate check_set_value()
# utility function that determines whether a value is a legal
# value of a given SET column.
# Usage: check_set_value.py db_name tbl_name col_name test_value
import sys
import mysql.connector
import cookbook
from cookbook_utils import *

# Exactly four command-line arguments are required.
if len(sys.argv) != 5:
    # The usage text names this script (it previously named
    # check_enum_value.py).
    print("Usage: check_set_value.py db_name tbl_name col_name test_val")
    sys.exit(1)

db_name = sys.argv[1]
tbl_name = sys.argv[2]
col_name = sys.argv[3]
val = sys.argv[4]
# ... (the remainder of the script is truncated in the source)
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!