Best Python code snippet using behave
runner.py
Source:runner.py
...653 # -- Allow steps to import other stuff from the steps dir654 # NOTE: Default matcher can be overridden in "environment.py" hook.655 steps_dir = os.path.join(self.base_dir, self.config.steps_dir)656 step_paths = [steps_dir] + list(extra_step_paths)657 load_step_modules(step_paths)658 def feature_locations(self):659 return collect_feature_locations(self.config.paths)660 def run(self):661 with self.path_manager:662 self.setup_paths()663 return self.run_with_paths()664 def run_with_paths(self):665 self.context = Context(self)666 self.load_hooks()667 self.load_step_definitions()668 # -- ENSURE: context.execute_steps() works in weird cases (hooks, ...)669 # self.setup_capture()670 # self.run_hook("before_all", self.context)671 # -- STEP: Parse all feature files (by using their file location)....
runner_mp.py
Source:runner_mp.py
1# -*- coding: UTF-8 -*-2"""3This module provides multiprocessing Runner class.4"""5import six6import os7import multiprocessing8from behave.formatter._registry import make_formatters9from behave.runner import Runner, Context10from behave.model import Feature, Scenario, ScenarioOutline, NoMatch11from behave.runner_util import parse_features, load_step_modules12from behave.step_registry import registry as the_step_registry13if six.PY2:14 import Queue as queue15else:16 import queue17class MultiProcRunner(Runner):18 """Master multiprocessing runner: scans jobs and distributes to slaves19 This runner should not do any "processing" tasks, apart from scanning20 the feature files and their scenarios. It then spawns processing nodes21 and lets them consume the queue of tasks scheduled.22 """23 def __init__(self, config):24 super(MultiProcRunner, self).__init__(config)25 self.jobs_map = {}26 self.jobsq = multiprocessing.JoinableQueue()27 self.resultsq = multiprocessing.Queue()28 self._reported_features = set()29 self.results_fail = False30 def run_with_paths(self):31 feature_locations = [filename for filename in self.feature_locations()32 if not self.config.exclude(filename)]33 self.load_hooks() # hooks themselves not used, but 'environment.py' loaded34 # step definitions are needed here for formatters only35 self.load_step_definitions()36 features = parse_features(feature_locations, language=self.config.lang)37 self.features.extend(features)38 feature_count, scenario_count = self.scan_features()39 njobs = len(self.jobs_map)40 proc_count = int(self.config.proc_count)41 print ("INFO: {0} scenario(s) and {1} feature(s) queued for"42 " consideration by {2} workers. Some may be skipped if the"43 " -t option was given..."44 .format(scenario_count, feature_count, proc_count))45 procs = []46 old_outs = self.config.outputs47 self.config.outputs = []48 old_reporters = self.config.reporters49 self.config.reporters = []50 for i in range(proc_count):51 client = MultiProcClientRunner(self, i)52 p = multiprocessing.Process(target=client.run)53 procs.append(p)54 p.start()55 del p56 print ("INFO: started {0} workers for {1} jobs.".format(proc_count, njobs))57 self.config.reporters = old_reporters58 self.formatters = make_formatters(self.config, old_outs)59 self.config.outputs = old_outs60 while (not self.jobsq.empty()):61 # 1: consume while tests are running62 self.consume_results()63 if not any([p.is_alive() for p in procs]):64 break65 if any([p.is_alive() for p in procs]):66 self.jobsq.join() # wait for all jobs to be processed67 print ("INFO: all jobs have been processed")68 while self.consume_results(timeout=0.1):69 # 2: remaining results70 pass71 # then, wait for all workers to exit:72 [p.join() for p in procs]73 print ("INFO: all sub-processes have returned")74 while self.consume_results(timeout=0.1):75 # 3: just in case some arrive late in the pipe76 pass77 for f in self.features:78 # make sure all features (including ones that have not returned)79 # are printed80 self._output_feature(f)81 for formatter in self.formatters:82 formatter.close()83 for reporter in self.config.reporters:84 reporter.end()85 return self.results_fail86 def scan_features(self):87 raise NotImplementedError88 def consume_results(self, timeout=1):89 try:90 job_id, result = self.resultsq.get(timeout=timeout)91 except queue.Empty:92 return False93 if job_id is None and result == 'set_fail':94 self.results_fail = True95 return True96 item = self.jobs_map.get(job_id)97 if item is None:98 print("ERROR: job_id=%x not found in master map" % job_id)99 return True100 try:101 item.recv_status(result)102 if isinstance(item, Feature):103 self._output_feature(item)104 elif isinstance(item, Scenario):105 feature = item.feature106 if feature.is_finished:107 self._output_feature(feature)108 else:109 print("INFO: scenario finished: %x" % (job_id,))110 except Exception as e:111 print("ERROR: cannot receive status for %r: %s" % (item, e))112 if self.config.wip and not self.config.quiet:113 import traceback114 traceback.print_exc()115 return True116 def _output_feature(self, feature):117 if id(feature) in self._reported_features:118 return119 self._reported_features.add(id(feature))120 for formatter in self.formatters:121 formatter.uri(feature.filename)122 formatter.feature(feature)123 if feature.background:124 formatter.background(feature.background)125 for scenario in feature.scenarios:126 formatter.scenario(scenario)127 for step in scenario.steps:128 formatter.step(step)129 for step in scenario.steps:130 match = the_step_registry.find_match(step)131 if match:132 formatter.match(match)133 else:134 formatter.match(NoMatch())135 formatter.result(step)136 formatter.eof()137 for reporter in self.config.reporters:138 reporter.feature(feature)139class MultiProcRunner_Feature(MultiProcRunner):140 def scan_features(self):141 for feature in self.features:142 self.jobs_map[id(feature)] = feature143 self.jobsq.put(id(feature))144 for scen in feature.scenarios:145 scen.background_steps146 if isinstance(scen, ScenarioOutline):147 # compute the sub-scenarios before serializing148 for subscen in scen.scenarios:149 subscen.background_steps150 return len(self.jobs_map), 0151class MultiProcRunner_Scenario(MultiProcRunner):152 def scan_features(self):153 nfeat = nscens = 0154 def put(sth):155 idf = id(sth)156 self.jobs_map[idf] = sth157 self.jobsq.put(idf)158 for feature in self.features:159 if 'serial' in feature.tags:160 put(feature)161 nfeat += 1162 for scen in feature.scenarios:163 scen.background_steps164 if isinstance(scen, ScenarioOutline):165 # compute the sub-scenarios before serializing166 for subscen in scen.scenarios:167 subscen.background_steps168 continue169 for scenario in feature.scenarios:170 scenario.background_steps # compute them, before sending out171 if scenario.type == 'scenario':172 put(scenario)173 nscens += 1174 else:175 for subscenario in scenario.scenarios:176 subscenario.background_steps177 put(subscenario)178 nscens += 1179 return nfeat, nscens180class MultiProcClientRunner(Runner):181 """Multiprocessing Client runner: picks "jobs" from parent queue182 Each client is tagged with a `num` to appear in outputs etc.183 """184 def __init__(self, parent, num):185 super(MultiProcClientRunner, self).__init__(parent.config)186 self.num = num187 self.jobs_map = parent.jobs_map188 self.jobsq = parent.jobsq189 self.resultsq = parent.resultsq190 def iter_queue(self):191 """Iterator fetching features from the queue192 Note that this iterator is lazy and multiprocess-affected:193 it cannot know its set of features in advance, will dynamically194 yield ones as found in the queue195 """196 while True:197 try:198 job_id = self.jobsq.get(timeout=0.5)199 except queue.Empty:200 break201 job = self.jobs_map.get(job_id, None)202 if job is None:203 print("ERROR: missing job id=%s from map" % job_id)204 self.jobsq.task_done()205 continue206 if isinstance(job, Feature):207 yield job208 try:209 self.resultsq.put((job_id, job.send_status()))210 except Exception as e:211 print("ERROR: cannot send result: {0}".format(e))212 elif isinstance(job, Scenario):213 # construct a dummy feature, having only this scenario214 kwargs = {}215 for k in ('filename', 'line', 'keyword', 'name', 'tags',216 'description', 'background', 'language'):217 kwargs[k] = getattr(job.feature, k)218 kwargs['scenarios'] = [job]219 orig_parser = job.feature.parser220 feature = Feature(**kwargs)221 feature.parser = orig_parser222 yield feature223 try:224 self.resultsq.put((job_id, job.send_status()))225 except Exception as e:226 print("ERROR: cannot send result: {0}".format(e))227 else:228 raise TypeError("Don't know how to process: %s" % type(job))229 self.jobsq.task_done()230 def run_with_paths(self):231 self.context = Context(self)232 self.load_hooks()233 self.load_step_definitions()234 assert not self.aborted235 failed = self.run_model(features=self.iter_queue())236 if failed:237 self.resultsq.put((None, 'set_fail'))238 self.resultsq.close()...
event_engine.py
Source:event_engine.py
...116 )117class XOSEventEngine(object):118 """ XOSEventEngine119 Subscribe to and handle processing of events. Two methods are defined:120 load_step_modules(dir) ... look for step modules in the given directory, and load objects that are121 descendant from EventStep.122 start() ... Launch threads to handle processing of the EventSteps. It's expected that load_step_modules()123 will be called before start().124 """125 def __init__(self, model_accessor, log):126 self.event_steps = []127 self.threads = []128 self.model_accessor = model_accessor129 self.log = log130 def load_event_step_modules(self, event_step_dir):131 self.event_steps = []132 self.log.info("Loading event steps", event_step_dir=event_step_dir)133 # NOTE we'll load all the classes that inherit from EventStep134 for fn in os.listdir(event_step_dir):135 pathname = os.path.join(event_step_dir, fn)136 if (...
pull_step_engine.py
Source:pull_step_engine.py
...50 """ XOSPullStepEngine51 Load pull step modules. Two methods are defined:52 load_pull_step_modules(dir) ... look for step modules in the given directory, and load objects that are53 descendant from PullStep.54 start() ... Launch threads to handle processing of the PullSteps. It's expected that load_step_modules()55 will be called before start().56 """57 def __init__(self, model_accessor):58 self.model_accessor = model_accessor59 self.pull_steps = []60 def load_pull_step_modules(self, pull_step_dir):61 self.pull_steps = []62 log.info("Loading pull steps", pull_step_dir=pull_step_dir)63 # NOTE we'll load all the classes that inherit from PullStep64 for fn in os.listdir(pull_step_dir):65 pathname = os.path.join(pull_step_dir, fn)66 if (67 os.path.isfile(pathname)68 and fn.endswith(".py")...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!