Best Python code snippet using localstack_python
provider.py
Source:provider.py
# NOTE: the functions below are methods of a class whose header lies outside
# this excerpt (they all take ``self``). The excerpt starts in the middle of a
# method whose ``def`` line is not shown; its visible tail is kept verbatim:
#
#     ...
#         with self._mutex:
#             if key not in self.queues:
#                 raise QueueDoesNotExist()
#             return self.queues[key]


def _require_queue_by_arn(self, queue_arn: str) -> SqsQueue:
    """
    Look up a queue by its ARN.

    :param queue_arn: the ARN of the queue; region, account and resource name are taken from it
    :returns: the queue returned by ``_require_queue`` for the derived key
    """
    arn = parse_arn(queue_arn)
    key = QueueKey(region=arn["region"], account_id=arn["account"], name=arn["resource"])
    return self._require_queue(key)


def _resolve_queue(
    self,
    context: RequestContext,
    queue_name: Optional[str] = None,
    queue_url: Optional[str] = None,
) -> SqsQueue:
    """
    Uses resolve_queue_key to determine the QueueKey from the given input, and returns the respective queue,
    or raises QueueDoesNotExist if it does not exist.

    :param context: the request context, used for getting region and account_id, and optionally the queue_url
    :param queue_name: the queue name (if this is set, then this will be used for the key)
    :param queue_url: the queue url (if name is not set, this will be used to determine the queue name)
    :returns: the queue
    :raises QueueDoesNotExist: if the queue does not exist
    """
    key = resolve_queue_key(context, queue_name, queue_url)
    return self._require_queue(key)


def create_queue(
    self,
    context: RequestContext,
    queue_name: String,
    attributes: QueueAttributeMap = None,
    tags: TagMap = None,
) -> CreateQueueResult:
    """
    Create a standard or FIFO queue in the region/account of the request.

    :param context: the request context providing region and account_id
    :param queue_name: name of the new queue
    :param attributes: optional queue attributes; ``FifoQueue == "true"`` selects a FifoQueue
    :param tags: optional tags passed through to the queue
    :returns: a result holding the URL of the new queue
    :raises QueueNameExists: if a queue with the same key is already registered
    """
    fifo = attributes and (
        attributes.get(QueueAttributeName.FifoQueue, "false").lower() == "true"
    )
    k = QueueKey(context.region, context.account_id, queue_name)

    # Special Case TODO: why is an empty policy passed at all? same in set_queue_attributes
    if attributes and attributes.get(QueueAttributeName.Policy) == "":
        del attributes[QueueAttributeName.Policy]

    if k in self.queues:
        raise QueueNameExists(queue_name)

    if fifo:
        queue = FifoQueue(k, attributes, tags)
    else:
        queue = StandardQueue(k, attributes, tags)

    LOG.debug("creating queue key=%s attributes=%s tags=%s", k, attributes, tags)
    self._add_queue(queue)
    return CreateQueueResult(QueueUrl=queue.url(context))


def get_queue_url(
    self, context: RequestContext, queue_name: String, queue_owner_aws_account_id: String = None
) -> GetQueueUrlResult:
    """
    Return the URL of an existing queue.

    :param context: the request context providing region and the default account_id
    :param queue_name: name of the queue
    :param queue_owner_aws_account_id: account to look in; falls back to ``context.account_id``
    :returns: a result holding the queue URL
    :raises QueueDoesNotExist: if no queue is registered under the resulting key
    """
    account_id = queue_owner_aws_account_id or context.account_id
    key = QueueKey(context.region, account_id, queue_name)

    if key not in self.queues:
        raise QueueDoesNotExist("The specified queue does not exist for this wsdl version.")

    queue = self.queues[key]
    self._assert_permission(context, queue)
    return GetQueueUrlResult(QueueUrl=queue.url(context))


def list_queues(
    self,
    context: RequestContext,
    queue_name_prefix: String = None,
    next_token: Token = None,
    max_results: BoxedInteger = None,
) -> ListQueuesResult:
    """
    List the URLs of the queues in the caller's region and account.

    :param context: the request context providing region and account_id
    :param queue_name_prefix: if set, only queues whose name starts with it are returned
    :param next_token: accepted but not used by this implementation
    :param max_results: if set, the result is truncated to this many URLs
    :returns: a result holding the matching queue URLs
    """
    urls = []
    for queue in self.queues.values():
        if queue.key.region != context.region:
            continue
        if queue.key.account_id != context.account_id:
            continue
        if queue_name_prefix and not queue.name.startswith(queue_name_prefix):
            continue
        urls.append(queue.url(context))

    if max_results:
        # FIXME: also need to solve pagination with stateful iterators: If the total number of items available is
        # more than the value specified, a NextToken is provided in the command's output. To resume pagination,
        # provide the NextToken value in the starting-token argument of a subsequent command. Do not use the
        # NextToken response element directly outside of the AWS CLI.
        urls = urls[:max_results]

    return ListQueuesResult(QueueUrls=urls)


def change_message_visibility(
    self,
    context: RequestContext,
    queue_url: String,
    receipt_handle: String,
    visibility_timeout: Integer,
) -> None:
    """
    Update the visibility timeout of a single received message.

    :param context: the request context
    :param queue_url: URL of the queue holding the message
    :param receipt_handle: receipt handle identifying the message
    :param visibility_timeout: new timeout, passed to the queue unchanged
    """
    queue = self._resolve_queue(context, queue_url=queue_url)
    self._assert_permission(context, queue)
    queue.update_visibility_timeout(receipt_handle, visibility_timeout)


def change_message_visibility_batch(
    self,
    context: RequestContext,
    queue_url: String,
    entries: ChangeMessageVisibilityBatchRequestEntryList,
) -> ChangeMessageVisibilityBatchResult:
    """
    Update the visibility timeout of several messages, reporting per-entry success or failure.

    :param context: the request context
    :param queue_url: URL of the queue holding the messages
    :param entries: entries with ``Id``, ``ReceiptHandle`` and ``VisibilityTimeout``
    :returns: a result with the ids that succeeded and an error entry for each that raised
    """
    queue = self._resolve_queue(context, queue_url=queue_url)
    self._assert_permission(context, queue)
    self._assert_batch(entries)

    successful = []
    failed = []

    with queue.mutex:
        for entry in entries:
            try:
                queue.update_visibility_timeout(
                    entry["ReceiptHandle"], entry["VisibilityTimeout"]
                )
                successful.append({"Id": entry["Id"]})
            except Exception as e:
                # any failure is reported for this entry only; the remaining entries still run
                failed.append(
                    BatchResultErrorEntry(
                        Id=entry["Id"],
                        SenderFault=False,
                        Code=e.__class__.__name__,
                        Message=str(e),
                    )
                )

    return ChangeMessageVisibilityBatchResult(
        Successful=successful,
        Failed=failed,
    )


def delete_queue(self, context: RequestContext, queue_url: String) -> None:
    """
    Remove a queue from the registry.

    :param context: the request context
    :param queue_url: URL of the queue to delete
    """
    # NOTE(review): _resolve_queue delegates to _require_queue, which presumably also
    # acquires self._mutex (see the fragment at the top of this excerpt) -- this only
    # works if the mutex is re-entrant; confirm against its definition.
    with self._mutex:
        queue = self._resolve_queue(context, queue_url=queue_url)
        self._assert_permission(context, queue)
        del self.queues[queue.key]


def get_queue_attributes(
    self, context: RequestContext, queue_url: String, attribute_names: AttributeNameList = None
) -> GetQueueAttributesResult:
    """
    Return the requested attributes of a queue.

    :param context: the request context
    :param queue_url: URL of the queue
    :param attribute_names: names to return; ``All`` expands to every attribute the queue holds,
        and an empty/missing list yields an empty result
    :returns: a result mapping attribute names to values
    :raises InvalidAttributeName: if a requested name is not a member of QueueAttributeName
    """
    queue = self._resolve_queue(context, queue_url=queue_url)
    self._assert_permission(context, queue)

    if not attribute_names:
        return GetQueueAttributesResult(Attributes={})

    if QueueAttributeName.All in attribute_names:
        attribute_names = queue.attributes.keys()

    result: Dict[QueueAttributeName, str] = {}

    for attr in attribute_names:
        try:
            getattr(QueueAttributeName, attr)
        except AttributeError:
            # the AttributeError is an implementation detail of the validation
            raise InvalidAttributeName(f"Unknown attribute {attr}.") from None

        # attribute values may be stored as callables that compute the value on demand
        value = queue.attributes.get(attr)
        result[attr] = value() if callable(value) else value

    return GetQueueAttributesResult(Attributes=result)


def send_message(
    self,
    context: RequestContext,
    queue_url: String,
    message_body: String,
    delay_seconds: Integer = None,
    message_attributes: MessageBodyAttributeMap = None,
    message_system_attributes: MessageBodySystemAttributeMap = None,
    message_deduplication_id: String = None,
    message_group_id: String = None,
) -> SendMessageResult:
    """
    Send a single message to a queue.

    :param context: the request context
    :param queue_url: URL of the target queue
    :param message_body: the message body
    :param delay_seconds: per-message delay; ``None`` means "use the queue default"
    :param message_attributes: optional user attributes
    :param message_system_attributes: optional system attributes
    :param message_deduplication_id: optional FIFO deduplication id
    :param message_group_id: optional FIFO group id
    :returns: ids and MD5 digests describing the stored message
    """
    queue = self._resolve_queue(context, queue_url=queue_url)
    self._assert_permission(context, queue)

    message = self._put_message(
        queue,
        context,
        message_body,
        delay_seconds,
        message_attributes,
        message_system_attributes,
        message_deduplication_id,
        message_group_id,
    )

    return SendMessageResult(
        MessageId=message["MessageId"],
        MD5OfMessageBody=message["MD5OfBody"],
        MD5OfMessageAttributes=message.get("MD5OfMessageAttributes"),
        SequenceNumber=queue.generate_sequence_number(),
        MD5OfMessageSystemAttributes=_create_message_attribute_hash(message_system_attributes),
    )


def send_message_batch(
    self, context: RequestContext, queue_url: String, entries: SendMessageBatchRequestEntryList
) -> SendMessageBatchResult:
    """
    Send several messages to a queue, reporting per-entry success or failure.

    :param context: the request context
    :param queue_url: URL of the target queue
    :param entries: batch entries; each needs an ``Id`` and may carry the same fields as send_message
    :returns: a result with one success entry per stored message and one error entry per failure
    """
    queue = self._resolve_queue(context, queue_url=queue_url)
    self._assert_permission(context, queue)
    self._assert_batch(entries)

    successful = []
    failed = []

    with queue.mutex:
        for entry in entries:
            try:
                system_attributes = entry.get("MessageSystemAttributes")
                message = self._put_message(
                    queue,
                    context,
                    message_body=entry.get("MessageBody"),
                    delay_seconds=entry.get("DelaySeconds"),
                    message_attributes=entry.get("MessageAttributes"),
                    message_system_attributes=system_attributes,
                    message_deduplication_id=entry.get("MessageDeduplicationId"),
                    message_group_id=entry.get("MessageGroupId"),
                )

                successful.append(
                    SendMessageBatchResultEntry(
                        Id=entry["Id"],
                        MessageId=message.get("MessageId"),
                        MD5OfMessageBody=message.get("MD5OfBody"),
                        MD5OfMessageAttributes=message.get("MD5OfMessageAttributes"),
                        # hash the request's system attributes, as send_message does; the
                        # stored message has no "message_system_attributes" key to read from
                        MD5OfMessageSystemAttributes=_create_message_attribute_hash(
                            system_attributes
                        ),
                        SequenceNumber=queue.generate_sequence_number(),
                    )
                )
            except Exception as e:
                # any failure is reported for this entry only; the remaining entries still run
                failed.append(
                    BatchResultErrorEntry(
                        Id=entry["Id"],
                        SenderFault=False,
                        Code=e.__class__.__name__,
                        Message=str(e),
                    )
                )

    return SendMessageBatchResult(
        Successful=successful,
        Failed=failed,
    )


def _put_message(
    self,
    queue: SqsQueue,
    context: RequestContext,
    message_body: String,
    delay_seconds: Integer = None,
    message_attributes: MessageBodyAttributeMap = None,
    message_system_attributes: MessageBodySystemAttributeMap = None,
    message_deduplication_id: String = None,
    message_group_id: String = None,
) -> Message:
    """
    Validate the input, build the Message and hand it to the queue (immediately or delayed).

    :param queue: the target queue
    :param context: the request context, used to build the message's system attributes
    :param message_body: the message body
    :param delay_seconds: per-message delay; ``None`` falls back to the queue's DelaySeconds
        attribute, while an explicit ``0`` means "no delay"
    :param message_attributes: optional user attributes
    :param message_system_attributes: optional system attributes
    :param message_deduplication_id: optional FIFO deduplication id
    :param message_group_id: optional FIFO group id
    :returns: the created message (returned before a delayed message is actually enqueued)
    """
    # TODO: default message attributes (SenderId, ApproximateFirstReceiveTimestamp, ...)
    check_message_content(message_body)
    check_attributes(message_attributes)
    check_attributes(message_system_attributes)
    check_fifo_id(message_deduplication_id)
    check_fifo_id(message_group_id)

    message: Message = Message(
        MessageId=generate_message_id(),
        MD5OfBody=md5(message_body),
        Body=message_body,
        Attributes=self._create_message_attributes(context, message_system_attributes),
        MD5OfMessageAttributes=_create_message_attribute_hash(message_attributes),
        MessageAttributes=message_attributes,
    )

    # only fall back to the queue default when the caller gave no delay at all; an
    # explicit 0 must override a non-zero queue-level delay
    if delay_seconds is None:
        delay_seconds = queue.attributes.get(QueueAttributeName.DelaySeconds, "0")
    delay = int(delay_seconds)

    if delay:
        # FIXME: this is a pretty bad implementation (one thread per message...). polling on a priority queue
        #  would probably be better. We also need access to delayed messages for the
        #  ApproximateNumberOfMessagesDelayed attribute.
        threading.Timer(
            delay,
            queue.put,
            args=(message, message_deduplication_id, message_group_id),
        ).start()
    else:
        queue.put(
            message=message,
            message_deduplication_id=message_deduplication_id,
            message_group_id=message_group_id,
        )

    return message


def receive_message(
    self,
    context: RequestContext,
    queue_url: String,
    attribute_names: AttributeNameList = None,
    message_attribute_names: MessageAttributeNameList = None,
    max_number_of_messages: Integer = None,
    visibility_timeout: Integer = None,
    wait_time_seconds: Integer = None,
    receive_request_attempt_id: String = None,
) -> ReceiveMessageResult:
    """
    Receive up to ``max_number_of_messages`` messages from a queue.

    :param context: the request context
    :param queue_url: URL of the queue
    :param attribute_names: accepted but not used by this implementation
    :param message_attribute_names: user attributes to return; ``All`` returns every attribute,
        an empty/missing list strips user attributes from the returned messages
    :param max_number_of_messages: maximum number of messages to return (defaults to 1)
    :param visibility_timeout: passed through to ``queue.get``
    :param wait_time_seconds: if not ``None``, each ``queue.get`` blocks up to this long
    :param receive_request_attempt_id: accepted but not used by this implementation
    :returns: a result holding the received messages (possibly none)
    """
    queue = self._resolve_queue(context, queue_url=queue_url)
    self._assert_permission(context, queue)

    num = max_number_of_messages or 1
    block = wait_time_seconds is not None

    # collect messages
    messages = []
    while num:
        try:
            standard_message = queue.get(
                block=block, timeout=wait_time_seconds, visibility_timeout=visibility_timeout
            )
            msg = standard_message.message
        except Empty:
            break

        moved_to_dlq = False
        if (
            queue.attributes
            and queue.attributes.get(QueueAttributeName.RedrivePolicy) is not None
        ):
            moved_to_dlq = self._dead_letter_check(queue, standard_message, context)
        if moved_to_dlq:
            # the message went to the dead-letter queue and does not count towards num
            continue

        # filter attributes
        # NOTE(review): msg is mutated in place; if queue.get hands out the stored message
        # rather than a copy, the filtering below also affects later receives -- confirm.
        if message_attribute_names:
            current = msg.get("MessageAttributes")
            # messages sent without attributes store None here, so only filter real mappings
            if current and "All" not in message_attribute_names:
                msg["MessageAttributes"] = {
                    k: v for k, v in current.items() if k in message_attribute_names
                }
            # TODO: why is this called even if we receive "All" attributes?
            msg["MD5OfMessageAttributes"] = _create_message_attribute_hash(
                msg.get("MessageAttributes")
            )
        else:
            # pop instead of del: the key may already be gone from an earlier receive
            msg.pop("MessageAttributes", None)

        # add message to result
        messages.append(msg)
        num -= 1

    # TODO: how does receiving behave if the queue was deleted in the meantime?
    return ReceiveMessageResult(Messages=messages)


def _dead_letter_check(
    self, queue: SqsQueue, std_m: SqsMessage, context: RequestContext
) -> bool:
    """
    Move a message to the dead-letter queue once it exceeded the redrive policy's receive count.

    :param queue: the source queue; its RedrivePolicy attribute must hold a JSON document
    :param std_m: the message wrapper that was just received
    :param context: the request context (unused here)
    :returns: True if the message was moved to the dead-letter queue, False otherwise
    """
    redrive_policy = json.loads(queue.attributes.get(QueueAttributeName.RedrivePolicy))
    # TODO: include the names of the dictionary sub - attributes in the autogenerated code?
    # the policy JSON may carry the count as a string ("5") as well as a number
    max_receive_count = int(redrive_policy["maxReceiveCount"])

    if std_m.receive_times <= max_receive_count:
        return False

    dead_letter_target_arn = redrive_policy["deadLetterTargetArn"]
    dl_queue = self._require_queue_by_arn(dead_letter_target_arn)
    # TODO: this needs to be atomic?
    dead_message = std_m.message
    dl_queue.put(message=dead_message)
    queue.remove(std_m.message["ReceiptHandle"])
    return True


def delete_message(
    self, context: RequestContext, queue_url: String, receipt_handle: String
) -> None:
    """
    Delete a received message from a queue.

    :param context: the request context
    :param queue_url: URL of the queue
    :param receipt_handle: receipt handle identifying the message to remove
    """
    queue = self._resolve_queue(context, queue_url=queue_url)
    self._assert_permission(context, queue)
    queue.remove(receipt_handle)


# The excerpt is truncated at the start of the next method:
#     def delete_message_batch(...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!