Best Python code snippet using playwright-python
actions.py
Source:actions.py
1from collections import namedtuple2from pprint import pprint3from gebsyas.basic_controllers import InEqController, run_ineq_controller4from gebsyas.dl_reasoning import DLDisjunction, DLConjunction5from gebsyas.numeric_scene_state import AssertionDrivenPredicateState, pfd_str6from gebsyas.predicates import Predicate7from gebsyas.ros_visualizer import ROSVisualizer8import rospy9Context = namedtuple('Context', ['agent', 'log', 'display'])10LogData = namedtuple('LogData', ['stamp', 'name', 'data'])11def pbainst_str(pba):12 return '{}({})'.format(pba.action_wrapper.action_name, ', '.join(['{}={}'.format(a,v) for a, v in pba.assignments.items()]))13class Logger():14 """Super class for all loggers.15 Provides a very basic logging functionality, which has an automatic indentation level.16 All logged messages are automatically timestamped relative to the creation of the logger.17 Overrides __call__ to allow for shorter logging calls.18 """19 def __init__(self):20 """Initializes indentation level and reference time point"""21 self.log_birth = rospy.Time.now()22 self.indentation = 023 def log(self, data, name=''):24 """Logs arbitrary data.25 The log can be named or unnamed.26 The message is reformatted to match the current indentation level.27 """28 title_str = '{:>12.4f}:{} | '.format((rospy.Time.now() - self.log_birth).to_sec(), ' ' * self.indentation)29 pure_data_str = str(data)30 if pure_data_str[-1] == '\n':31 data_str = pure_data_str[:-1].replace('\n', '\n{}| '.format(' ' * (len(title_str) - 2)))32 else:33 data_str = pure_data_str.replace('\n', '\n{}| '.format(' ' * (len(title_str) - 2)))34 if len(name) > 0:35 print('{}{}: {}'.format(title_str, name, data_str))36 else:37 print('{}{}'.format(title_str, data_str))38 def indent(self):39 """Increases the indentation level by one."""40 self.indentation = self.indentation + 141 def unindent(self):42 """Decreases the indentation level by one. Level will never be smaller than 0."""43 if self.indentation > 0:44 self.indentation = self.indentation - 145 def __call__(self, data, name=''):46 """Override to make logging more comfortable"""47 self.log(data, name)48class Action(object):49 """ Superclass for all actions in the system.50 Actions have a name and an execution function, which implements the action's behavior.51 """52 def __init__(self, name):53 """Sets the name of the action"""54 self.name = name55 def __str__(self):56 """Basic string representation for an action"""57 return 'action({})'.format(self.name)58 def execute(self, context):59 """Override and implement action's behavior here60 Return a value >= 0, <= 1. 0 is total failure, 1 is complete success61 """62 raise (NotImplementedError)63 def execute_subaction(self, context, action):64 """Comfort wrapper. automatically logs the start of the new action and indents the log.65 Passes along the subaction's return.66 """67 context.log(action.name, 'Starting sub action')68 context.log.indent()69 out = action.execute(context)70 context.log.unindent()71 context.log.log('\b{}\nExecution of sub action {} results in {}'.format('-' * 25, action.name, out))72 return out73class PBasedActionSequence(Action):74 """75 @brief An action which executes a sequence of predicate-based actions.76 """77 def __init__(self, sequence=[], cost=0.0):78 """Constructor. Receives a sequence of PWrapperInstance to instantiate and execute."""79 super(PBasedActionSequence, self).__init__('P-ActionSequence')80 self.sequence = sequence81 self.cost = cost82 def execute(self, context):83 """Executes the action sequence by testing the wrappers' preconditions against the current predicate state,84 instantiating the actions and executing them,85 """86 pred_state = context.agent.get_predicate_state()87 for a_inst in self.sequence:88 if rospy.is_shutdown():89 return 0.090 precon_diff = AssertionDrivenPredicateState(a_inst.precons).difference(context, pred_state)91 if len(precon_diff) == 0:92 if self.execute_subaction(context, a_inst.action_wrapper.instantiate_action(context, a_inst.assignments)) < 0.8:93 return 0.094 else:95 context.log('Execution of action sequence terminated because the preconditions for {} are not met. Mismatch:\n {}'.format(96 pbainst_str(a_inst),97 pfd_str(precon_diff, '\n ')))98 return 0.099 return 1.0100 def __add__(self, other):101 """Creates a new sequence which is the concatenation of this sequence and the other one."""102 if type(other) != PBasedActionSequence:103 raise TypeError104 return PBasedActionSequence(self.sequence + other.sequence, self.cost + other.cost)105 def append(self, action):106 """Append an action the the sequence. Returns a new sequence.."""107 return PBasedActionSequence(self.sequence + [action], self.cost + action.action_wrapper.cost)108 def push(self, action):109 """Adds an action at the front of the sequence. Returns a new sequence."""110 return PBasedActionSequence([action] + self.sequence, self.cost + action.action_wrapper.cost)111 def __str__(self):112 """Generates a string representation for this sequence."""113 return ' -> '.join([pbainst_str(a) for a in self.sequence])114class ActionManager(object):115 """Container structure for actions. The actions are associated with their pre- and postconditions."""116 def __init__(self, capabilities):117 """Constructor. Receives a list of PActionWrappers to store."""118 self.__precon_action_map = {}119 self.__postcon_action_map = {}120 for c in capabilities:121 for p in c.precons.keys():122 if p not in self.__precon_action_map:123 self.__precon_action_map[p] = set()124 self.__precon_action_map[p].add(c)125 for p in c.postcons.keys():126 if p not in self.__postcon_action_map:127 self.__postcon_action_map[p] = set()128 self.__postcon_action_map[p].add(c)129 def get_postcon_map(self):130 """Returns the mapping of all postconditions to the actions which have them."""131 return self.__postcon_action_map132 def get_precon_map(self):133 """Returns the mapping of all preconditions to the actions which have them."""134 return self.__precon_action_map135class PActionInterface(object):136 """Superclass for wrappers linking the actions to symbolic pre- and postconditions."""137 def __init__(self, action_name, precons, postcons, cost=1.0):138 """Constructor. Receives a name for the action being wrapped,139 a list of pre- and postconditions and a cost."""140 self.action_name = action_name141 self.cost = cost142 self.signature = {}143 self.precons = {}144 self.postcons = {}145 self._arg_p_map = {}146 self._arg_pre_p_map = {}147 self._arg_post_p_map = {}148 for pI in (precons + postcons):149 for x in range(len(pI.predicate.dl_args)):150 a = pI.args[x]151 if not a in self.signature:152 self.signature[a] = pI.predicate.dl_args[x]153 self._arg_p_map[a] = []154 else:155 # implication_intersection = self.signature[a].implication_intersection(pI.predicate.dl_args[x])156 # if len(implication_intersection) == 0:157 # raise Exception('Invalid action signature! Symbol {} was first defined as:\n {}\nbut is redefined as:\n {}\n in {}({}) = {}'.format(a, str(self.signature[a]), str(pI.predicate.dl_args[x]), pI.predicate.P, ', '.join(pI.args), pI.value))158 # elif len(implication_intersection) < len(self.signature[a].implied_by):159 if type(self.signature[a]) == DLConjunction:160 self.signature[a] = DLConjunction(pI.predicate.dl_args[x], *list(self.signature[a].concepts))161 else:162 self.signature[a] = DLConjunction(self.signature[a], pI.predicate.dl_args[x])163 self._arg_p_map[a].append(pI)164 for pI in precons:165 if not pI.predicate in self.precons:166 self.precons[pI.predicate] = {}167 if pI.args in self.precons[pI.predicate]:168 raise Exception('Redefinition of precondition {}({}) for action {}. '.format(pI.predicate.P, ', '.join(pI.args), action_name))169 self.precons[pI.predicate][pI.args] = pI.value170 for a in pI.args:171 if a not in self._arg_pre_p_map:172 self._arg_pre_p_map[a] = []173 self._arg_pre_p_map[a].append(pI)174 for pI in postcons:175 if not pI.predicate in self.postcons:176 self.postcons[pI.predicate] = {}177 if pI.args in self.postcons[pI.predicate]:178 raise Exception('Redefinition of postcondition {}({}) for action {}. '.format(pI.predicate.P, ', '.join(pI.args), action_name))179 self.postcons[pI.predicate][pI.args] = pI.value180 for a in pI.args:181 if a not in self._arg_post_p_map:182 self._arg_post_p_map[a] = []183 self._arg_post_p_map[a].append(pI)184 param_len = max([len(n) for n in self.signature.keys()])185 self.__str_representation = '{}:\n Parameters:\n {}\n Preconditions:\n {}\n Postconditions:\n {}'.format(self.action_name,186 '\n '.join(['{:>{:d}}: {}'.format(a, param_len, str(t)) for a, t in self.signature.items()]),187 pfd_str(self.precons, '\n '),188 pfd_str(self.postcons, '\n '))189 def parameterize_by_postcon(self, context, postcons):190 """Generates an iterator which generates all variable assignments191 for this action based on the given context and postconditions."""192 return PostconditionAssignmentIterator(context, self, postcons)193 def parameterize_by_precon(self, context, precons):194 """Generates an iterator which generates all variable assignments195 for this action based on the given context and preconditions."""196 return PreconditionAssignmentIterator(context, self, precons)197 def __str__(self):198 """String representation of this wrapper."""199 return self.__str_representation200 def __hash__(self):201 """Hash for this wrapper."""202 return hash(self.__str_representation)203 def instantiate_action(self, context, assignments):204 """Override in subclass. Takes a mapping of the variable names to objects and205 instantiates the action that is being wrapped by the class."""206 raise (NotImplementedError)207# Connects a variable assignment with an action wrapper and their resulting pre- and postconditions.208PWrapperInstance = namedtuple('PWrapperInstance', ['assignments', 'precons', 'postcons', 'action_wrapper'])209class PermutationIterator(object):210 """211 @brief Iterator generating all possible permutations for the assignment of unbound variables.212 """213 def __init__(self, iterator_dict):214 """Constructor. Receives a dict mapping the variable names to the iterators listing their possibilities."""215 self.iterators = []216 self.names = []217 for name, iterator in iterator_dict.items():218 self.iterators.append(iterator)219 self.names.append(name)220 self.iterator_cache = []221 self.it_index = len(self.iterators) - 1222 self.current_permutation = []223 for x in range(len(self.iterators) - 1):224 self.iterator_cache.append([self.iterators[x].next()])225 self.current_permutation.append(self.iterator_cache[x][0])226 self.iterator_cache.append([])227 self.current_permutation.append(None)228 self.use_cached = len(self.iterators) - 1229 def __iter__(self):230 return self231 def next(self):232 """Generates the next new permutation."""233 while True:234 try:235 next_elem = self.iterators[self.it_index].next()236 if self.use_cached < self.it_index:237 self.iterator_cache[self.it_index].append(next_elem)238 self.current_permutation[self.it_index] = next_elem239 if self.it_index+1 == len(self.iterators):240 return dict(zip(self.names, self.current_permutation))241 else:242 self.it_index += 1243 self.iterators[self.it_index] = iter(self.iterator_cache[self.it_index])244 except StopIteration:245 if self.it_index == 0:246 raise StopIteration247 else:248 self.use_cached -= 1249 self.it_index -= 1250class PostconditionAssignmentIterator(object):251 """252 @brief Iterator generating all possible variable assignments for253 an action wrapper based on its postconditions and a context.254 """255 def __init__(self, context, action_wrapper, postcon_constraints):256 """Constructor."""257 self.context = context258 self.action_wrapper = action_wrapper259 self.assignment_it = ParameterAssignmentIterator(action_wrapper._arg_post_p_map, action_wrapper.postcons, postcon_constraints)260 self.perm_iter = None261 def __iter__(self):262 return self263 def next(self):264 """Generates the next PWrapperInstance for the wrapper."""265 while True:266 if self.perm_iter == None:267 self.assignment, self.unbound = self.assignment_it.next()268 if len(self.unbound) != 0:269 ds = self.context.agent.get_data_state()270 self.perm_iter = PermutationIterator({a: ds.dl_iterator(self.action_wrapper.signature[a]) for a in self.unbound})271 continue272 else:273 try:274 self.assignment.update(self.perm_iter.next())275 except StopIteration:276 self.perm_iter = None277 continue278 out_post = {}279 out_pre = {}280 for p, args in self.action_wrapper.precons.items():281 out_pre[p] = {}282 for t, v in args.items():283 out_pre[p][tuple([self.assignment[x] for x in t])] = v284 for p, args in self.action_wrapper.postcons.items():285 out_post[p] = {}286 for t, v in args.items():287 out_post[p][tuple([self.assignment[x] for x in t])] = v288 return PWrapperInstance(assignments=self.assignment, precons=out_pre, postcons=out_post, action_wrapper=self.action_wrapper)289class PreconditionAssignmentIterator(object):290 """291 @brief Iterator generating all possible variable assignments for292 an action wrapper based on its preconditions and a context.293 """294 def __init__(self, context, action_wrapper, precon_constraints):295 self.context = context296 self.action_wrapper = action_wrapper297 self.assignment_it = ParameterAssignmentIterator(action_wrapper._arg_pre_p_map, action_wrapper.precons, precon_constraints)298 self.perm_iter = None299 def __iter__(self):300 return self301 def next(self):302 """Generates the next PWrapperInstance for the wrapper."""303 while True:304 if self.perm_iter == None:305 self.assignment, self.unbound = self.assignment_it.next()306 if len(self.unbound) == 0:307 ds = self.context.agent.get_data_state()308 self.perm_iter = PermutationIterator({a: ds.dl_iterator(self.action_wrapper.signature[a]) for a in self.unbound})309 else:310 try:311 self.assignment.update(self.perm_iter.next())312 except StopIteration:313 self.perm_iter = None314 continue315 out_post = {}316 out_pre = {}317 for p, args in self.action_wrapper.precons.items():318 out_pre[p] = {}319 for t, v in args.items():320 out_pre[p][tuple([self.assignment[x] for x in t])] = v321 for p, args in self.action_wrapper.postcons.items():322 out_post[p] = {}323 for t, v in args.items():324 out_post[p][tuple([self.assignment[x] for x in t])] = v325 return PWrapperInstance(assignments=self.assignment, precons=out_pre, postcons=out_post, action_wrapper=self.action_wrapper)326class ParameterAssignmentIterator(object):327 """328 @brief Iterator generating variable assignments satisfying a constraint set based on a clue set.329 """330 def __init__(self, param_constraint_map, constraint_map, clue_set):331 """Constructor. Receives variables to assign, constraints to satisfy and332 a set of clues, which indicates the possible values of the variables.333 """334 self.param_constraint_map = param_constraint_map335 self.constraint_map = constraint_map336 self.remaining = set()337 for p in clue_set.keys():338 if p in self.constraint_map:339 for args in self.constraint_map[p].keys():340 for a in args:341 self.remaining.add(a)342 self.free_set = set(self.param_constraint_map.keys()).difference(self.remaining)343 self.pIit = iter(self.param_constraint_map[self.remaining.pop()])344 self.clue_set = clue_set345 self.assignments = {}346 self.problem_stack = []347 self.poIter = None348 self.previous_assignments = set()349 def __iter__(self):350 return self351 def next(self):352 while True:353 try:354 if self.poIter == None:355 # print('New param-value iterator needed')356 while True:357 self.pI = self.pIit.next()358 if self.pI.predicate in self.clue_set:359 self.poIter = self.clue_set[self.pI.predicate].iteritems()360 # print('{}New param-value iterator for {} will iterate over postconditions. It should do {} steps.\n Id:{}'.format(' '*len(self.problem_stack), self.pI.predicate.P, len(self.clue_set[self.pI.predicate]), id(self.poIter)))361 # print('{}It will iterate over:\n {}{}'.format(' '*len(self.problem_stack), ' '*len(self.problem_stack), '\n {}'.format(' '*len(self.problem_stack)).join(['({}) : {}'.format(', '.join(params), value) for params, value in self.clue_set[self.pI.predicate].items()])))362 break363 if self.poIter != None:364 raise Exception('Param-value iterator should be None when this point is reached!')365 # If this predicate is contained in the post conditions366 try:367 tab_space = ' '*len(self.problem_stack)368 # print('{}Param-value iterator is about to do next. Id: {}'.format(tab_space, id(self.poIter)))369 params, value = self.poIter.next()370 if value == self.pI.value:371 # print('{}- {}({})'.format(' ' * len(self.problem_stack), self.pI.predicate.P, ', '.join(params)))372 new_remainder = self.remaining.copy()373 for x in range(len(params)):374 self.assignments[self.pI.args[x]] = params[x]375 if self.pI.args[x] in self.remaining:376 new_remainder.remove(self.pI.args[x])377 # print('{} Assigned: {}'.format(tab_space, self.pI.args[x]))378 if len(new_remainder) == 0:379 hashable_dict = tuple(sorted(self.assignments.items()))380 if not hashable_dict in self.previous_assignments:381 ok = True382 for pI, pAssignments in self.constraint_map.items():383 if pI in self.clue_set:384 for args, v in pAssignments.items():385 rArgs = tuple([self.assignments[a] for a in args])386 if rArgs in self.clue_set[pI] and self.clue_set[pI][rArgs] != v:387 ok = False388 break389 if not ok:390 break391 if ok:392 # print('New assignment checked out. Returning it.')393 self.previous_assignments.add(hashable_dict)394 return self.assignments.copy(), self.free_set395 # print('New assignment did not check out.')396 else:397 # print('New assignment is incomplete. Pushing current state to stack and trying to solve for the next parameter.')398 self.problem_stack.append((self.remaining, self.pIit, self.poIter, self.pI))399 # print('{}Old remainder: ( {} )'.format(tab_space, ', '.join(self.remaining)))400 self.remaining = new_remainder401 # print('{}New remainder: ( {} )'.format(tab_space, ', '.join(self.remaining)))402 self.pIit = iter(self.param_constraint_map[self.remaining.pop()])403 self.poIter = None404 self.pI = None405 except StopIteration:406 # print('Next of {} killed it.'.format(id(self.poIter)))407 self.poIter = None408 # print('Param-value iterator has reached the end of set')409 except StopIteration:410 if len(self.problem_stack) > 0:411 # print('Predicate iterator has reached the end of its set. Luckily there\'s still at least one on the stack.')412 self.remaining, self.pIit, self.poIter, self.pI = self.problem_stack.pop()413 else:414 # print('Predicate iterator has reached the end of its set. The stack is empty, so we are aborting.')...
behavioral_cloning.py
Source:behavioral_cloning.py
...134 def __call__(self, x):135 return MultiDimensionalSoftmaxDistribution(self.get_raw_value(x))136137138def parse_action_wrapper(action_wrapper, env, always_keys, reverse_keys,139 exclude_keys, exclude_noop, allow_pitch,140 num_camera_discretize, max_camera_range):141 if action_wrapper == 'discrete':142 return SerialDiscreteActionWrapper(143 env,144 always_keys=always_keys, reverse_keys=reverse_keys, exclude_keys=exclude_keys, exclude_noop=exclude_noop,145 num_camera_discretize=num_camera_discretize, allow_pitch=allow_pitch,146 max_camera_range=max_camera_range)147 elif action_wrapper == 'continuous':148 return NormalizedContinuousActionWrapper(149 env, allow_pitch=allow_pitch,150 max_camera_range=max_camera_range)151 elif action_wrapper == 'multi-dimensional-softmax':152 return MultiDimensionalSoftmaxActionWrapper(153 env, allow_pitch=allow_pitch,154 num_camera_discretize=num_camera_discretize,155 max_camera_range=max_camera_range)156 else:157 raise RuntimeError('Unsupported action wrapper name: {}'.format(action_wrapper))158159160def main():161 parser = argparse.ArgumentParser()162 parser.add_argument('--env', type=str, default='MineRLTreechop-v0',163 choices=[164 'MineRLTreechop-v0',165 'MineRLNavigate-v0', 'MineRLNavigateDense-v0', 'MineRLNavigateExtreme-v0', 'MineRLNavigateExtremeDense-v0',166 'MineRLObtainIronPickaxe-v0', 'MineRLObtainDiamond-v0',167 'MineRLObtainIronPickaxeDense-v0', 'MineRLObtainDiamondDense-v0',168 'MineRLNavigateDenseFixed-v0' # for debug use169 ],170 help='MineRL environment identifier.')171 parser.add_argument('--outdir', type=str, default='results',172 help='Directory path to save output files. If it does not exist, it will be created.')173 parser.add_argument('--expert', type=str, required=True,174 help='Path storing expert trajectories.')175 parser.add_argument('--seed', type=int, default=0, help='Random seed [0, 2 ** 31)')176 parser.add_argument('--gpu', type=int, default=0, help='GPU to use, set to -1 if no GPU.')177 parser.add_argument('--demo', action='store_true', default=False)178 parser.add_argument('--load', type=str, default=None)179 parser.add_argument('--eval-n-runs', type=int, default=3)180 parser.add_argument('--logging-level', type=int, default=20, help='Logging level. 10:DEBUG, 20:INFO etc.')181 parser.add_argument('--gray-scale', action='store_true', default=False, help='Convert pov into gray scaled image.')182 parser.add_argument('--monitor', action='store_true', default=False,183 help='Monitor env. Videos and additional information are saved as output files when evaluation.')184 parser.add_argument('--lr', type=float, default=2.5e-4, help='Learning rate.')185 parser.add_argument('--frame-stack', type=int, default=None, help='Number of frames stacked (None for disable).')186 parser.add_argument('--frame-skip', type=int, default=None, help='Number of frames skipped (None for disable).')187 parser.add_argument('--always-keys', type=str, default=None, nargs='*',188 help='List of action keys, which should be always pressed throughout interaction with environment.')189 parser.add_argument('--reverse-keys', type=str, default=None, nargs='*',190 help='List of action keys, which should be always pressed but can be turn off via action.')191 parser.add_argument('--exclude-keys', type=str, default=None, nargs='*',192 help='List of action keys, which should be ignored for discretizing action space.')193 parser.add_argument('--exclude-noop', action='store_true', default=False, help='The "noop" will be excluded from discrete action list.')194 parser.add_argument('--action-wrapper', type=str, default='discrete',195 choices=['discrete', 'continuous', 'multi-dimensional-softmax'])196 parser.add_argument('--activation-function', type=str, default='tanh',197 choices=['sigmoid', 'tanh', 'relu', 'leaky-relu'])198 parser.add_argument('--prioritized-elements', type=str, nargs='+', default=None,199 help='define priority of each element on discrete setting')200 # parser.add_argument('--write-video', type=str, default=None)201 parser.add_argument('--entropy-coef', type=float, default=0)202 parser.add_argument('--allow-pitch', action='store_true', default=False,203 help='Always set camera pitch as 0 in agent action.')204 parser.add_argument('--max-camera-range', type=float, default=10.,205 help='Maximum value of camera angle change in one frame')206 parser.add_argument('--num-camera-discretize', type=int, default=7,207 help='Number of actions to discretize pitch/yaw respectively')208 parser.add_argument('--training-dataset-ratio', type=float, default=0.7,209 help='ratio of training dataset on behavioral cloning between (0, 1)')210 args = parser.parse_args()211212 args.outdir = chainerrl.experiments.prepare_output_dir(args, args.outdir)213214 import logging215 log_format = '%(levelname)-8s - %(asctime)s - [%(name)s %(funcName)s %(lineno)d] %(message)s'216 logging.basicConfig(filename=os.path.join(args.outdir, 'log.txt'), format=log_format, level=args.logging_level)217 console_handler = logging.StreamHandler()218 console_handler.setLevel(args.logging_level)219 console_handler.setFormatter(logging.Formatter(log_format))220 logging.getLogger('').addHandler(console_handler) # add hander to the root logger221222 logger.info('Output files are saved in {}'.format(args.outdir))223224 try:225 _main(args)226 except: # noqa227 logger.exception('execution failed.')228 raise229230231def _main(args):232 logger.info('The first `gym.make(MineRL*)` may take several minutes. Be patient!')233234 os.environ['MALMO_MINECRAFT_OUTPUT_LOGDIR'] = args.outdir235236 # Set a random seed used in ChainerRL.237 chainerrl.misc.set_random_seed(args.seed)238239 # Set different random seeds for train and test envs.240 train_seed = args.seed # noqa: never used in this script241 test_seed = 2 ** 31 - 1 - args.seed242243 def wrap_env(env, test):244 # wrap env: observation...245 # NOTE: wrapping order matters!246 if test and args.monitor:247 env = gym.wrappers.Monitor(248 env, args.outdir, mode='evaluation' if test else 'training', video_callable=lambda episode_id: True)249 if args.gray_scale:250 env = GrayScaleWrapper(env, dict_space_key='pov')251 if args.frame_skip is not None:252 env = FrameSkip(env, skip=args.frame_skip)253 if args.env.startswith('MineRLObtain'):254 env = UnifiedObservationWrapper(env)255 elif args.env.startswith('MineRLNavigate'):256 env = PoVWithCompassAngleWrapper(env)257 else:258 env = ObtainPoVWrapper(env)259 env = MoveAxisWrapper(env, source=-1, destination=0) # convert hwc -> chw as Chainer requires.260 env = ScaledFloatFrame(env)261 if args.frame_stack is not None:262 env = FrameStack(env, args.frame_stack, channel_order='chw')263264 # wrap env: action...265 env = parse_action_wrapper(266 args.action_wrapper,267 env,268 always_keys=args.always_keys, reverse_keys=args.reverse_keys,269 exclude_keys=args.exclude_keys, exclude_noop=args.exclude_noop,270 allow_pitch=args.allow_pitch,271 num_camera_discretize=args.num_camera_discretize,272 max_camera_range=args.max_camera_range)273274 env_seed = test_seed if test else train_seed275 env.seed(int(env_seed)) # TODO: not supported yet276 return env277278 core_env = gym.make(args.env)279 env = wrap_env(core_env, test=False)
...
AzureFunctionService.py
Source:AzureFunctionService.py
...141 self.__interceptors = []142 def _apply_validation(self, schema: Schema, action: Callable[[func.HttpRequest], func.HttpResponse]) -> Callable[143 [func.HttpRequest], func.HttpResponse]:144 # Create an action function145 def action_wrapper(context: func.HttpRequest) -> func.HttpResponse:146 # Validate object147 if schema and context:148 # Perform validation149 params = {'body': {} if not context.get_body() else context.get_json()}150 params.update(context.route_params)151 params.update(context.params)152 correlation_id = self._get_correlation_id(context)153 err = schema.validate_and_return_exception(correlation_id, params, False)154 if err is not None:155 return func.HttpResponse(156 body=json.dumps(err.to_json()),157 status_code=err.status158 )159 result = action(context)160 return result161 return action_wrapper162 def _apply_interceptors(self, action: Callable[[func.HttpRequest], Any]) -> Callable[[func.HttpRequest], Any]:163 action_wrapper = action164 index = len(self.__interceptors) - 1165 while index >= 0:166 interceptor = self.__interceptors[index]167 action_wrapper = lambda _action: lambda params: interceptor(params, _action)(action_wrapper)168 index -= 1169 return action_wrapper170 def _generate_action_cmd(self, name: str) -> str:171 cmd = name172 if self.__name:173 cmd = self.__name + '.' + cmd174 return cmd175 def _register_action(self, name: str, schema: Schema, action: Callable[[func.HttpRequest], func.HttpResponse]):176 """177 Registers a action in Azure Function function.178 :param name: an action name179 :param schema: a validation schema to validate received parameters.180 :param action: an action function that is called when operation is invoked.181 """182 action_wrapper = self._apply_validation(schema, action)183 action_wrapper = self._apply_interceptors(action_wrapper)184 register_action: AzureFunctionAction = AzureFunctionAction(self._generate_action_cmd(name), schema,185 lambda params: action_wrapper(params))186 self.__actions.append(register_action)187 def _register_action_with_auth(self, name: str, schema: Schema,188 authorize: Callable[[func.HttpRequest, Callable[[func.HttpRequest], Any]], Any],189 action: Callable[[func.HttpRequest], func.HttpResponse]):190 """191 Registers an action with authorization.192 :param name: an action name193 :param schema: a validation schema to validate received parameters.194 :param authorize: an authorization interceptor195 :param action: an action function that is called when operation is invoked.196 """197 action_wrapper = self._apply_validation(schema, action)198 # Add authorization just before validation199 action_wrapper = lambda call: authorize(call, action_wrapper)200 action_wrapper = self._apply_interceptors(action_wrapper)201 register_action: AzureFunctionAction = AzureFunctionAction(self._generate_action_cmd(name), schema,202 lambda params: action_wrapper(params))203 self.__actions.append(register_action)204 def _register_interceptor(self, action: Callable[[func.HttpRequest, Callable[[func.HttpRequest], Any]], Any]):205 """206 Registers a middleware for actions in AWS Lambda service.207 :param action: an action function that is called when middleware is invoked.208 """209 self.__interceptors.append(action)210 @abstractmethod211 def register(self):212 """213 Registers all service routes in HTTP endpoint.214 This method is called by the service and must be overridden215 in child classes.216 """...
project.py
Source:project.py
1import inspect2import re3from abc import ABC4from django.core.exceptions import ImproperlyConfigured5from django_meta.api import ApiFieldWrapper, ExistingApiFieldWrapper, Methods, UrlPatternWrapper, \6 ExistingUrlPatternWrapper, ApiActionWrapper7from django_meta.model import ModelFieldWrapper, ModelWrapper, PermissionWrapper, ExistingPermissionWrapper8from nlp.lookout.base import Lookout9from nlp.lookout.token import RestActionLookout10from nlp.similarity import CosineSimilarity, ContainsSimilarity, LevenshteinSimilarity11from settings import Settings12class DjangoProjectLookout(Lookout, ABC):13 """14 This lookout is specialized to find stuff in Django projects.15 """16 similarity_benchmark = 0.5917 def prepare_keywords(self, keywords):18 return set([k.replace('_', ' ') if k else None for k in keywords])19 def should_stop_looking_for_output(self):20 """Always look through all outputs"""21 return False22 def get_compare_variations(self, output_object, keyword):23 variations = []24 if not self.text or not keyword:25 return []26 translated_keyword = self.translator_to_src.translate(keyword)27 # use the basic version if only one word is passed28 doc_input = self.doc_src_language29 if len(self.doc_src_language) == 1:30 doc_input = self.nlp_src_language(self.doc_src_language[0].lemma_)31 # create documents for english and source language to get the similarity32 # en - keyword33 variations.append((self.doc_en, self.nlp_en(keyword)))34 # src - keyword35 variations.append((doc_input, self.nlp_src_language(keyword)))36 # src - keyword translated to src37 variations.append((doc_input, self.nlp_src_language(translated_keyword)))38 return variations39 def get_similarity(self, input_doc, target_doc):40 """Returns the similarity between two docs/ tokens in a range from 0 - 1."""41 cos_similarity = CosineSimilarity(input_doc, target_doc).get_similarity()42 contains_similarity = ContainsSimilarity(input_doc, target_doc).get_similarity()43 # if cos is very sure, just use it44 if cos_similarity > 0.8:45 return cos_similarity46 if contains_similarity == 1:47 return contains_similarity48 cos_weight = 0.549 levenshtein_weight = 0.350 contains_weight = 0.251 levenshtein_similarity = LevenshteinSimilarity(input_doc, target_doc).get_similarity()52 total_similarity = (53 cos_similarity * cos_weight54 ) + (55 levenshtein_similarity * levenshtein_weight56 ) + (57 contains_similarity * contains_weight58 )59 return total_similarity60class ClassArgumentLookout(DjangoProjectLookout):61 """62 This lookout searches tokens that could represent the argument/ parameter of an __init__ of a given class.63 """64 def get_fallback(self):65 return None66 def get_keywords(self, parameter_name):67 return [parameter_name]68 def get_output_objects(self, cls, exclude_parameters=None, *args, **kwargs):69 signature = inspect.signature(cls.__init__)70 parameters = dict(signature.parameters)71 # remove self as it is not passed to init from outside72 del parameters['self']73 if exclude_parameters is None:74 exclude_parameters = []75 # remove any parameters that are marked as excluded76 for exclude_parameter in exclude_parameters:77 try:78 del parameters[exclude_parameter]79 except KeyError:80 pass81 # only use the names for now82 return [param.name for param in parameters.values()]83class ModelFieldLookout(DjangoProjectLookout):84 def get_fallback(self):85 return ModelFieldWrapper(name=self.translator_to_en.translate(self.text))86 def get_keywords(self, field):87 return [field.name, getattr(field, 'verbose_name', None)]88 def get_output_objects(self, model_wrapper, *args, **kwargs):89 return model_wrapper.fields90class SerializerFieldLookout(DjangoProjectLookout):91 def get_fallback(self):92 return ApiFieldWrapper(name=self.translator_to_en.translate(self.text))93 def get_keywords(self, field):94 return [field.source]95 def get_output_objects(self, serializer, *args, **kwargs):96 if not serializer:97 return []98 return [ExistingApiFieldWrapper(api_field=field) for field in serializer.fields.fields.values()]99class ModelLookout(DjangoProjectLookout):100 def get_fallback(self):101 return ModelWrapper(name=self.translator_to_en.translate(self.text))102 def get_keywords(self, model):103 return [model.name, model.verbose_name, model.verbose_name_plural]104 def get_output_objects(self, project_wrapper, *args, **kwargs):105 return project_wrapper.get_models(as_wrapper=True, include_django=True)106class PermissionLookout(DjangoProjectLookout):107 """Can search for specific permissions in Django."""108 def __init__(self, text, src_language, locate_on_init=False, *args, **kwargs):109 self._raw_text = text110 if self.text_input_is_permission_code:111 text = text.split('.')[1].replace('_', ' ')112 super().__init__(text, src_language, locate_on_init, *args, **kwargs)113 self._model_token = None114 self._find_model_token()115 self._model_wrapper = None116 @property117 def text_input_is_permission_code(self):118 """Checks if the provided text is in the format <model>.<name>."""119 re_permission_string = re.compile(r'(\w)+\.(\w)+$')120 return re_permission_string.match(self._raw_text)121 def _find_model_token(self):122 """Tries to find the model token in the text that was provided."""123 if self.text_input_is_permission_code:124 # if the text is in the permission format (<model>.<name>) take the first part and use nlp125 self._model_token = self.nlp_src_language(self._raw_text.split('.')[0])[0]126 else:127 # else search for a noun128 for token in self.doc_src_language:129 if token.pos_ == 'NOUN':130 self._model_token = token131 break132 def get_fallback(self):133 """Fallback is a permission wrapper class."""134 return PermissionWrapper(self.translator_to_en.translate(self.text), self.model_wrapper)135 @property136 def model_wrapper(self):137 """Search for a model wrapper with the model token."""138 if not self._model_token:139 return None140 if self._model_wrapper is None:141 model_lookout = ModelLookout(str(self._model_token.lemma_), self.src_language)142 self._model_wrapper = model_lookout.locate(Settings.django_project_wrapper)143 return self._model_wrapper144 def get_keywords(self, permission):145 return [permission.codename, permission.name]146 def get_output_objects(self, *args, **kwargs):147 """148 By default we want to filter out any permission objects that do not fit the model. If the model does not exist149 in the code, we should just go to the fallback.150 """151 try:152 from django.contrib.auth.models import Permission153 from django.contrib.contenttypes.models import ContentType154 except ImproperlyConfigured:155 return []156 if not self._model_token or self.model_wrapper.exists_in_code is False:157 qs = Permission.objects.none()158 else:159 content_type = ContentType.objects.get_for_model(self.model_wrapper.model)160 qs = Permission.objects.filter(content_type=content_type)161 return [ExistingPermissionWrapper(p) for p in qs]162class ApiActionLookout(DjangoProjectLookout):163 """164 This lookout will find urls in the django project.165 """166 def __init__(self, text, language, model_wrapper, valid_methods):167 super().__init__(text, language)168 self.model_wrapper = model_wrapper169 if Methods.PUT in valid_methods:170 valid_methods = valid_methods.copy()171 valid_methods.append(Methods.PATCH)172 self.valid_methods = [method for method in valid_methods if method]173 def get_fallback(self):174 url_wrapper = UrlPatternWrapper(model_wrapper=self.model_wrapper)175 return ApiActionWrapper(176 url_pattern_wrapper=url_wrapper,177 method=self.valid_methods[0] if self.valid_methods else Methods.GET,178 fn_name=self.translator_to_en.translate(str(self.text)),179 url_name='detail',180 )181 def go_to_next_output(self, similarity):182 return similarity > 0.9183 def is_new_fittest_output_object(self, similarity, action_wrapper, input_doc, output_doc):184 """185 For different urls there might be multiple endpoints with the same methods. To identify which one is meant,186 the reverse name and url name is used to determine which endpoint is better suited.187 """188 fittest_action = self.fittest_output_object189 if not fittest_action or similarity != self.highest_similarity or fittest_action == action_wrapper:190 return super().is_new_fittest_output_object(similarity, action_wrapper, input_doc, output_doc)191 best_similarity = 0192 conversion_default_route = action_wrapper.url_name in ['detail', 'list']193 fittest_conversion_default_route = fittest_action.url_name in ['detail', 'list']194 best_action = fittest_action195 # compare the two actions more detailed196 for action in [fittest_action, action_wrapper]:197 reverse_name = action.url_pattern_wrapper.reverse_name198 reverse_url_name_translated = self.translator_to_src.translate(action.url_name)199 reverse_name_translated = self.translator_to_src.translate(reverse_name)200 for check in [reverse_name_translated, reverse_url_name_translated]:201 exact_similarity = self.get_similarity(input_doc, self.nlp_src_language(check))202 if exact_similarity > best_similarity:203 best_action = action204 best_similarity = exact_similarity205 # sometimes we are not provided with the reverse url name but only the method; if there are default206 # api routes by an ApiView, use them instead207 if best_similarity < 0.4:208 if conversion_default_route and not fittest_conversion_default_route:209 best_action = action_wrapper210 elif not conversion_default_route and fittest_conversion_default_route:211 best_action = fittest_action212 return best_action == action_wrapper213 def get_keywords(self, action_wrapper):214 keywords = [action_wrapper.url_name, action_wrapper.fn_name]215 if action_wrapper.method == Methods.GET:216 keywords += RestActionLookout.GET_KEYWORDS217 if action_wrapper.method == Methods.POST:218 keywords += RestActionLookout.CREATE_KEYWORDS219 if action_wrapper.method == Methods.PUT or action_wrapper.method == Methods.PATCH:220 keywords += RestActionLookout.UPDATE_KEYWORDS221 if action_wrapper.method == Methods.DELETE:222 keywords += RestActionLookout.DELETE_KEYWORDS223 return keywords224 @property225 def only_selected_methods(self):226 return bool(self.valid_methods)227 def get_output_objects(self, django_project, *args, **kwargs):228 results = []229 for pattern in django_project.urls:230 wrapper = ExistingUrlPatternWrapper(pattern)231 if not wrapper.is_represented_by_view_set:232 continue233 # if the url does not have a valid method, skip it234 # if there are no valid methods provided, allow all235 if self.only_selected_methods and not any([m in self.valid_methods for m in wrapper.methods]):236 continue237 # get all actions for this model238 actions = wrapper.get_all_actions_for_model_wrapper(self.model_wrapper)239 for action in actions:240 if action not in results and (not self.only_selected_methods or action.method in self.valid_methods):241 results.append(action)...
LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!