How to use _parse_xml_string_to_dom method in localstack

Best Python code snippet using localstack_python

parsers.py

Source:parsers.py Github

copy

Full Screen

...395 xml_dict[key] = [xml_dict[key], item]396 else:397 xml_dict[key] = item398 return xml_dict399 def _parse_xml_string_to_dom(self, xml_string):400 try:401 parser = ETree.XMLParser(402 target=ETree.TreeBuilder(),403 encoding=self.DEFAULT_ENCODING)404 parser.feed(xml_string)405 root = parser.close()406 except XMLParseError as e:407 raise ResponseParserError(408 "Unable to parse response (%s), "409 "invalid XML received. Further retries may succeed:\n%s" %410 (e, xml_string))411 return root412 def _replace_nodes(self, parsed):413 for key, value in parsed.items():414 if list(value):415 sub_dict = self._build_name_to_xml_node(value)416 parsed[key] = self._replace_nodes(sub_dict)417 else:418 parsed[key] = value.text419 return parsed420 @_text_content421 def _handle_boolean(self, shape, text):422 if text == 'true':423 return True424 else:425 return False426 @_text_content427 def _handle_float(self, shape, text):428 return float(text)429 @_text_content430 def _handle_timestamp(self, shape, text):431 return self._timestamp_parser(text)432 @_text_content433 def _handle_integer(self, shape, text):434 return int(text)435 @_text_content436 def _handle_string(self, shape, text):437 return text438 @_text_content439 def _handle_blob(self, shape, text):440 return self._blob_parser(text)441 _handle_character = _handle_string442 _handle_double = _handle_float443 _handle_long = _handle_integer444class QueryParser(BaseXMLResponseParser):445 def _do_error_parse(self, response, shape):446 xml_contents = response['body']447 root = self._parse_xml_string_to_dom(xml_contents)448 parsed = self._build_name_to_xml_node(root)449 self._replace_nodes(parsed)450 # Once we've converted xml->dict, we need to make one or two451 # more adjustments to extract nested errors and to be consistent452 # with ResponseMetadata for non-error responses:453 # 1. {"Errors": {"Error": {...}}} -> {"Error": {...}}454 # 2. {"RequestId": "id"} -> {"ResponseMetadata": {"RequestId": "id"}}455 if 'Errors' in parsed:456 parsed.update(parsed.pop('Errors'))457 if 'RequestId' in parsed:458 parsed['ResponseMetadata'] = {'RequestId': parsed.pop('RequestId')}459 return parsed460 def _do_modeled_error_parse(self, response, shape):461 return self._parse_body_as_xml(response, shape, inject_metadata=False)462 def _do_parse(self, response, shape):463 return self._parse_body_as_xml(response, shape, inject_metadata=True)464 def _parse_body_as_xml(self, response, shape, inject_metadata=True):465 xml_contents = response['body']466 root = self._parse_xml_string_to_dom(xml_contents)467 parsed = {}468 if shape is not None:469 start = root470 if 'resultWrapper' in shape.serialization:471 start = self._find_result_wrapped_shape(472 shape.serialization['resultWrapper'],473 root)474 parsed = self._parse_shape(shape, start)475 if inject_metadata:476 self._inject_response_metadata(root, parsed)477 return parsed478 def _find_result_wrapped_shape(self, element_name, xml_root_node):479 mapping = self._build_name_to_xml_node(xml_root_node)480 return mapping[element_name]481 def _inject_response_metadata(self, node, inject_into):482 mapping = self._build_name_to_xml_node(node)483 child_node = mapping.get('ResponseMetadata')484 if child_node is not None:485 sub_mapping = self._build_name_to_xml_node(child_node)486 for key, value in sub_mapping.items():487 sub_mapping[key] = value.text488 inject_into['ResponseMetadata'] = sub_mapping489class EC2QueryParser(QueryParser):490 def _inject_response_metadata(self, node, inject_into):491 mapping = self._build_name_to_xml_node(node)492 child_node = mapping.get('requestId')493 if child_node is not None:494 inject_into['ResponseMetadata'] = {'RequestId': child_node.text}495 def _do_error_parse(self, response, shape):496 # EC2 errors look like:497 # <Response>498 # <Errors>499 # <Error>500 # <Code>InvalidInstanceID.Malformed</Code>501 # <Message>Invalid id: "1343124"</Message>502 # </Error>503 # </Errors>504 # <RequestID>12345</RequestID>505 # </Response>506 # This is different from QueryParser in that it's RequestID,507 # not RequestId508 original = super(EC2QueryParser, self)._do_error_parse(response, shape)509 if 'RequestID' in original:510 original['ResponseMetadata'] = {511 'RequestId': original.pop('RequestID')512 }513 return original514 def _get_error_root(self, original_root):515 for child in original_root:516 if self._node_tag(child) == 'Errors':517 for errors_child in child:518 if self._node_tag(errors_child) == 'Error':519 return errors_child520 return original_root521class BaseJSONParser(ResponseParser):522 def _handle_structure(self, shape, value):523 final_parsed = {}524 if shape.is_document_type:525 final_parsed = value526 else:527 member_shapes = shape.members528 if value is None:529 # If the comes across the wire as "null" (None in python),530 # we should be returning this unchanged, instead of as an531 # empty dict.532 return None533 final_parsed = {}534 if self._has_unknown_tagged_union_member(shape, value):535 tag = self._get_first_key(value)536 return self._handle_unknown_tagged_union_member(tag)537 for member_name in member_shapes:538 member_shape = member_shapes[member_name]539 json_name = member_shape.serialization.get('name', member_name)540 raw_value = value.get(json_name)541 if raw_value is not None:542 final_parsed[member_name] = self._parse_shape(543 member_shapes[member_name],544 raw_value)545 return final_parsed546 def _handle_map(self, shape, value):547 parsed = {}548 key_shape = shape.key549 value_shape = shape.value550 for key, value in value.items():551 actual_key = self._parse_shape(key_shape, key)552 actual_value = self._parse_shape(value_shape, value)553 parsed[actual_key] = actual_value554 return parsed555 def _handle_blob(self, shape, value):556 return self._blob_parser(value)557 def _handle_timestamp(self, shape, value):558 return self._timestamp_parser(value)559 def _do_error_parse(self, response, shape):560 body = self._parse_body_as_json(response['body'])561 error = {"Error": {"Message": '', "Code": ''}, "ResponseMetadata": {}}562 # Error responses can have slightly different structures for json.563 # The basic structure is:564 #565 # {"__type":"ConnectClientException",566 # "message":"The error message."}567 # The error message can either come in the 'message' or 'Message' key568 # so we need to check for both.569 error['Error']['Message'] = body.get('message',570 body.get('Message', ''))571 # if the message did not contain an error code572 # include the response status code573 response_code = response.get('status_code')574 code = body.get('__type', response_code and str(response_code))575 if code is not None:576 # code has a couple forms as well:577 # * "com.aws.dynamodb.vAPI#ProvisionedThroughputExceededException"578 # * "ResourceNotFoundException"579 if '#' in code:580 code = code.rsplit('#', 1)[1]581 error['Error']['Code'] = code582 self._inject_response_metadata(error, response['headers'])583 return error584 def _inject_response_metadata(self, parsed, headers):585 if 'x-amzn-requestid' in headers:586 parsed.setdefault('ResponseMetadata', {})['RequestId'] = (587 headers['x-amzn-requestid'])588 def _parse_body_as_json(self, body_contents):589 if not body_contents:590 return {}591 body = body_contents.decode(self.DEFAULT_ENCODING)592 try:593 original_parsed = json.loads(body)594 return original_parsed595 except ValueError:596 # if the body cannot be parsed, include597 # the literal string as the message598 return {'message': body}599class BaseEventStreamParser(ResponseParser):600 def _do_parse(self, response, shape):601 final_parsed = {}602 if shape.serialization.get('eventstream'):603 event_type = response['headers'].get(':event-type')604 event_shape = shape.members.get(event_type)605 if event_shape:606 final_parsed[event_type] = self._do_parse(response, event_shape)607 else:608 self._parse_non_payload_attrs(response, shape,609 shape.members, final_parsed)610 self._parse_payload(response, shape, shape.members, final_parsed)611 return final_parsed612 def _do_error_parse(self, response, shape):613 exception_type = response['headers'].get(':exception-type')614 exception_shape = shape.members.get(exception_type)615 if exception_shape is not None:616 original_parsed = self._initial_body_parse(response['body'])617 body = self._parse_shape(exception_shape, original_parsed)618 error = {619 'Error': {620 'Code': exception_type,621 'Message': body.get('Message', body.get('message', ''))622 }623 }624 else:625 error = {626 'Error': {627 'Code': response['headers'].get(':error-code', ''),628 'Message': response['headers'].get(':error-message', ''),629 }630 }631 return error632 def _parse_payload(self, response, shape, member_shapes, final_parsed):633 if shape.serialization.get('event'):634 for name in member_shapes:635 member_shape = member_shapes[name]636 if member_shape.serialization.get('eventpayload'):637 body = response['body']638 if member_shape.type_name == 'blob':639 parsed_body = body640 elif member_shape.type_name == 'string':641 parsed_body = body.decode(self.DEFAULT_ENCODING)642 else:643 raw_parse = self._initial_body_parse(body)644 parsed_body = self._parse_shape(member_shape, raw_parse)645 final_parsed[name] = parsed_body646 return647 # If we didn't find an explicit payload, use the current shape648 original_parsed = self._initial_body_parse(response['body'])649 body_parsed = self._parse_shape(shape, original_parsed)650 final_parsed.update(body_parsed)651 def _parse_non_payload_attrs(self, response, shape,652 member_shapes, final_parsed):653 headers = response['headers']654 for name in member_shapes:655 member_shape = member_shapes[name]656 if member_shape.serialization.get('eventheader'):657 if name in headers:658 value = headers[name]659 if member_shape.type_name == 'timestamp':660 # Event stream timestamps are an in milleseconds so we661 # divide by 1000 to convert to seconds.662 value = self._timestamp_parser(value / 1000.0)663 final_parsed[name] = value664 def _initial_body_parse(self, body_contents):665 # This method should do the initial xml/json parsing of the666 # body. We we still need to walk the parsed body in order667 # to convert types, but this method will do the first round668 # of parsing.669 raise NotImplementedError("_initial_body_parse")670class EventStreamJSONParser(BaseEventStreamParser, BaseJSONParser):671 def _initial_body_parse(self, body_contents):672 return self._parse_body_as_json(body_contents)673class EventStreamXMLParser(BaseEventStreamParser, BaseXMLResponseParser):674 def _initial_body_parse(self, xml_string):675 if not xml_string:676 return ETree.Element('')677 return self._parse_xml_string_to_dom(xml_string)678class JSONParser(BaseJSONParser):679 EVENT_STREAM_PARSER_CLS = EventStreamJSONParser680 """Response parser for the "json" protocol."""681 def _do_parse(self, response, shape):682 parsed = {}683 if shape is not None:684 event_name = shape.event_stream_name685 if event_name:686 parsed = self._handle_event_stream(response, shape, event_name)687 else:688 parsed = self._handle_json_body(response['body'], shape)689 self._inject_response_metadata(parsed, response['headers'])690 return parsed691 def _do_modeled_error_parse(self, response, shape):692 return self._handle_json_body(response['body'], shape)693 def _handle_event_stream(self, response, shape, event_name):694 event_stream_shape = shape.members[event_name]695 event_stream = self._create_event_stream(response, event_stream_shape)696 try:697 event = event_stream.get_initial_response()698 except NoInitialResponseError:699 error_msg = 'First event was not of type initial-response'700 raise ResponseParserError(error_msg)701 parsed = self._handle_json_body(event.payload, shape)702 parsed[event_name] = event_stream703 return parsed704 def _handle_json_body(self, raw_body, shape):705 # The json.loads() gives us the primitive JSON types,706 # but we need to traverse the parsed JSON data to convert707 # to richer types (blobs, timestamps, etc.708 parsed_json = self._parse_body_as_json(raw_body)709 return self._parse_shape(shape, parsed_json)710class BaseRestParser(ResponseParser):711 def _do_parse(self, response, shape):712 final_parsed = {}713 final_parsed['ResponseMetadata'] = self._populate_response_metadata(714 response)715 self._add_modeled_parse(response, shape, final_parsed)716 return final_parsed717 def _add_modeled_parse(self, response, shape, final_parsed):718 if shape is None:719 return final_parsed720 member_shapes = shape.members721 self._parse_non_payload_attrs(response, shape,722 member_shapes, final_parsed)723 self._parse_payload(response, shape, member_shapes, final_parsed)724 def _do_modeled_error_parse(self, response, shape):725 final_parsed = {}726 self._add_modeled_parse(response, shape, final_parsed)727 return final_parsed728 def _populate_response_metadata(self, response):729 metadata = {}730 headers = response['headers']731 if 'x-amzn-requestid' in headers:732 metadata['RequestId'] = headers['x-amzn-requestid']733 elif 'x-amz-request-id' in headers:734 metadata['RequestId'] = headers['x-amz-request-id']735 # HostId is what it's called whenever this value is returned736 # in an XML response body, so to be consistent, we'll always737 # call is HostId.738 metadata['HostId'] = headers.get('x-amz-id-2', '')739 return metadata740 def _parse_payload(self, response, shape, member_shapes, final_parsed):741 if 'payload' in shape.serialization:742 # If a payload is specified in the output shape, then only that743 # shape is used for the body payload.744 payload_member_name = shape.serialization['payload']745 body_shape = member_shapes[payload_member_name]746 if body_shape.serialization.get('eventstream'):747 body = self._create_event_stream(response, body_shape)748 final_parsed[payload_member_name] = body749 elif body_shape.type_name in ['string', 'blob']:750 # This is a stream751 body = response['body']752 if isinstance(body, bytes):753 body = body.decode(self.DEFAULT_ENCODING)754 final_parsed[payload_member_name] = body755 else:756 original_parsed = self._initial_body_parse(response['body'])757 final_parsed[payload_member_name] = self._parse_shape(758 body_shape, original_parsed)759 else:760 original_parsed = self._initial_body_parse(response['body'])761 body_parsed = self._parse_shape(shape, original_parsed)762 final_parsed.update(body_parsed)763 def _parse_non_payload_attrs(self, response, shape,764 member_shapes, final_parsed):765 headers = response['headers']766 for name in member_shapes:767 member_shape = member_shapes[name]768 location = member_shape.serialization.get('location')769 if location is None:770 continue771 elif location == 'statusCode':772 final_parsed[name] = self._parse_shape(773 member_shape, response['status_code'])774 elif location == 'headers':775 final_parsed[name] = self._parse_header_map(member_shape,776 headers)777 elif location == 'header':778 header_name = member_shape.serialization.get('name', name)779 if header_name in headers:780 final_parsed[name] = self._parse_shape(781 member_shape, headers[header_name])782 def _parse_header_map(self, shape, headers):783 # Note that headers are case insensitive, so we .lower()784 # all header names and header prefixes.785 parsed = {}786 prefix = shape.serialization.get('name', '').lower()787 for header_name in headers:788 if header_name.lower().startswith(prefix):789 # The key name inserted into the parsed hash790 # strips off the prefix.791 name = header_name[len(prefix):]792 parsed[name] = headers[header_name]793 return parsed794 def _initial_body_parse(self, body_contents):795 # This method should do the initial xml/json parsing of the796 # body. We we still need to walk the parsed body in order797 # to convert types, but this method will do the first round798 # of parsing.799 raise NotImplementedError("_initial_body_parse")800 def _handle_string(self, shape, value):801 parsed = value802 if is_json_value_header(shape):803 decoded = base64.b64decode(value).decode(self.DEFAULT_ENCODING)804 parsed = json.loads(decoded)805 return parsed806 def _handle_list(self, shape, node):807 location = shape.serialization.get('location')808 if location == 'header' and not isinstance(node, list):809 # List in headers may be a comma separated string as per RFC7230810 node = [e.strip() for e in node.split(',')]811 return super(BaseRestParser, self)._handle_list(shape, node)812class RestJSONParser(BaseRestParser, BaseJSONParser):813 EVENT_STREAM_PARSER_CLS = EventStreamJSONParser814 def _initial_body_parse(self, body_contents):815 return self._parse_body_as_json(body_contents)816 def _do_error_parse(self, response, shape):817 error = super(RestJSONParser, self)._do_error_parse(response, shape)818 self._inject_error_code(error, response)819 return error820 def _inject_error_code(self, error, response):821 # The "Code" value can come from either a response822 # header or a value in the JSON body.823 body = self._initial_body_parse(response['body'])824 if 'x-amzn-errortype' in response['headers']:825 code = response['headers']['x-amzn-errortype']826 # Could be:827 # x-amzn-errortype: ValidationException:828 code = code.split(':')[0]829 error['Error']['Code'] = code830 elif 'code' in body or 'Code' in body:831 error['Error']['Code'] = body.get(832 'code', body.get('Code', ''))833class RestXMLParser(BaseRestParser, BaseXMLResponseParser):834 EVENT_STREAM_PARSER_CLS = EventStreamXMLParser835 def _initial_body_parse(self, xml_string):836 if not xml_string:837 return ETree.Element('')838 return self._parse_xml_string_to_dom(xml_string)839 def _do_error_parse(self, response, shape):840 # We're trying to be service agnostic here, but S3 does have a slightly841 # different response structure for its errors compared to other842 # rest-xml serivces (route53/cloudfront). We handle this by just843 # trying to parse both forms.844 # First:845 # <ErrorResponse xmlns="...">846 # <Error>847 # <Type>Sender</Type>848 # <Code>InvalidInput</Code>849 # <Message>Invalid resource type: foo</Message>850 # </Error>851 # <RequestId>request-id</RequestId>852 # </ErrorResponse>853 if response['body']:854 # If the body ends up being invalid xml, the xml parser should not855 # blow up. It should at least try to pull information about the856 # the error response from other sources like the HTTP status code.857 try:858 return self._parse_error_from_body(response)859 except ResponseParserError:860 LOG.debug(861 'Exception caught when parsing error response body:',862 exc_info=True)863 return self._parse_error_from_http_status(response)864 def _parse_error_from_http_status(self, response):865 return {866 'Error': {867 'Code': str(response['status_code']),868 'Message': six.moves.http_client.responses.get(869 response['status_code'], ''),870 },871 'ResponseMetadata': {872 'RequestId': response['headers'].get('x-amz-request-id', ''),873 'HostId': response['headers'].get('x-amz-id-2', ''),874 }875 }876 def _parse_error_from_body(self, response):877 xml_contents = response['body']878 root = self._parse_xml_string_to_dom(xml_contents)879 parsed = self._build_name_to_xml_node(root)880 self._replace_nodes(parsed)881 if root.tag == 'Error':882 # This is an S3 error response. First we'll populate the883 # response metadata.884 metadata = self._populate_response_metadata(response)885 # The RequestId and the HostId are already in the886 # ResponseMetadata, but are also duplicated in the XML887 # body. We don't need these values in both places,888 # we'll just remove them from the parsed XML body.889 parsed.pop('RequestId', '')890 parsed.pop('HostId', '')891 return {'Error': parsed, 'ResponseMetadata': metadata}892 elif 'RequestId' in parsed:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful