How to use _parse_body_as_json method in localstack

Best Python code snippet using localstack_python

parsers.py

Source:parsers.py Github

copy

Full Screen

...556 return self._blob_parser(value)557 def _handle_timestamp(self, shape, value):558 return self._timestamp_parser(value)559 def _do_error_parse(self, response, shape):560 body = self._parse_body_as_json(response['body'])561 error = {"Error": {"Message": '', "Code": ''}, "ResponseMetadata": {}}562 # Error responses can have slightly different structures for json.563 # The basic structure is:564 #565 # {"__type":"ConnectClientException",566 # "message":"The error message."}567 # The error message can either come in the 'message' or 'Message' key568 # so we need to check for both.569 error['Error']['Message'] = body.get('message',570 body.get('Message', ''))571 # if the message did not contain an error code572 # include the response status code573 response_code = response.get('status_code')574 code = body.get('__type', response_code and str(response_code))575 if code is not None:576 # code has a couple forms as well:577 # * "com.aws.dynamodb.vAPI#ProvisionedThroughputExceededException"578 # * "ResourceNotFoundException"579 if '#' in code:580 code = code.rsplit('#', 1)[1]581 error['Error']['Code'] = code582 self._inject_response_metadata(error, response['headers'])583 return error584 def _inject_response_metadata(self, parsed, headers):585 if 'x-amzn-requestid' in headers:586 parsed.setdefault('ResponseMetadata', {})['RequestId'] = (587 headers['x-amzn-requestid'])588 def _parse_body_as_json(self, body_contents):589 if not body_contents:590 return {}591 body = body_contents.decode(self.DEFAULT_ENCODING)592 try:593 original_parsed = json.loads(body)594 return original_parsed595 except ValueError:596 # if the body cannot be parsed, include597 # the literal string as the message598 return {'message': body}599class BaseEventStreamParser(ResponseParser):600 def _do_parse(self, response, shape):601 final_parsed = {}602 if shape.serialization.get('eventstream'):603 event_type = response['headers'].get(':event-type')604 event_shape = shape.members.get(event_type)605 if event_shape:606 final_parsed[event_type] = self._do_parse(response, event_shape)607 else:608 self._parse_non_payload_attrs(response, shape,609 shape.members, final_parsed)610 self._parse_payload(response, shape, shape.members, final_parsed)611 return final_parsed612 def _do_error_parse(self, response, shape):613 exception_type = response['headers'].get(':exception-type')614 exception_shape = shape.members.get(exception_type)615 if exception_shape is not None:616 original_parsed = self._initial_body_parse(response['body'])617 body = self._parse_shape(exception_shape, original_parsed)618 error = {619 'Error': {620 'Code': exception_type,621 'Message': body.get('Message', body.get('message', ''))622 }623 }624 else:625 error = {626 'Error': {627 'Code': response['headers'].get(':error-code', ''),628 'Message': response['headers'].get(':error-message', ''),629 }630 }631 return error632 def _parse_payload(self, response, shape, member_shapes, final_parsed):633 if shape.serialization.get('event'):634 for name in member_shapes:635 member_shape = member_shapes[name]636 if member_shape.serialization.get('eventpayload'):637 body = response['body']638 if member_shape.type_name == 'blob':639 parsed_body = body640 elif member_shape.type_name == 'string':641 parsed_body = body.decode(self.DEFAULT_ENCODING)642 else:643 raw_parse = self._initial_body_parse(body)644 parsed_body = self._parse_shape(member_shape, raw_parse)645 final_parsed[name] = parsed_body646 return647 # If we didn't find an explicit payload, use the current shape648 original_parsed = self._initial_body_parse(response['body'])649 body_parsed = self._parse_shape(shape, original_parsed)650 final_parsed.update(body_parsed)651 def _parse_non_payload_attrs(self, response, shape,652 member_shapes, final_parsed):653 headers = response['headers']654 for name in member_shapes:655 member_shape = member_shapes[name]656 if member_shape.serialization.get('eventheader'):657 if name in headers:658 value = headers[name]659 if member_shape.type_name == 'timestamp':660 # Event stream timestamps are an in milleseconds so we661 # divide by 1000 to convert to seconds.662 value = self._timestamp_parser(value / 1000.0)663 final_parsed[name] = value664 def _initial_body_parse(self, body_contents):665 # This method should do the initial xml/json parsing of the666 # body. We we still need to walk the parsed body in order667 # to convert types, but this method will do the first round668 # of parsing.669 raise NotImplementedError("_initial_body_parse")670class EventStreamJSONParser(BaseEventStreamParser, BaseJSONParser):671 def _initial_body_parse(self, body_contents):672 return self._parse_body_as_json(body_contents)673class EventStreamXMLParser(BaseEventStreamParser, BaseXMLResponseParser):674 def _initial_body_parse(self, xml_string):675 if not xml_string:676 return ETree.Element('')677 return self._parse_xml_string_to_dom(xml_string)678class JSONParser(BaseJSONParser):679 EVENT_STREAM_PARSER_CLS = EventStreamJSONParser680 """Response parser for the "json" protocol."""681 def _do_parse(self, response, shape):682 parsed = {}683 if shape is not None:684 event_name = shape.event_stream_name685 if event_name:686 parsed = self._handle_event_stream(response, shape, event_name)687 else:688 parsed = self._handle_json_body(response['body'], shape)689 self._inject_response_metadata(parsed, response['headers'])690 return parsed691 def _do_modeled_error_parse(self, response, shape):692 return self._handle_json_body(response['body'], shape)693 def _handle_event_stream(self, response, shape, event_name):694 event_stream_shape = shape.members[event_name]695 event_stream = self._create_event_stream(response, event_stream_shape)696 try:697 event = event_stream.get_initial_response()698 except NoInitialResponseError:699 error_msg = 'First event was not of type initial-response'700 raise ResponseParserError(error_msg)701 parsed = self._handle_json_body(event.payload, shape)702 parsed[event_name] = event_stream703 return parsed704 def _handle_json_body(self, raw_body, shape):705 # The json.loads() gives us the primitive JSON types,706 # but we need to traverse the parsed JSON data to convert707 # to richer types (blobs, timestamps, etc.708 parsed_json = self._parse_body_as_json(raw_body)709 return self._parse_shape(shape, parsed_json)710class BaseRestParser(ResponseParser):711 def _do_parse(self, response, shape):712 final_parsed = {}713 final_parsed['ResponseMetadata'] = self._populate_response_metadata(714 response)715 self._add_modeled_parse(response, shape, final_parsed)716 return final_parsed717 def _add_modeled_parse(self, response, shape, final_parsed):718 if shape is None:719 return final_parsed720 member_shapes = shape.members721 self._parse_non_payload_attrs(response, shape,722 member_shapes, final_parsed)723 self._parse_payload(response, shape, member_shapes, final_parsed)724 def _do_modeled_error_parse(self, response, shape):725 final_parsed = {}726 self._add_modeled_parse(response, shape, final_parsed)727 return final_parsed728 def _populate_response_metadata(self, response):729 metadata = {}730 headers = response['headers']731 if 'x-amzn-requestid' in headers:732 metadata['RequestId'] = headers['x-amzn-requestid']733 elif 'x-amz-request-id' in headers:734 metadata['RequestId'] = headers['x-amz-request-id']735 # HostId is what it's called whenever this value is returned736 # in an XML response body, so to be consistent, we'll always737 # call is HostId.738 metadata['HostId'] = headers.get('x-amz-id-2', '')739 return metadata740 def _parse_payload(self, response, shape, member_shapes, final_parsed):741 if 'payload' in shape.serialization:742 # If a payload is specified in the output shape, then only that743 # shape is used for the body payload.744 payload_member_name = shape.serialization['payload']745 body_shape = member_shapes[payload_member_name]746 if body_shape.serialization.get('eventstream'):747 body = self._create_event_stream(response, body_shape)748 final_parsed[payload_member_name] = body749 elif body_shape.type_name in ['string', 'blob']:750 # This is a stream751 body = response['body']752 if isinstance(body, bytes):753 body = body.decode(self.DEFAULT_ENCODING)754 final_parsed[payload_member_name] = body755 else:756 original_parsed = self._initial_body_parse(response['body'])757 final_parsed[payload_member_name] = self._parse_shape(758 body_shape, original_parsed)759 else:760 original_parsed = self._initial_body_parse(response['body'])761 body_parsed = self._parse_shape(shape, original_parsed)762 final_parsed.update(body_parsed)763 def _parse_non_payload_attrs(self, response, shape,764 member_shapes, final_parsed):765 headers = response['headers']766 for name in member_shapes:767 member_shape = member_shapes[name]768 location = member_shape.serialization.get('location')769 if location is None:770 continue771 elif location == 'statusCode':772 final_parsed[name] = self._parse_shape(773 member_shape, response['status_code'])774 elif location == 'headers':775 final_parsed[name] = self._parse_header_map(member_shape,776 headers)777 elif location == 'header':778 header_name = member_shape.serialization.get('name', name)779 if header_name in headers:780 final_parsed[name] = self._parse_shape(781 member_shape, headers[header_name])782 def _parse_header_map(self, shape, headers):783 # Note that headers are case insensitive, so we .lower()784 # all header names and header prefixes.785 parsed = {}786 prefix = shape.serialization.get('name', '').lower()787 for header_name in headers:788 if header_name.lower().startswith(prefix):789 # The key name inserted into the parsed hash790 # strips off the prefix.791 name = header_name[len(prefix):]792 parsed[name] = headers[header_name]793 return parsed794 def _initial_body_parse(self, body_contents):795 # This method should do the initial xml/json parsing of the796 # body. We we still need to walk the parsed body in order797 # to convert types, but this method will do the first round798 # of parsing.799 raise NotImplementedError("_initial_body_parse")800 def _handle_string(self, shape, value):801 parsed = value802 if is_json_value_header(shape):803 decoded = base64.b64decode(value).decode(self.DEFAULT_ENCODING)804 parsed = json.loads(decoded)805 return parsed806 def _handle_list(self, shape, node):807 location = shape.serialization.get('location')808 if location == 'header' and not isinstance(node, list):809 # List in headers may be a comma separated string as per RFC7230810 node = [e.strip() for e in node.split(',')]811 return super(BaseRestParser, self)._handle_list(shape, node)812class RestJSONParser(BaseRestParser, BaseJSONParser):813 EVENT_STREAM_PARSER_CLS = EventStreamJSONParser814 def _initial_body_parse(self, body_contents):815 return self._parse_body_as_json(body_contents)816 def _do_error_parse(self, response, shape):817 error = super(RestJSONParser, self)._do_error_parse(response, shape)818 self._inject_error_code(error, response)819 return error820 def _inject_error_code(self, error, response):821 # The "Code" value can come from either a response822 # header or a value in the JSON body.823 body = self._initial_body_parse(response['body'])824 if 'x-amzn-errortype' in response['headers']:825 code = response['headers']['x-amzn-errortype']826 # Could be:827 # x-amzn-errortype: ValidationException:828 code = code.split(':')[0]829 error['Error']['Code'] = code...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful