Best Python code snippet using grail_python
environment.py
Source:environment.py
1# pylint: disable=missing-docstring,unused-argument2import copy3import logging4import os5import pickle6import uuid7import re8import redis9from features.steps import oauth, utils10from testsuite import fhir11from testsuite.config_reader import get_vendor_config, get_env_config12from testsuite.oauth import authorize, factory13CACHE_TTL = 60 * 60 # 1 hour14FHIR_RESOURCE_TAGS = {15 'patient-demographics',16 'smoking-status',17 'problems',18 'medication-orders',19 'medication-requests',20 'medication-statements',21 'medication-dispensations',22 'medication-administrations',23 'allergies-and-intolerances',24 'lab-results',25 'vital-signs',26 'procedures',27 'immunizations',28 'patient-documents',29 'coverage',30 'explanation-of-benefit'31}32def before_all(context):33 """ Runs once before all tests.34 Set up some global state necessary to the tests and test runner.35 * Get the vendor config and attach it to the context.36 * Authorize against the vendor FHIR server and store the authorization.37 * Get the test plan so that we can show a progress meter.38 * Load the conformance statement so we know which resources are supported.39 """40 # Get the vendor config and attach it to the context.41 vendor = getattr(context.config, 'vendor', os.getenv('VENDOR'))42 override = getattr(context.config, 'override', os.getenv('CONFIG_OVERRIDE', ''))43 vendor_config = get_vendor_config(vendor, override)44 # Get configuration from the test server's environment.45 env_config = get_env_config()46 # Attempt to retrieve the security URL for this version.47 vendor_config['versioned_auth']['aud'] = vendor_config['versioned_api']['url']48 context.vendor_config = copy.deepcopy(vendor_config)49 context.env_config = copy.deepcopy(env_config)50 # Filter out any tagged vendor config steps51 steps = vendor_config['versioned_auth'].get('steps', [])52 steps = [step for step in steps if 'when' not in step]53 vendor_config['versioned_auth']['steps'] = steps54 # Set the ElasticSearch logging endpoint55 context.config.es_url = os.getenv('ES_URL')56 # Authorize against the vendor FHIR server.57 try:58 context.oauth = factory(vendor_config)59 context.oauth.authorize()60 if getattr(context.oauth, 'patient', None) is not None:61 context.vendor_config['versioned_api']['patient'] = context.oauth.patient62 except AssertionError as error:63 logging.error(utils.bad_response_assert(error.args[0], ''))64 raise Exception(utils.bad_response_assert(error.args[0], ''))65 except authorize.AuthorizationException as err:66 error = oauth.ERROR_SELENIUM_SCREENSHOT.format(67 err.args[0],68 err.args[1],69 err.args[2],70 context.vendor_config['host'],71 )72 raise Exception(error)73 except ValueError as error:74 logging.error(utils.bad_response_assert(error.response, ''))75 raise Exception(utils.bad_response_assert(error.response, ''))76 # Get the test plan so that we can show a progress meter.77 context.config.plan = []78 # There is no other way to get a feature list from the context.79 # Since this is for display purposes only, this should be safe.80 features = context._runner.features # pylint: disable=protected-access81 for feature in features:82 scenariolist = []83 context.config.plan.append({84 'name': feature.name,85 'location': str(feature.location),86 'scenarios': scenariolist})87 for scenario in feature.scenarios:88 scenariolist.append({89 'name': scenario.name,90 'location': str(scenario.location)})91 # Download the conformance statement92 try:93 context.conformance = fhir.get_conformance_statement(vendor_config['versioned_auth']['aud'])94 except ValueError as error:95 context.conformance = None96 logging.error(utils.bad_response_assert(error.response, ''))97 # Define a global cache98 context.cache = Cache(redis.StrictRedis())99def before_feature(context, feature):100 """ Configure Feature scope.101 Some features need feature-level resources so that we don't need to102 make a bunch of API requests and slow things down.103 """104 # We handle the with_use_case tag with this custom functionality.105 # Extract which use case this feature is part of and determine106 # if we need to run it. Store use_case in the feature object for use107 # by child scenarios.108 feature.use_case = None109 skip_use_case = True110 for tag in feature.tags:111 use_case_matches = re.match("use.with_use_case=(.*)", tag)112 version_matches = re.match("use.with_version=(.*)", tag)113 if use_case_matches and feature.use_case is None:114 use_case = use_case_matches.groups()[0]115 if use_case in context.vendor_config["use_cases"]:116 feature.use_case = use_case117 skip_use_case = False118 if version_matches and feature.use_case:119 version = version_matches.groups()[0]120 if version != context.vendor_config["use_cases"][feature.use_case]:121 feature.skip("Feature version (%s) not supported in this use case (%s)."122 % (version, use_case))123 if skip_use_case:124 feature.skip("Feature (%s) not in any use case." % feature.name)125 return126 try:127 ignored_steps = context.vendor_config["ignored_steps"][feature.location.filename]128 for step in ignored_steps:129 if step == "all":130 feature.skip("Feature (%s) requested skip by vendor." % feature.name)131 except KeyError:132 pass133 tags = list(FHIR_RESOURCE_TAGS.intersection(feature.tags))134 if len(tags) > 1:135 raise Exception('Too many CCDS tags', tags)136 if len(tags) == 1:137 ccds_type = tags[0].capitalize().replace('-', ' ')138 steps = [139 'Given I am logged in',140 'And this server supports {0}'.format(ccds_type),141 'When I request {0}'.format(ccds_type),142 ]143 try:144 context.execute_steps('\n'.join(steps))145 except AssertionError as error:146 feature.skip(error.args[0])147def before_scenario(context, scenario):148 # The skipping logic here only applies when a use case has been defined on this feature.149 if scenario.feature.use_case:150 use_case_version = context.vendor_config["use_cases"][scenario.feature.use_case]151 # Skip the scenario if its version doesn't much the use_case version.152 for tag in scenario.effective_tags:153 matches = re.match("use.with_version=(.*)", tag)154 if matches and matches.groups()[0] != use_case_version:155 scenario.skip("Scenario's version (%s) not in Use Case (%s)."156 % (matches.groups()[0], scenario.feature.use_case))157def before_step(context, step):158 context.vendor_skip = False159 try:160 ignored_steps = context.vendor_config["ignored_steps"][step.location.filename]161 for ignored_step in ignored_steps:162 if step.name == ignored_step:163 context.vendor_skip = True164 break165 except KeyError:166 pass167class Cache(object):168 """ A minimal caching layer.169 """170 def __init__(self, redis_client):171 self.redis_client = redis_client172 # A unique prefix ensures that each test run does not share a cache.173 self.prefix = 'cache-{0}-'.format(uuid.uuid4())174 def __getitem__(self, key):175 key = self.prefix + key176 return pickle.loads(self.redis_client.get(key))177 def __setitem__(self, key, value):178 key = self.prefix + key179 self.redis_client.setex(key,180 CACHE_TTL,181 pickle.dumps(value))182 def __contains__(self, key):183 key = self.prefix + key184 return self.redis_client.exists(key)185 def clear(self):186 pattern = self.prefix + '*'187 keys = self.redis_client.keys(pattern)188 if keys:189 deleted = self.redis_client.delete(*keys)...
test_steps.py
Source:test_steps.py
...35 @step36 def fail_step(self):37 ok_(False, 'Explicit error')38 @step39 def ignored_step(self):40 pass41 @step(step_group=True)42 def pass_fail_ignore_group(self):43 self.step1()44 self.fail_step()45 self.ignored_step()46 @step(step_group=True)47 def pass_pending_ignore_group(self):48 self.step1()49 self.pending_step()50 self.ignored_step()51 @step52 def incorrect_step_call(self):53 self.step1()54 @step(treat_nested_steps_as_methods=True)55 def explicit_step_call_skip(self):56 self.step1()57 @step(step_group=True, treat_nested_steps_as_methods=True)58 def incorrect_step_group(self):59 self.step1()60 self.step2()61 def setUp(self):62 grail.state.is_test_wrapped = True63 def tearDown(self):64 grail.state.reset()...
modifications.py
Source:modifications.py
1"""2Modification mixins extending the traditional chaos game.3"""4import abc5import random6import functools7class Modification(abc.ABC):8 """9 The base class for modification mixins.10 Subclasses should override the initialization method11 to implement customization of the modification.12 """13 # The game which this modification modifies.14 # The game instance sets this attribute for each modification.15 game = None16class VertexesModification(Modification):17 """18 The base class for mixins which modify the vertexes.19 """20 @abc.abstractmethod21 def get_vertexes(self):22 pass23class NextVertexModification(Modification):24 """25 The base class for mixins which modify the next vertex.26 """27 @abc.abstractmethod28 def get_next_vertex_index(self):29 pass30class IgnorePreviousVertexesModification(NextVertexModification):31 """32 A modification which ignores vertexes selected some number of steps ago.33 """34 # The steps ago of the ignored vertexes, represented as negative integers.35 # For example, if the list has negative one, the last vertex is ignored.36 ignored_steps: list[int] = []37 def __init__(self, ignored_steps: list[int]):38 """39 Customize the modification.40 """41 self.ignored_steps = ignored_steps42 def get_next_vertex_index(self) -> int:43 """44 Choose from the vertexes at random, but ignore some previous vertexes.45 """46 vertex_indexes = list(range(len(self.game.get_vertexes())))47 assert len(self.ignored_steps) < len(vertex_indexes), (48 "The number of potential ignored vertexes"49 " must be less than the total number of vertexes."50 )51 for ignored_step in self.ignored_steps:52 if -len(self.game.selected_vertex_indexes) <= ignored_step < 0:53 ignored_vertex_index = self.game.selected_vertex_indexes[ignored_step]54 while ignored_vertex_index in vertex_indexes:55 vertex_indexes.remove(ignored_vertex_index)56 return random.choice(vertex_indexes)57class IgnoreShiftedVertexesModification(NextVertexModification):58 """59 A modification which ignores vertexes shifted from the current vertex.60 """61 # The shifts of the ignored vertexes, represented62 # as positive (counterclockwise) or negative (clockwise) integers.63 ignored_shifts: list[int] = []64 def __init__(self, ignored_shifts: list[int]):65 """66 Customize the modification.67 """68 self.ignored_shifts = ignored_shifts69 def get_next_vertex_index(self) -> int:70 """71 Choose from the vertexes at random, but ignore some shifted vertexes.72 """73 current_vertex_index = self.game.selected_vertex_indexes[-1]74 vertex_count = len(self.game.get_vertexes())75 vertex_indexes = list(range(vertex_count))76 assert len(self.ignored_shifts) < len(vertex_indexes), (77 "The number of ignored vertexes"78 " must be less than the total number of vertexes."79 )80 for ignored_shift in self.ignored_shifts:81 ignored_vertex_index = current_vertex_index + ignored_shift82 ignored_vertex_index %= vertex_count83 while ignored_vertex_index in vertex_indexes:84 vertex_indexes.remove(ignored_vertex_index)85 return random.choice(vertex_indexes)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!