Best Python code snippet using localstack_python
provider.py
Source:provider.py
...511 check_message_content(attribute_value)512 except InvalidMessageContents as e:513 # AWS throws a different exception here514 raise InvalidParameterValue(e.args[0])515def check_fifo_id(fifo_id):516 if not fifo_id:517 return518 if len(fifo_id) >= 128:519 raise InvalidParameterValue(520 "Message deduplication ID and group ID must be shorter than 128 bytes"521 )522 if not re.match(FIFO_MSG_REGEX, fifo_id):523 raise InvalidParameterValue(524 "Invalid characters found. Deduplication ID and group ID can only contain"525 "alphanumeric characters as well as TODO"526 )527class SqsProvider(SqsApi, ServiceLifecycleHook):528 """529 LocalStack SQS Provider.530 LIMITATIONS:531 - Pagination of results (NextToken)532 - Delivery guarantees533 - The region is not encoded in the queue URL534 """535 queues: Dict[QueueKey, SqsQueue]536 def __init__(self) -> None:537 super().__init__()538 self.queues = {}539 self._mutex = threading.RLock()540 self._inflight_worker = InflightUpdateWorker(self.queues)541 def start(self):542 self._inflight_worker.start()543 def shutdown(self):544 self._inflight_worker.stop()545 def on_before_start(self):546 self.start()547 def on_before_stop(self):548 self.shutdown()549 def _add_queue(self, queue: SqsQueue):550 with self._mutex:551 self.queues[queue.key] = queue552 def _require_queue(self, key: QueueKey) -> SqsQueue:553 """554 Returns the queue for the given key, or raises QueueDoesNotExist if it does not exist.555 :param key: the QueueKey to look for556 :returns: the queue557 :raises QueueDoesNotExist: if the queue does not exist558 """559 with self._mutex:560 if key not in self.queues:561 raise QueueDoesNotExist()562 return self.queues[key]563 def _require_queue_by_arn(self, queue_arn: str) -> SqsQueue:564 arn = parse_arn(queue_arn)565 key = QueueKey(region=arn["region"], account_id=arn["account"], name=arn["resource"])566 return self._require_queue(key)567 def _resolve_queue(568 self,569 context: RequestContext,570 queue_name: Optional[str] = None,571 queue_url: Optional[str] = None,572 ) -> SqsQueue:573 """574 Uses resolve_queue_key to determine the QueueKey from the given input, and returns the respective queue,575 or raises QueueDoesNotExist if it does not exist.576 :param context: the request context, used for getting region and account_id, and optionally the queue_url577 :param queue_name: the queue name (if this is set, then this will be used for the key)578 :param queue_url: the queue url (if name is not set, this will be used to determine the queue name)579 :returns: the queue580 :raises QueueDoesNotExist: if the queue does not exist581 """582 key = resolve_queue_key(context, queue_name, queue_url)583 return self._require_queue(key)584 def create_queue(585 self,586 context: RequestContext,587 queue_name: String,588 attributes: QueueAttributeMap = None,589 tags: TagMap = None,590 ) -> CreateQueueResult:591 fifo = attributes and (592 attributes.get(QueueAttributeName.FifoQueue, "false").lower() == "true"593 )594 k = QueueKey(context.region, context.account_id, queue_name)595 # Special Case TODO: why is an emtpy policy passed at all? same in set_queue_attributes596 if attributes and attributes.get(QueueAttributeName.Policy) == "":597 del attributes[QueueAttributeName.Policy]598 if k in self.queues:599 raise QueueNameExists(queue_name)600 if fifo:601 queue = FifoQueue(k, attributes, tags)602 else:603 queue = StandardQueue(k, attributes, tags)604 LOG.debug("creating queue key=%s attributes=%s tags=%s", k, attributes, tags)605 self._add_queue(queue)606 return CreateQueueResult(QueueUrl=queue.url(context))607 def get_queue_url(608 self, context: RequestContext, queue_name: String, queue_owner_aws_account_id: String = None609 ) -> GetQueueUrlResult:610 account_id = queue_owner_aws_account_id or context.account_id611 key = QueueKey(context.region, account_id, queue_name)612 if key not in self.queues:613 raise QueueDoesNotExist("The specified queue does not exist for this wsdl version.")614 queue = self.queues[key]615 self._assert_permission(context, queue)616 return GetQueueUrlResult(QueueUrl=queue.url(context))617 def list_queues(618 self,619 context: RequestContext,620 queue_name_prefix: String = None,621 next_token: Token = None,622 max_results: BoxedInteger = None,623 ) -> ListQueuesResult:624 urls = []625 for queue in self.queues.values():626 if queue.key.region != context.region:627 continue628 if queue.key.account_id != context.account_id:629 continue630 if queue_name_prefix:631 if not queue.name.startswith(queue_name_prefix):632 continue633 urls.append(queue.url(context))634 if max_results:635 # FIXME: also need to solve pagination with stateful iterators: If the total number of items available is636 # more than the value specified, a NextToken is provided in the command's output. To resume pagination,637 # provide the NextToken value in the starting-token argument of a subsequent command. Do not use the638 # NextToken response element directly outside of the AWS CLI.639 urls = urls[:max_results]640 return ListQueuesResult(QueueUrls=urls)641 def change_message_visibility(642 self,643 context: RequestContext,644 queue_url: String,645 receipt_handle: String,646 visibility_timeout: Integer,647 ) -> None:648 queue = self._resolve_queue(context, queue_url=queue_url)649 self._assert_permission(context, queue)650 queue.update_visibility_timeout(receipt_handle, visibility_timeout)651 def change_message_visibility_batch(652 self,653 context: RequestContext,654 queue_url: String,655 entries: ChangeMessageVisibilityBatchRequestEntryList,656 ) -> ChangeMessageVisibilityBatchResult:657 queue = self._resolve_queue(context, queue_url=queue_url)658 self._assert_permission(context, queue)659 self._assert_batch(entries)660 successful = []661 failed = []662 with queue.mutex:663 for entry in entries:664 try:665 queue.update_visibility_timeout(666 entry["ReceiptHandle"], entry["VisibilityTimeout"]667 )668 successful.append({"Id": entry["Id"]})669 except Exception as e:670 failed.append(671 BatchResultErrorEntry(672 Id=entry["Id"],673 SenderFault=False,674 Code=e.__class__.__name__,675 Message=str(e),676 )677 )678 return ChangeMessageVisibilityBatchResult(679 Successful=successful,680 Failed=failed,681 )682 def delete_queue(self, context: RequestContext, queue_url: String) -> None:683 with self._mutex:684 queue = self._resolve_queue(context, queue_url=queue_url)685 self._assert_permission(context, queue)686 del self.queues[queue.key]687 def get_queue_attributes(688 self, context: RequestContext, queue_url: String, attribute_names: AttributeNameList = None689 ) -> GetQueueAttributesResult:690 queue = self._resolve_queue(context, queue_url=queue_url)691 self._assert_permission(context, queue)692 if not attribute_names:693 return GetQueueAttributesResult(Attributes={})694 if QueueAttributeName.All in attribute_names:695 # return GetQueueAttributesResult(Attributes=queue.attributes)696 attribute_names = queue.attributes.keys()697 result: Dict[QueueAttributeName, str] = {}698 for attr in attribute_names:699 try:700 getattr(QueueAttributeName, attr)701 except AttributeError:702 raise InvalidAttributeName(f"Unknown attribute {attr}.")703 if callable(queue.attributes.get(attr)):704 func = queue.attributes.get(attr)705 result[attr] = func()706 else:707 result[attr] = queue.attributes.get(attr)708 return GetQueueAttributesResult(Attributes=result)709 def send_message(710 self,711 context: RequestContext,712 queue_url: String,713 message_body: String,714 delay_seconds: Integer = None,715 message_attributes: MessageBodyAttributeMap = None,716 message_system_attributes: MessageBodySystemAttributeMap = None,717 message_deduplication_id: String = None,718 message_group_id: String = None,719 ) -> SendMessageResult:720 queue = self._resolve_queue(context, queue_url=queue_url)721 self._assert_permission(context, queue)722 message = self._put_message(723 queue,724 context,725 message_body,726 delay_seconds,727 message_attributes,728 message_system_attributes,729 message_deduplication_id,730 message_group_id,731 )732 return SendMessageResult(733 MessageId=message["MessageId"],734 MD5OfMessageBody=message["MD5OfBody"],735 MD5OfMessageAttributes=message.get("MD5OfMessageAttributes"),736 SequenceNumber=queue.generate_sequence_number(),737 MD5OfMessageSystemAttributes=_create_message_attribute_hash(message_system_attributes),738 )739 def send_message_batch(740 self, context: RequestContext, queue_url: String, entries: SendMessageBatchRequestEntryList741 ) -> SendMessageBatchResult:742 queue = self._resolve_queue(context, queue_url=queue_url)743 self._assert_permission(context, queue)744 self._assert_batch(entries)745 successful = []746 failed = []747 with queue.mutex:748 for entry in entries:749 try:750 message = self._put_message(751 queue,752 context,753 message_body=entry.get("MessageBody"),754 delay_seconds=entry.get("DelaySeconds"),755 message_attributes=entry.get("MessageAttributes"),756 message_system_attributes=entry.get("MessageSystemAttributes"),757 message_deduplication_id=entry.get("MessageDeduplicationId"),758 message_group_id=entry.get("MessageGroupId"),759 )760 successful.append(761 SendMessageBatchResultEntry(762 Id=entry["Id"],763 MessageId=message.get("MessageId"),764 MD5OfMessageBody=message.get("MD5OfBody"),765 MD5OfMessageAttributes=message.get("MD5OfMessageAttributes"),766 MD5OfMessageSystemAttributes=_create_message_attribute_hash(767 message.get("message_system_attributes")768 ),769 SequenceNumber=queue.generate_sequence_number(),770 )771 )772 except Exception as e:773 failed.append(774 BatchResultErrorEntry(775 Id=entry["Id"],776 SenderFault=False,777 Code=e.__class__.__name__,778 Message=str(e),779 )780 )781 return SendMessageBatchResult(782 Successful=successful,783 Failed=failed,784 )785 def _put_message(786 self,787 queue: SqsQueue,788 context: RequestContext,789 message_body: String,790 delay_seconds: Integer = None,791 message_attributes: MessageBodyAttributeMap = None,792 message_system_attributes: MessageBodySystemAttributeMap = None,793 message_deduplication_id: String = None,794 message_group_id: String = None,795 ) -> Message:796 # TODO: default message attributes (SenderId, ApproximateFirstReceiveTimestamp, ...)797 check_message_content(message_body)798 check_attributes(message_attributes)799 check_attributes(message_system_attributes)800 check_fifo_id(message_deduplication_id)801 check_fifo_id(message_group_id)802 message: Message = Message(803 MessageId=generate_message_id(),804 MD5OfBody=md5(message_body),805 Body=message_body,806 Attributes=self._create_message_attributes(context, message_system_attributes),807 MD5OfMessageAttributes=_create_message_attribute_hash(message_attributes),808 MessageAttributes=message_attributes,809 )810 delay_seconds = delay_seconds or queue.attributes.get(QueueAttributeName.DelaySeconds, "0")811 if int(delay_seconds):812 # FIXME: this is a pretty bad implementation (one thread per message...). polling on a priority queue813 # would probably be better. We also need access to delayed messages for the814 # ApproximateNumberrOfDelayedMessages attribute.815 threading.Timer(...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!