Best Python code snippet using lisa_python
test_type_schema_loader.py
Source:test_type_schema_loader.py
# NOTE(review): this excerpt starts at line 43 of the original test module.
# The imports, the TEST_* / OTHER_TEST_* constants, ``assert_dict_equals`` and
# the ``loader`` fixture used below are not visible here and are presumably
# defined earlier in that file.

# Deliberately malformed payloads used by the "invalid JSON" tests.
_INVALID_JSON_OBJECT = '{"Credentials" :{"ApiKey": "123", xxxx}}'
_INVALID_JSON_ARRAY = '[{"Credentials" :{"ApiKey": "123"}}]]'

_LOADER_MODULE = "rpdk.core.type_schema_loader"


def _spy_on(loader, method_name):
    """Return a patcher that records calls to ``loader.<method_name>``.

    The mock wraps the real bound method, so the loader still executes its
    actual logic while the test can assert on the arguments it received.
    """
    return patch.object(loader, method_name, wraps=getattr(loader, method_name))


def _patch_isfile():
    """Return a patcher that makes the loader's ``os.path.isfile`` report True."""
    return patch(_LOADER_MODULE + ".os.path.isfile", return_value=True)


def _patch_requests_get(response):
    """Return a patcher that makes the loader's ``requests.get`` return *response*."""
    return patch(_LOADER_MODULE + ".requests.get", return_value=response)


def _client_error(message, operation_name):
    """Build a ``ClientError`` carrying *message* for *operation_name*."""
    return ClientError({"Error": {"Code": "", "Message": message}}, operation_name)


def test_load_type_schema_from_json(loader):
    with _spy_on(loader, "load_type_schema_from_json") as mock_load_json:
        type_schema = loader.load_type_schema(TEST_TARGET_SCHEMA_JSON)

    assert_dict_equals(TEST_TARGET_SCHEMA, type_schema)
    mock_load_json.assert_called_with(TEST_TARGET_SCHEMA_JSON, None)


def test_load_type_schema_from_invalid_json(loader):
    with _spy_on(loader, "load_type_schema_from_json") as mock_load_json:
        type_schema = loader.load_type_schema(_INVALID_JSON_OBJECT)

    assert not type_schema
    mock_load_json.assert_called_with(_INVALID_JSON_OBJECT, None)


def test_load_type_schema_from_invalid_json_fallback_to_default(loader):
    with _spy_on(loader, "load_type_schema_from_json") as mock_load_json:
        type_schema = loader.load_type_schema(
            _INVALID_JSON_OBJECT,
            OTHER_TEST_TARGET_FALLBACK_SCHEMA,
        )

    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)
    mock_load_json.assert_called_with(
        _INVALID_JSON_OBJECT, OTHER_TEST_TARGET_FALLBACK_SCHEMA
    )


def test_load_type_schema_from_json_array(loader):
    with _spy_on(loader, "load_type_schema_from_json") as mock_load_json:
        type_schema = loader.load_type_schema(TEST_TARGET_SCHEMA_JSON_ARRAY)

    assert [TEST_TARGET_SCHEMA] == type_schema
    mock_load_json.assert_called_with(TEST_TARGET_SCHEMA_JSON_ARRAY, None)


def test_load_type_schema_from_invalid_json_array(loader):
    with _spy_on(loader, "load_type_schema_from_json") as mock_load_json:
        type_schema = loader.load_type_schema(_INVALID_JSON_ARRAY)

    assert not type_schema
    mock_load_json.assert_called_with(_INVALID_JSON_ARRAY, None)


def test_load_type_schema_from_invalid_json_array_fallback_to_default(loader):
    with _spy_on(loader, "load_type_schema_from_json") as mock_load_json:
        type_schema = loader.load_type_schema(
            _INVALID_JSON_ARRAY,
            OTHER_TEST_TARGET_FALLBACK_SCHEMA,
        )

    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)
    mock_load_json.assert_called_with(
        _INVALID_JSON_ARRAY, OTHER_TEST_TARGET_FALLBACK_SCHEMA
    )


def test_load_type_schema_from_file(loader):
    patch_file = patch("builtins.open", mock_open(read_data=TEST_TARGET_SCHEMA_JSON))
    patch_path_is_file = _patch_isfile()
    patch_load_file = _spy_on(loader, "load_type_schema_from_file")

    with patch_file as mock_file, patch_path_is_file as mock_path_is_file, \
            patch_load_file as mock_load_file:
        type_schema = loader.load_type_schema(TEST_TARGET_SCHEMA_FILE_PATH)

    assert_dict_equals(TEST_TARGET_SCHEMA, type_schema)
    mock_path_is_file.assert_any_call(TEST_TARGET_SCHEMA_FILE_PATH)
    mock_load_file.assert_called_with(TEST_TARGET_SCHEMA_FILE_PATH, None)
    mock_file.assert_called_with(TEST_TARGET_SCHEMA_FILE_PATH, "r")


def test_load_type_schema_from_file_file_not_found(loader):
    patch_file = patch("builtins.open", mock_open())
    patch_path_is_file = _patch_isfile()
    patch_load_file = _spy_on(loader, "load_type_schema_from_file")

    with patch_file as mock_file, patch_path_is_file as mock_path_is_file, \
            patch_load_file as mock_load_file:
        # isfile() says the path exists, but opening it fails.
        mock_file.side_effect = FileNotFoundError()
        type_schema = loader.load_type_schema(TEST_TARGET_SCHEMA_FILE_PATH)

    assert not type_schema
    mock_path_is_file.assert_any_call(TEST_TARGET_SCHEMA_FILE_PATH)
    mock_load_file.assert_called_with(TEST_TARGET_SCHEMA_FILE_PATH, None)
    mock_file.assert_called_with(TEST_TARGET_SCHEMA_FILE_PATH, "r")


def test_load_type_schema_from_file_error_fallback_to_default(loader):
    patch_file = patch("builtins.open", mock_open())
    patch_path_is_file = _patch_isfile()
    patch_load_file = _spy_on(loader, "load_type_schema_from_file")

    with patch_file as mock_file, patch_path_is_file as mock_path_is_file, \
            patch_load_file as mock_load_file:
        mock_file.side_effect = FileNotFoundError()
        type_schema = loader.load_type_schema(
            TEST_TARGET_SCHEMA_FILE_PATH, OTHER_TEST_TARGET_FALLBACK_SCHEMA
        )

    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)
    mock_path_is_file.assert_any_call(TEST_TARGET_SCHEMA_FILE_PATH)
    mock_load_file.assert_called_with(
        TEST_TARGET_SCHEMA_FILE_PATH, OTHER_TEST_TARGET_FALLBACK_SCHEMA
    )
    mock_file.assert_called_with(TEST_TARGET_SCHEMA_FILE_PATH, "r")


def test_load_type_schema_from_file_uri(loader):
    patch_file = patch("builtins.open", mock_open(read_data=TEST_TARGET_SCHEMA_JSON))
    patch_load_from_uri = _spy_on(loader, "load_type_schema_from_uri")
    patch_load_file = _spy_on(loader, "load_type_schema_from_file")

    with patch_file as mock_file, patch_load_from_uri as mock_load_from_uri, \
            patch_load_file as mock_load_file:
        type_schema = loader.load_type_schema(TEST_TARGET_SCHEMA_FILE_URI)

    assert_dict_equals(TEST_TARGET_SCHEMA, type_schema)
    mock_load_from_uri.assert_called_with(TEST_TARGET_SCHEMA_FILE_URI, None)
    mock_load_file.assert_called_with(TEST_TARGET_SCHEMA_FILE_PATH, None)
    mock_file.assert_called_with(TEST_TARGET_SCHEMA_FILE_PATH, "r")


def test_load_type_schema_from_file_uri_file_not_found(loader):
    patch_file = patch("builtins.open", mock_open())
    patch_load_from_uri = _spy_on(loader, "load_type_schema_from_uri")
    patch_load_file = _spy_on(loader, "load_type_schema_from_file")

    with patch_file as mock_file, patch_load_from_uri as mock_load_from_uri, \
            patch_load_file as mock_load_file:
        mock_file.side_effect = FileNotFoundError()
        type_schema = loader.load_type_schema(TEST_TARGET_SCHEMA_FILE_URI)

    assert not type_schema
    mock_load_from_uri.assert_called_with(TEST_TARGET_SCHEMA_FILE_URI, None)
    mock_load_file.assert_called_with(TEST_TARGET_SCHEMA_FILE_PATH, None)
    mock_file.assert_called_with(TEST_TARGET_SCHEMA_FILE_PATH, "r")


def test_load_type_schema_from_file_uri_error_fallback_to_default(loader):
    patch_file = patch("builtins.open", mock_open())
    patch_load_from_uri = _spy_on(loader, "load_type_schema_from_uri")
    patch_load_file = _spy_on(loader, "load_type_schema_from_file")

    with patch_file as mock_file, patch_load_from_uri as mock_load_from_uri, \
            patch_load_file as mock_load_file:
        mock_file.side_effect = FileNotFoundError()
        type_schema = loader.load_type_schema(
            TEST_TARGET_SCHEMA_FILE_URI, OTHER_TEST_TARGET_FALLBACK_SCHEMA
        )

    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)
    mock_load_from_uri.assert_called_with(
        TEST_TARGET_SCHEMA_FILE_URI, OTHER_TEST_TARGET_FALLBACK_SCHEMA
    )
    mock_load_file.assert_called_with(
        TEST_TARGET_SCHEMA_FILE_PATH, OTHER_TEST_TARGET_FALLBACK_SCHEMA
    )
    mock_file.assert_called_with(TEST_TARGET_SCHEMA_FILE_PATH, "r")


def test_load_type_schema_from_https_url(loader):
    mock_request = Mock()
    mock_request.status_code = 200
    mock_request.content = TEST_TARGET_SCHEMA_JSON.encode("utf-8")

    patch_get_request = _patch_requests_get(mock_request)
    patch_load_from_uri = _spy_on(loader, "load_type_schema_from_uri")
    patch_get_from_url = _spy_on(loader, "_get_type_schema_from_url")

    with patch_get_request as mock_get_request, \
            patch_load_from_uri as mock_load_from_uri, \
            patch_get_from_url as mock_get_from_url:
        type_schema = loader.load_type_schema(TEST_HTTPS_TARGET_SCHEMA_URI)

    assert_dict_equals(TEST_TARGET_SCHEMA, type_schema)
    mock_load_from_uri.assert_called_with(TEST_HTTPS_TARGET_SCHEMA_URI, None)
    mock_get_from_url.assert_called_with(TEST_HTTPS_TARGET_SCHEMA_URI, None)
    mock_get_request.assert_called_with(TEST_HTTPS_TARGET_SCHEMA_URI, timeout=60)


def test_load_type_schema_from_https_url_unsuccessful(loader):
    mock_request = Mock()
    mock_request.status_code = 404

    patch_get_request = _patch_requests_get(mock_request)
    patch_load_from_uri = _spy_on(loader, "load_type_schema_from_uri")
    patch_get_from_url = _spy_on(loader, "_get_type_schema_from_url")

    with patch_get_request as mock_get_request, \
            patch_load_from_uri as mock_load_from_uri, \
            patch_get_from_url as mock_get_from_url:
        type_schema = loader.load_type_schema(TEST_HTTPS_TARGET_SCHEMA_URI)

    assert not type_schema
    mock_load_from_uri.assert_called_with(TEST_HTTPS_TARGET_SCHEMA_URI, None)
    mock_get_from_url.assert_called_with(TEST_HTTPS_TARGET_SCHEMA_URI, None)
    mock_get_request.assert_called_with(TEST_HTTPS_TARGET_SCHEMA_URI, timeout=60)


def test_load_type_schema_from_https_url_unsuccessful_fallback_to_default(loader):
    mock_request = Mock()
    mock_request.status_code = 404

    patch_get_request = _patch_requests_get(mock_request)
    patch_load_from_uri = _spy_on(loader, "load_type_schema_from_uri")
    patch_get_from_url = _spy_on(loader, "_get_type_schema_from_url")

    with patch_get_request as mock_get_request, \
            patch_load_from_uri as mock_load_from_uri, \
            patch_get_from_url as mock_get_from_url:
        type_schema = loader.load_type_schema(
            TEST_HTTPS_TARGET_SCHEMA_URI, OTHER_TEST_TARGET_FALLBACK_SCHEMA
        )

    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)
    mock_load_from_uri.assert_called_with(
        TEST_HTTPS_TARGET_SCHEMA_URI, OTHER_TEST_TARGET_FALLBACK_SCHEMA
    )
    mock_get_from_url.assert_called_with(
        TEST_HTTPS_TARGET_SCHEMA_URI, OTHER_TEST_TARGET_FALLBACK_SCHEMA
    )
    mock_get_request.assert_called_with(TEST_HTTPS_TARGET_SCHEMA_URI, timeout=60)


def test_load_type_schema_from_s3(loader):
    loader.s3_client.get_object.return_value = {
        "Body": BytesIO(TEST_TARGET_SCHEMA_JSON.encode("utf-8"))
    }
    patch_load_from_uri = _spy_on(loader, "load_type_schema_from_uri")
    patch_get_from_s3 = _spy_on(loader, "_get_type_schema_from_s3")

    with patch_load_from_uri as mock_load_from_uri, \
            patch_get_from_s3 as mock_get_from_s3:
        type_schema = loader.load_type_schema(TEST_S3_TARGET_SCHEMA_URI)

    assert_dict_equals(TEST_TARGET_SCHEMA, type_schema)
    mock_load_from_uri.assert_called_with(TEST_S3_TARGET_SCHEMA_URI, None)
    mock_get_from_s3.assert_called_with(
        TEST_TARGET_SCHEMA_BUCKET, TEST_TARGET_SCHEMA_KEY, None
    )
    loader.s3_client.get_object.assert_called_once_with(
        Bucket=TEST_TARGET_SCHEMA_BUCKET, Key=TEST_TARGET_SCHEMA_KEY
    )


def test_load_type_schema_from_s3_client_error(loader):
    loader.s3_client.get_object.side_effect = _client_error(
        "Bucket does not exist", "get_object"
    )
    patch_load_from_uri = _spy_on(loader, "load_type_schema_from_uri")
    patch_get_from_s3 = _spy_on(loader, "_get_type_schema_from_s3")

    with patch_load_from_uri as mock_load_from_uri, \
            patch_get_from_s3 as mock_get_from_s3:
        type_schema = loader.load_type_schema(TEST_S3_TARGET_SCHEMA_URI)

    assert not type_schema
    mock_load_from_uri.assert_called_with(TEST_S3_TARGET_SCHEMA_URI, None)
    mock_get_from_s3.assert_called_with(
        TEST_TARGET_SCHEMA_BUCKET, TEST_TARGET_SCHEMA_KEY, None
    )
    loader.s3_client.get_object.assert_called_once_with(
        Bucket=TEST_TARGET_SCHEMA_BUCKET, Key=TEST_TARGET_SCHEMA_KEY
    )


def test_load_type_schema_from_s3_error_fallback_to_default(loader):
    loader.s3_client.get_object.side_effect = _client_error(
        "Bucket does not exist", "get_object"
    )
    patch_load_from_uri = _spy_on(loader, "load_type_schema_from_uri")
    patch_get_from_s3 = _spy_on(loader, "_get_type_schema_from_s3")

    with patch_load_from_uri as mock_load_from_uri, \
            patch_get_from_s3 as mock_get_from_s3:
        type_schema = loader.load_type_schema(
            TEST_S3_TARGET_SCHEMA_URI, OTHER_TEST_TARGET_FALLBACK_SCHEMA
        )

    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)
    mock_load_from_uri.assert_called_with(
        TEST_S3_TARGET_SCHEMA_URI, OTHER_TEST_TARGET_FALLBACK_SCHEMA
    )
    mock_get_from_s3.assert_called_with(
        TEST_TARGET_SCHEMA_BUCKET,
        TEST_TARGET_SCHEMA_KEY,
        OTHER_TEST_TARGET_FALLBACK_SCHEMA,
    )
    loader.s3_client.get_object.assert_called_once_with(
        Bucket=TEST_TARGET_SCHEMA_BUCKET, Key=TEST_TARGET_SCHEMA_KEY
    )


def test_load_type_schema_from_cfn_registry(loader):
    loader.cfn_client.describe_type.return_value = {
        "Schema": TEST_TARGET_SCHEMA_JSON,
        "Type": "RESOURCE",
        "ProvisioningType": "FULLY_MUTABLE",
    }

    type_schema, target_type, provisioning_type = loader.load_schema_from_cfn_registry(
        TEST_TARGET_TYPE_NAME, "RESOURCE"
    )

    assert_dict_equals(TEST_TARGET_SCHEMA, type_schema)
    assert target_type == "RESOURCE"
    assert provisioning_type == "FULLY_MUTABLE"
    loader.cfn_client.describe_type.assert_called_once_with(
        Type="RESOURCE", TypeName=TEST_TARGET_TYPE_NAME
    )


def test_load_type_schema_from_cfn_registry_client_error(loader):
    # The failing call is describe_type, so the error names that operation
    # (the original reused "get_object" from the S3 tests).
    loader.cfn_client.describe_type.side_effect = _client_error(
        "Type does not exist", "describe_type"
    )

    type_schema, target_type, provisioning_type = loader.load_schema_from_cfn_registry(
        TEST_TARGET_TYPE_NAME, "RESOURCE"
    )

    assert not type_schema
    assert not target_type
    assert not provisioning_type
    loader.cfn_client.describe_type.assert_called_once_with(
        Type="RESOURCE", TypeName=TEST_TARGET_TYPE_NAME
    )


def test_load_type_schema_from_cfn_registry_error_fallback_to_default(loader):
    loader.cfn_client.describe_type.side_effect = _client_error(
        "Type does not exist", "describe_type"
    )

    type_schema, target_type, provisioning_type = loader.load_schema_from_cfn_registry(
        TEST_TARGET_TYPE_NAME, "RESOURCE", OTHER_TEST_TARGET_FALLBACK_SCHEMA
    )

    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)
    assert not target_type
    assert not provisioning_type
    loader.cfn_client.describe_type.assert_called_once_with(
        Type="RESOURCE", TypeName=TEST_TARGET_TYPE_NAME
    )


def test_get_provision_type(loader):
    loader.cfn_client.describe_type.return_value = {
        "Schema": TEST_TARGET_SCHEMA_JSON,
        "Type": "RESOURCE",
        "ProvisioningType": "IMMUTABLE",
    }

    provisioning_type = loader.get_provision_type(TEST_TARGET_TYPE_NAME, "RESOURCE")

    assert provisioning_type == "IMMUTABLE"
    loader.cfn_client.describe_type.assert_called_once_with(
        Type="RESOURCE", TypeName=TEST_TARGET_TYPE_NAME
    )


def test_get_provision_type_client_error(loader):
    loader.cfn_client.describe_type.side_effect = _client_error(
        "Type does not exist", "describe_type"
    )

    provisioning_type = loader.get_provision_type(TEST_TARGET_TYPE_NAME, "RESOURCE")

    assert not provisioning_type
    loader.cfn_client.describe_type.assert_called_once_with(
        Type="RESOURCE", TypeName=TEST_TARGET_TYPE_NAME
    )


def test_load_type_schema_null_input(loader):
    # Every public entry point must return the fallback when given None.
    type_schema = loader.load_type_schema(None, OTHER_TEST_TARGET_FALLBACK_SCHEMA)
    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)

    type_schema = loader.load_type_schema_from_json(
        None, OTHER_TEST_TARGET_FALLBACK_SCHEMA
    )
    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)

    type_schema = loader.load_type_schema_from_uri(
        None, OTHER_TEST_TARGET_FALLBACK_SCHEMA
    )
    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)

    type_schema = loader.load_type_schema_from_file(
        None, OTHER_TEST_TARGET_FALLBACK_SCHEMA
    )
    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)


def test_load_type_schema_invalid_input(loader):
    type_schema = loader.load_type_schema(
        "This is invalid input", OTHER_TEST_TARGET_FALLBACK_SCHEMA
    )
    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)

    # Force URI validation to pass so the unsupported-scheme branch is reached.
    with patch(_LOADER_MODULE + ".is_valid_type_schema_uri", return_value=True):
        type_schema = loader.load_type_schema_from_uri(
            "ftp://unsupportedurlschema.com/test-schema.json",
            OTHER_TEST_TARGET_FALLBACK_SCHEMA,
        )
    assert_dict_equals(OTHER_TEST_TARGET_FALLBACK_SCHEMA, type_schema)


# NOTE(review): the excerpt is cut off here, inside a
# ``@pytest.mark.parametrize("uri", [...`` decorator. The parametrized test it
# introduces is not visible and is therefore not reproduced.
action.py
Source:action.py
# Copyright 2019 Ben Kehoe
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Names must consist of one or more word characters.
name_pattern = r"^\w+$"

# Optional "response" attached to every action; all of its fields are optional.
action_response_schema = {
    "type": "object",
    "properties": {
        "json": {
            "type": "object"
        },
        "html": {
            "type": "string"
        },
        "text": {
            "type": "string"
        },
        "redirect": {
            "type": "string",
            "format": "uri"
        }
    }
}


def _get_action_schema(
        type_schema,
        properties=None,
        required=None,
        schema=None):
    """Build the JSON schema shared by every action-like object.

    The result is an object schema with ``name``, ``type`` and ``response``
    properties, of which ``name`` and ``type`` are required.

    Args:
        type_schema: Keywords merged over ``{"type": "string"}`` to constrain
            the ``type`` property (e.g. ``{"const": "success"}``).
        properties: Extra property schemas merged into the base properties;
            a key that matches a base property replaces it.
        required: Extra property names appended after ``name`` and ``type``.
        schema: Extra top-level keywords merged into the finished schema last,
            so they override any key built here.

    Returns:
        A new dict. The arguments are never mutated, but nested schema
        objects (including ``action_response_schema``) are shared by reference.
    """
    # None sentinels replace the original mutable defaults ({} / []).
    properties = {} if properties is None else properties
    required = [] if required is None else required
    schema = {} if schema is None else schema

    ts = {
        "type": "string"
    }
    ts.update(type_schema)

    props = {
        "name": {
            "type": "string",
            "pattern": name_pattern
        },
        "type": ts,
        "response": action_response_schema
    }
    props.update(properties)

    req = ["name", "type"]
    req.extend(required)

    s = {
        "type": "object",
        "properties": props,
        "required": req
    }
    s.update(schema)
    return s


def _get_post_outcome_schema(
        type_schema,
        properties=None,
        required=None,
        schema=None):
    """Build the schema for one outcome of a ``post`` action.

    Same as ``_get_action_schema`` but every outcome additionally carries a
    required ``schema`` property that must be an object.

    Args:
        type_schema: Constraint for the outcome's ``type`` property.
        properties: Extra property schemas for this outcome.
        required: Extra required property names, appended after ``schema``.
        schema: Extra top-level keywords for the finished schema.

    Returns:
        A new dict describing the outcome.
    """
    p = {
        "schema": {
            "type": "object"
        }
    }
    p.update({} if properties is None else properties)

    req = ["schema"]
    req.extend([] if required is None else required)

    return _get_action_schema(
        type_schema,
        properties=p,
        required=req,
        schema=schema
    )


# A success outcome must specify exactly one way of producing its output.
post_outcome_success_schema = _get_post_outcome_schema(
    type_schema={"const": "success"},
    schema={
        "oneOf": [
            {
                "properties": {
                    "output_body": {
                        "const": True,
                    }
                },
                "required": ["output_body"]
            },
            {
                "properties": {
                    "output_path": {
                        "type": "string"
                    }
                },
                "required": ["output_path"]
            },
            {
                "properties": {
                    "output": {}
                },
                "required": ["output"]
            }
        ]
    }
)

# A failure outcome may give error/cause literally or by path, but not both
# forms of the same field at once.
post_outcome_failure_schema = _get_post_outcome_schema(
    type_schema={"const": "failure"},
    properties={
        "error": {
            "type": "string"
        },
        "error_path": {
            "type": "string"
        },
        "cause": {
            "type": "string"
        },
        "cause_path": {
            "type": "string"
        }
    },
    schema={
        "allOf": [
            {
                "not": {
                    "required": ["error", "error_path"]
                }
            },
            {
                "not": {
                    "required": ["cause", "cause_path"]
                }
            }
        ]
    }
)

post_outcome_heartbeat_schema = _get_post_outcome_schema(
    type_schema={"const": "heartbeat"}
)

post_outcome_schema = {
    "oneOf": [
        post_outcome_success_schema,
        post_outcome_failure_schema,
        post_outcome_heartbeat_schema
    ]
}

post_action_schema = _get_action_schema(
    type_schema={"const": "post"},
    properties={
        "outcomes": {
            "type": "array",
            "items": post_outcome_schema,
            "minItems": 1
        }
    }
)

success_action_schema = _get_action_schema(
    type_schema={"const": "success"},
    properties={
        "output": {}
    },
    required=["output"]
)

failure_action_schema = _get_action_schema(
    type_schema={"const": "failure"},
    properties={
        "error": {
            "type": "string"
        },
        "cause": {
            "type": "string"
        }
    },
)

heartbeat_action_schema = _get_action_schema(
    type_schema={"const": "heartbeat"}
)

# NOTE(review): the excerpt is cut off right after the "oneOf" list below; only
# the closing brace has been restored. Anything that followed in the original
# file is not visible here.
action_schema = {
    "oneOf": [
        success_action_schema,
        failure_action_schema,
        heartbeat_action_schema,
        post_action_schema
    ]
}
transform.py
Source:transform.py
class InvalidData(Exception):
    """Raise when data doesn't validate the schema"""


def transform_row(row, schema):
    """Coerce *row* so that it conforms to the JSON-schema-like *schema*.

    Args:
        row: The value to transform (object, array or scalar).
        schema: A dict using the ``type``, ``properties``, ``items``,
            ``anyOf`` and ``format`` keywords.

    Returns:
        The transformed value.

    Raises:
        InvalidData: If a value cannot be matched against its schema.
    """
    return _transform_field(row, schema)


def _transform_datetime(value):
    """Append a "Z" suffix to a date-time value.

    A string that already ends in "Z" is returned unchanged so the suffix is
    never doubled. NOTE(review): presumably inputs are naive UTC timestamps;
    confirm against the data source.
    """
    if isinstance(value, str) and value.endswith("Z"):
        return value
    return value + "Z"


def _any_of(data, schema_list):
    """Transform *data* with the first schema in *schema_list* that accepts it.

    Raises:
        InvalidData: If every candidate schema fails.
    """
    for schema in schema_list:
        try:
            return _transform_field(data, schema)
        except Exception:
            # Deliberately broad: any failure just means "this alternative
            # does not fit", so move on to the next one.
            continue

    raise InvalidData("{} doesn't match any of {}".format(data, schema_list))


def _array(data, items_schema):
    """Transform every element of *data* with *items_schema*."""
    return [_transform_field(value, items_schema) for value in data]


def _object(data, properties_schema):
    """Transform the fields of *data* that are declared in *properties_schema*.

    Fields missing from *data* are skipped; fields not declared in the schema
    are dropped from the result.
    """
    return {field: _transform_field(data[field], field_schema)
            for field, field_schema in properties_schema.items()
            if field in data}


def _type_transform(value, type_schema):
    """Cast a scalar *value* to the type named by *type_schema*.

    Args:
        value: The scalar to convert.
        type_schema: A type name ("null", "string", "integer", "number",
            "boolean") or a list of such names, tried in order.

    Raises:
        InvalidData: If the value is None but "null" is not allowed, if the
            type name is unknown, or if no type in a list accepts the value.
    """
    if isinstance(type_schema, list):
        for typ in type_schema:
            try:
                return _type_transform(value, typ)
            except Exception:
                # Deliberately broad: a failed cast means "try the next type".
                continue

        raise InvalidData("{} doesn't match any of {}".format(value, type_schema))

    if value is None:
        if type_schema != "null":
            raise InvalidData("Null is not allowed")
        return None

    if type_schema == "string":
        return str(value)

    if type_schema == "integer":
        return int(value)

    if type_schema == "number":
        return float(value)

    if type_schema == "boolean":
        # NOTE(review): bool() treats any non-empty string, including "false",
        # as True; confirm inputs are real booleans rather than strings.
        return bool(value)

    raise InvalidData("Unknown type {}".format(type_schema))


def _format_transform(value, format_schema):
    """Apply the transformation for the ``format`` keyword *format_schema*.

    Raises:
        InvalidData: If the format is not "date-time".
    """
    if format_schema == "date-time":
        return _transform_datetime(value)

    raise InvalidData("Unknown format {}".format(format_schema))


def _transform_field(value, field_schema):
    """Transform *value* according to *field_schema*.

    ``anyOf`` takes precedence, then arrays and objects are handled
    recursively; anything else is cast as a scalar and, when the schema has a
    ``format``, post-processed accordingly.
    """
    if "anyOf" in field_schema:
        return _any_of(value, field_schema["anyOf"])

    if field_schema["type"] == "array":
        return _array(value, field_schema["items"])

    if field_schema["type"] == "object":
        return _object(value, field_schema["properties"])

    value = _type_transform(value, field_schema["type"])
    if "format" in field_schema:
        value = _format_transform(value, field_schema["format"])

    # NOTE(review): the excerpt was cut off before this point. The return is
    # restored because transform_row, _any_of, _array and _object all consume
    # this function's result.
    return value
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!