Best Python code snippet using localstack_python
provider.py
Source:provider.py
...333 if not re.match(r"^[a-zA-Z0-9_-]{1,80}$", name):334 raise InvalidParameterValue(335 "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"336 )337 def validate_queue_attributes(self, attributes):338 valid = [k[1] for k in inspect.getmembers(QueueAttributeName)]339 del valid[valid.index(QueueAttributeName.FifoQueue)]340 for k in attributes.keys():341 if k not in valid:342 raise InvalidAttributeName(f"Unknown attribute name {k}")343 def generate_sequence_number(self):344 return None345class StandardQueue(SqsQueue):346 def put(347 self,348 message: Message,349 visibility_timeout: int = None,350 message_deduplication_id: str = None,351 message_group_id: str = None,352 ):353 if message_deduplication_id:354 raise InvalidParameterValue(355 f"Value {message_deduplication_id} for parameter MessageDeduplicationId is invalid. Reason: The "356 f"request includes a parameter that is not valid for this queue type. "357 )358 if message_group_id:359 raise InvalidParameterValue(360 f"Value {message_group_id} for parameter MessageGroupId is invalid. Reason: The request includes a "361 f"parameter that is not valid for this queue type. "362 )363 standard_message = SqsMessage(time.time(), message)364 if visibility_timeout is not None:365 standard_message.visibility_timeout = visibility_timeout366 else:367 # use the attribute from the queue368 standard_message.visibility_timeout = self.visibility_timeout369 self.visible.put_nowait(standard_message)370class FifoQueue(SqsQueue):371 visible: PriorityQueue372 inflight: Set[SqsMessage]373 receipts: Dict[str, SqsMessage]374 deduplication: Dict[str, Dict[str, SqsMessage]]375 def __init__(self, key: QueueKey, attributes=None, tags=None) -> None:376 super().__init__(key, attributes, tags)377 self.deduplication = {}378 def put(379 self,380 message: Message,381 visibility_timeout: int = None,382 message_deduplication_id: str = None,383 message_group_id: str = None,384 ):385 if not message_group_id:386 raise MissingParameter("The request must contain the parameter MessageGroupId.")387 dedup_id = message_deduplication_id388 content_based_deduplication = (389 "true"390 == (self.attributes.get(QueueAttributeName.ContentBasedDeduplication, "false")).lower()391 )392 if not dedup_id and content_based_deduplication:393 dedup_id = hashlib.sha256(message.get("Body").encode("utf-8")).hexdigest()394 if not dedup_id:395 raise InvalidParameterValue(396 "The Queue should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided "397 "explicitly "398 )399 qm = SqsMessage(400 time.time(),401 message,402 message_deduplication_id=dedup_id,403 message_group_id=message_group_id,404 )405 if visibility_timeout is not None:406 qm.visibility_timeout = visibility_timeout407 else:408 # use the attribute from the queue409 qm.visibility_timeout = self.visibility_timeout410 original_message = None411 original_message_group = self.deduplication.get(message_group_id)412 if original_message_group:413 original_message = original_message_group.get(dedup_id)414 if (415 original_message416 and not original_message.deleted417 and original_message.priority + DEDUPLICATION_INTERVAL_IN_SEC > qm.priority418 ):419 message["MessageId"] = original_message.message["MessageId"]420 else:421 self.visible.put_nowait(qm)422 if not original_message_group:423 self.deduplication[message_group_id] = {}424 self.deduplication[message_group_id][message_deduplication_id] = qm425 def _assert_queue_name(self, name):426 if not name.endswith(".fifo"):427 raise InvalidParameterValue(428 "The name of a FIFO queue can only include alphanumeric characters, hyphens, or underscores, "429 "must end with .fifo suffix and be 1 to 80 in length"430 )431 # The .fifo suffix counts towards the 80-character queue name quota.432 queue_name = name[:-5] + "_fifo"433 super()._assert_queue_name(queue_name)434 def validate_queue_attributes(self, attributes):435 valid = [k[1] for k in inspect.getmembers(QueueAttributeName)]436 for k in attributes.keys():437 if k not in valid:438 raise InvalidAttributeName(f"Unknown attribute name {k}")439 # Special Cases440 fifo = attributes.get(QueueAttributeName.FifoQueue)441 if fifo and fifo.lower() != "true":442 raise InvalidAttributeValue(443 "Invalid value for the parameter FifoQueue. Reason: Modifying queue type is not supported."444 )445 # TODO: If we ever actually need to do something with this number, it needs to be part of446 # SQSMessage. This means changing all *put*() signatures to return the saved message.447 def generate_sequence_number(self):448 return _create_mock_sequence_number()449class InflightUpdateWorker:450 """451 Regularly re-queues inflight messages whose visibility timeout has expired.452 FIXME: very crude implementation. it would be better to have event-driven communication.453 """454 queues: Dict[QueueKey, SqsQueue]455 def __init__(self, queues: Dict[QueueKey, SqsQueue]) -> None:456 super().__init__()457 self.queues = queues458 self.running = False459 self.thread: Optional[FuncThread] = None460 def start(self):461 if self.thread:462 return463 def _run(*_args):464 self.running = True465 self.run()466 self.thread = start_thread(_run)467 def stop(self):468 if self.thread:469 self.thread.stop()470 self.running = False471 self.thread = None472 def run(self):473 while self.running:474 time.sleep(1)475 for queue in self.queues.values():476 queue.requeue_inflight_messages()477def check_attributes(message_attributes: MessageBodyAttributeMap):478 if not message_attributes:479 return480 for attribute_name in message_attributes:481 if len(attribute_name) >= 256:482 raise InvalidParameterValue(483 "Message (user) attribute names must be shorter than 256 Bytes"484 )485 if not re.match(ATTR_NAME_CHAR_REGEX, attribute_name.lower()):486 raise InvalidParameterValue(487 "Message (user) attributes name can only contain upper and lower score characters, digits, periods, "488 "hyphens and underscores. "489 )490 if not re.match(ATTR_NAME_PREFIX_SUFFIX_REGEX, attribute_name.lower()):491 raise InvalidParameterValue(492 "You can't use message attribute names beginning with 'AWS.' or 'Amazon.'. "493 "These strings are reserved for internal use. Additionally, they cannot start or end with '.'."494 )495 attribute = message_attributes[attribute_name]496 attribute_type = attribute.get("DataType")497 if not attribute_type:498 raise InvalidParameterValue("Missing required parameter DataType")499 if not re.match(ATTR_TYPE_REGEX, attribute_type):500 raise InvalidParameterValue(501 f"Type for parameter MessageAttributes.Attribute_name.DataType must be prefixed"502 f'with "String", "Binary", or "Number", but was: {attribute_type}'503 )504 if len(attribute_type) >= 256:505 raise InvalidParameterValue(506 "Message (user) attribute types must be shorter than 256 Bytes"507 )508 if attribute_type == "String":509 try:510 attribute_value = attribute.get("StringValue")511 check_message_content(attribute_value)512 except InvalidMessageContents as e:513 # AWS throws a different exception here514 raise InvalidParameterValue(e.args[0])515def check_fifo_id(fifo_id):516 if not fifo_id:517 return518 if len(fifo_id) >= 128:519 raise InvalidParameterValue(520 "Message deduplication ID and group ID must be shorter than 128 bytes"521 )522 if not re.match(FIFO_MSG_REGEX, fifo_id):523 raise InvalidParameterValue(524 "Invalid characters found. Deduplication ID and group ID can only contain"525 "alphanumeric characters as well as TODO"526 )527class SqsProvider(SqsApi, ServiceLifecycleHook):528 """529 LocalStack SQS Provider.530 LIMITATIONS:531 - Pagination of results (NextToken)532 - Delivery guarantees533 - The region is not encoded in the queue URL534 """535 queues: Dict[QueueKey, SqsQueue]536 def __init__(self) -> None:537 super().__init__()538 self.queues = {}539 self._mutex = threading.RLock()540 self._inflight_worker = InflightUpdateWorker(self.queues)541 def start(self):542 self._inflight_worker.start()543 def shutdown(self):544 self._inflight_worker.stop()545 def on_before_start(self):546 self.start()547 def on_before_stop(self):548 self.shutdown()549 def _add_queue(self, queue: SqsQueue):550 with self._mutex:551 self.queues[queue.key] = queue552 def _require_queue(self, key: QueueKey) -> SqsQueue:553 """554 Returns the queue for the given key, or raises QueueDoesNotExist if it does not exist.555 :param key: the QueueKey to look for556 :returns: the queue557 :raises QueueDoesNotExist: if the queue does not exist558 """559 with self._mutex:560 if key not in self.queues:561 raise QueueDoesNotExist()562 return self.queues[key]563 def _require_queue_by_arn(self, queue_arn: str) -> SqsQueue:564 arn = parse_arn(queue_arn)565 key = QueueKey(region=arn["region"], account_id=arn["account"], name=arn["resource"])566 return self._require_queue(key)567 def _resolve_queue(568 self,569 context: RequestContext,570 queue_name: Optional[str] = None,571 queue_url: Optional[str] = None,572 ) -> SqsQueue:573 """574 Uses resolve_queue_key to determine the QueueKey from the given input, and returns the respective queue,575 or raises QueueDoesNotExist if it does not exist.576 :param context: the request context, used for getting region and account_id, and optionally the queue_url577 :param queue_name: the queue name (if this is set, then this will be used for the key)578 :param queue_url: the queue url (if name is not set, this will be used to determine the queue name)579 :returns: the queue580 :raises QueueDoesNotExist: if the queue does not exist581 """582 key = resolve_queue_key(context, queue_name, queue_url)583 return self._require_queue(key)584 def create_queue(585 self,586 context: RequestContext,587 queue_name: String,588 attributes: QueueAttributeMap = None,589 tags: TagMap = None,590 ) -> CreateQueueResult:591 fifo = attributes and (592 attributes.get(QueueAttributeName.FifoQueue, "false").lower() == "true"593 )594 k = QueueKey(context.region, context.account_id, queue_name)595 # Special Case TODO: why is an emtpy policy passed at all? same in set_queue_attributes596 if attributes and attributes.get(QueueAttributeName.Policy) == "":597 del attributes[QueueAttributeName.Policy]598 if k in self.queues:599 raise QueueNameExists(queue_name)600 if fifo:601 queue = FifoQueue(k, attributes, tags)602 else:603 queue = StandardQueue(k, attributes, tags)604 LOG.debug("creating queue key=%s attributes=%s tags=%s", k, attributes, tags)605 self._add_queue(queue)606 return CreateQueueResult(QueueUrl=queue.url(context))607 def get_queue_url(608 self, context: RequestContext, queue_name: String, queue_owner_aws_account_id: String = None609 ) -> GetQueueUrlResult:610 account_id = queue_owner_aws_account_id or context.account_id611 key = QueueKey(context.region, account_id, queue_name)612 if key not in self.queues:613 raise QueueDoesNotExist("The specified queue does not exist for this wsdl version.")614 queue = self.queues[key]615 self._assert_permission(context, queue)616 return GetQueueUrlResult(QueueUrl=queue.url(context))617 def list_queues(618 self,619 context: RequestContext,620 queue_name_prefix: String = None,621 next_token: Token = None,622 max_results: BoxedInteger = None,623 ) -> ListQueuesResult:624 urls = []625 for queue in self.queues.values():626 if queue.key.region != context.region:627 continue628 if queue.key.account_id != context.account_id:629 continue630 if queue_name_prefix:631 if not queue.name.startswith(queue_name_prefix):632 continue633 urls.append(queue.url(context))634 if max_results:635 # FIXME: also need to solve pagination with stateful iterators: If the total number of items available is636 # more than the value specified, a NextToken is provided in the command's output. To resume pagination,637 # provide the NextToken value in the starting-token argument of a subsequent command. Do not use the638 # NextToken response element directly outside of the AWS CLI.639 urls = urls[:max_results]640 return ListQueuesResult(QueueUrls=urls)641 def change_message_visibility(642 self,643 context: RequestContext,644 queue_url: String,645 receipt_handle: String,646 visibility_timeout: Integer,647 ) -> None:648 queue = self._resolve_queue(context, queue_url=queue_url)649 self._assert_permission(context, queue)650 queue.update_visibility_timeout(receipt_handle, visibility_timeout)651 def change_message_visibility_batch(652 self,653 context: RequestContext,654 queue_url: String,655 entries: ChangeMessageVisibilityBatchRequestEntryList,656 ) -> ChangeMessageVisibilityBatchResult:657 queue = self._resolve_queue(context, queue_url=queue_url)658 self._assert_permission(context, queue)659 self._assert_batch(entries)660 successful = []661 failed = []662 with queue.mutex:663 for entry in entries:664 try:665 queue.update_visibility_timeout(666 entry["ReceiptHandle"], entry["VisibilityTimeout"]667 )668 successful.append({"Id": entry["Id"]})669 except Exception as e:670 failed.append(671 BatchResultErrorEntry(672 Id=entry["Id"],673 SenderFault=False,674 Code=e.__class__.__name__,675 Message=str(e),676 )677 )678 return ChangeMessageVisibilityBatchResult(679 Successful=successful,680 Failed=failed,681 )682 def delete_queue(self, context: RequestContext, queue_url: String) -> None:683 with self._mutex:684 queue = self._resolve_queue(context, queue_url=queue_url)685 self._assert_permission(context, queue)686 del self.queues[queue.key]687 def get_queue_attributes(688 self, context: RequestContext, queue_url: String, attribute_names: AttributeNameList = None689 ) -> GetQueueAttributesResult:690 queue = self._resolve_queue(context, queue_url=queue_url)691 self._assert_permission(context, queue)692 if not attribute_names:693 return GetQueueAttributesResult(Attributes={})694 if QueueAttributeName.All in attribute_names:695 # return GetQueueAttributesResult(Attributes=queue.attributes)696 attribute_names = queue.attributes.keys()697 result: Dict[QueueAttributeName, str] = {}698 for attr in attribute_names:699 try:700 getattr(QueueAttributeName, attr)701 except AttributeError:702 raise InvalidAttributeName(f"Unknown attribute {attr}.")703 if callable(queue.attributes.get(attr)):704 func = queue.attributes.get(attr)705 result[attr] = func()706 else:707 result[attr] = queue.attributes.get(attr)708 return GetQueueAttributesResult(Attributes=result)709 def send_message(710 self,711 context: RequestContext,712 queue_url: String,713 message_body: String,714 delay_seconds: Integer = None,715 message_attributes: MessageBodyAttributeMap = None,716 message_system_attributes: MessageBodySystemAttributeMap = None,717 message_deduplication_id: String = None,718 message_group_id: String = None,719 ) -> SendMessageResult:720 queue = self._resolve_queue(context, queue_url=queue_url)721 self._assert_permission(context, queue)722 message = self._put_message(723 queue,724 context,725 message_body,726 delay_seconds,727 message_attributes,728 message_system_attributes,729 message_deduplication_id,730 message_group_id,731 )732 return SendMessageResult(733 MessageId=message["MessageId"],734 MD5OfMessageBody=message["MD5OfBody"],735 MD5OfMessageAttributes=message.get("MD5OfMessageAttributes"),736 SequenceNumber=queue.generate_sequence_number(),737 MD5OfMessageSystemAttributes=_create_message_attribute_hash(message_system_attributes),738 )739 def send_message_batch(740 self, context: RequestContext, queue_url: String, entries: SendMessageBatchRequestEntryList741 ) -> SendMessageBatchResult:742 queue = self._resolve_queue(context, queue_url=queue_url)743 self._assert_permission(context, queue)744 self._assert_batch(entries)745 successful = []746 failed = []747 with queue.mutex:748 for entry in entries:749 try:750 message = self._put_message(751 queue,752 context,753 message_body=entry.get("MessageBody"),754 delay_seconds=entry.get("DelaySeconds"),755 message_attributes=entry.get("MessageAttributes"),756 message_system_attributes=entry.get("MessageSystemAttributes"),757 message_deduplication_id=entry.get("MessageDeduplicationId"),758 message_group_id=entry.get("MessageGroupId"),759 )760 successful.append(761 SendMessageBatchResultEntry(762 Id=entry["Id"],763 MessageId=message.get("MessageId"),764 MD5OfMessageBody=message.get("MD5OfBody"),765 MD5OfMessageAttributes=message.get("MD5OfMessageAttributes"),766 MD5OfMessageSystemAttributes=_create_message_attribute_hash(767 message.get("message_system_attributes")768 ),769 SequenceNumber=queue.generate_sequence_number(),770 )771 )772 except Exception as e:773 failed.append(774 BatchResultErrorEntry(775 Id=entry["Id"],776 SenderFault=False,777 Code=e.__class__.__name__,778 Message=str(e),779 )780 )781 return SendMessageBatchResult(782 Successful=successful,783 Failed=failed,784 )785 def _put_message(786 self,787 queue: SqsQueue,788 context: RequestContext,789 message_body: String,790 delay_seconds: Integer = None,791 message_attributes: MessageBodyAttributeMap = None,792 message_system_attributes: MessageBodySystemAttributeMap = None,793 message_deduplication_id: String = None,794 message_group_id: String = None,795 ) -> Message:796 # TODO: default message attributes (SenderId, ApproximateFirstReceiveTimestamp, ...)797 check_message_content(message_body)798 check_attributes(message_attributes)799 check_attributes(message_system_attributes)800 check_fifo_id(message_deduplication_id)801 check_fifo_id(message_group_id)802 message: Message = Message(803 MessageId=generate_message_id(),804 MD5OfBody=md5(message_body),805 Body=message_body,806 Attributes=self._create_message_attributes(context, message_system_attributes),807 MD5OfMessageAttributes=_create_message_attribute_hash(message_attributes),808 MessageAttributes=message_attributes,809 )810 delay_seconds = delay_seconds or queue.attributes.get(QueueAttributeName.DelaySeconds, "0")811 if int(delay_seconds):812 # FIXME: this is a pretty bad implementation (one thread per message...). polling on a priority queue813 # would probably be better. We also need access to delayed messages for the814 # ApproximateNumberrOfDelayedMessages attribute.815 threading.Timer(816 int(delay_seconds),817 queue.put,818 args=(message, message_deduplication_id, message_group_id),819 ).start()820 else:821 queue.put(822 message=message,823 message_deduplication_id=message_deduplication_id,824 message_group_id=message_group_id,825 )826 return message827 def receive_message(828 self,829 context: RequestContext,830 queue_url: String,831 attribute_names: AttributeNameList = None,832 message_attribute_names: MessageAttributeNameList = None,833 max_number_of_messages: Integer = None,834 visibility_timeout: Integer = None,835 wait_time_seconds: Integer = None,836 receive_request_attempt_id: String = None,837 ) -> ReceiveMessageResult:838 queue = self._resolve_queue(context, queue_url=queue_url)839 self._assert_permission(context, queue)840 num = max_number_of_messages or 1841 block = wait_time_seconds is not None842 # collect messages843 messages = []844 while num:845 try:846 standard_message = queue.get(847 block=block, timeout=wait_time_seconds, visibility_timeout=visibility_timeout848 )849 msg = standard_message.message850 except Empty:851 break852 moved_to_dlq = False853 if (854 queue.attributes855 and queue.attributes.get(QueueAttributeName.RedrivePolicy) is not None856 ):857 moved_to_dlq = self._dead_letter_check(queue, standard_message, context)858 if moved_to_dlq:859 continue860 # filter attributes861 if message_attribute_names:862 if "All" not in message_attribute_names:863 msg["MessageAttributes"] = {864 k: v865 for k, v in msg["MessageAttributes"].items()866 if k in message_attribute_names867 }868 # TODO: why is this called even if we receive "All" attributes?869 msg["MD5OfMessageAttributes"] = _create_message_attribute_hash(870 msg["MessageAttributes"]871 )872 else:873 del msg["MessageAttributes"]874 # add message to result875 messages.append(msg)876 num -= 1877 # TODO: how does receiving behave if the queue was deleted in the meantime?878 return ReceiveMessageResult(Messages=messages)879 def _dead_letter_check(880 self, queue: SqsQueue, std_m: SqsMessage, context: RequestContext881 ) -> bool:882 redrive_policy = json.loads(queue.attributes.get(QueueAttributeName.RedrivePolicy))883 # TODO: include the names of the dictionary sub - attributes in the autogenerated code?884 max_receive_count = redrive_policy["maxReceiveCount"]885 if std_m.receive_times > max_receive_count:886 dead_letter_target_arn = redrive_policy["deadLetterTargetArn"]887 dl_queue = self._require_queue_by_arn(dead_letter_target_arn)888 # TODO: this needs to be atomic?889 dead_message = std_m.message890 dl_queue.put(message=dead_message)891 queue.remove(std_m.message["ReceiptHandle"])892 return True893 else:894 return False895 def delete_message(896 self, context: RequestContext, queue_url: String, receipt_handle: String897 ) -> None:898 queue = self._resolve_queue(context, queue_url=queue_url)899 self._assert_permission(context, queue)900 queue.remove(receipt_handle)901 def delete_message_batch(902 self,903 context: RequestContext,904 queue_url: String,905 entries: DeleteMessageBatchRequestEntryList,906 ) -> DeleteMessageBatchResult:907 queue = self._resolve_queue(context, queue_url=queue_url)908 self._assert_permission(context, queue)909 self._assert_batch(entries)910 successful = []911 failed = []912 with queue.mutex:913 for entry in entries:914 try:915 queue.remove(entry["ReceiptHandle"])916 successful.append(DeleteMessageBatchResultEntry(Id=entry["Id"]))917 except Exception as e:918 failed.append(919 BatchResultErrorEntry(920 Id=entry["Id"],921 SenderFault=False,922 Code=e.__class__.__name__,923 Message=str(e),924 )925 )926 return DeleteMessageBatchResult(927 Successful=successful,928 Failed=failed,929 )930 def purge_queue(self, context: RequestContext, queue_url: String) -> None:931 queue = self._resolve_queue(context, queue_url=queue_url)932 self._assert_permission(context, queue)933 with self._mutex:934 # FIXME: use queue-specific locks935 if queue.purge_in_progress:936 raise PurgeQueueInProgress()937 queue.purge_in_progress = True938 # TODO: how do other methods behave when purge is in progress?939 try:940 while True:941 queue.visible.get_nowait()942 except Empty:943 return944 finally:945 queue.purge_in_progress = False946 def set_queue_attributes(947 self, context: RequestContext, queue_url: String, attributes: QueueAttributeMap948 ) -> None:949 queue = self._resolve_queue(context, queue_url=queue_url)950 if not attributes:951 return952 queue.validate_queue_attributes(attributes)953 for k, v in attributes.items():954 queue.attributes[k] = v955 # Special cases956 if queue.attributes.get(QueueAttributeName.Policy) == "":957 del queue.attributes[QueueAttributeName.Policy]958 redrive_policy = queue.attributes.get(QueueAttributeName.RedrivePolicy)959 if redrive_policy:960 _redrive_policy = json.loads(redrive_policy)961 dl_target_arn = _redrive_policy.get("deadLetterTargetArn")962 max_receive_count = _redrive_policy.get("maxReceiveCount")963 # TODO: use the actual AWS responses964 if not dl_target_arn:965 raise InvalidParameterValue(966 "The required parameter 'deadLetterTargetArn' is missing"967 )968 if not max_receive_count:969 raise InvalidParameterValue("The required parameter 'maxReceiveCount' is missing")970 try:971 max_receive_count = int(max_receive_count)972 valid_count = 1 <= max_receive_count <= 1000973 except ValueError:974 valid_count = False975 if not valid_count:976 raise InvalidParameterValue(977 f"Value {redrive_policy} for parameter RedrivePolicy is invalid. Reason: Invalid value for maxReceiveCount: {max_receive_count}, valid values are from 1 to 1000 both inclusive."978 )979 def tag_queue(self, context: RequestContext, queue_url: String, tags: TagMap) -> None:980 queue = self._resolve_queue(context, queue_url=queue_url)981 self._assert_permission(context, queue)982 if not tags:983 return984 for k, v in tags.items():985 queue.tags[k] = v986 def list_queue_tags(self, context: RequestContext, queue_url: String) -> ListQueueTagsResult:987 queue = self._resolve_queue(context, queue_url=queue_url)988 self._assert_permission(context, queue)989 return ListQueueTagsResult(Tags=queue.tags)990 def untag_queue(self, context: RequestContext, queue_url: String, tag_keys: TagKeyList) -> None:991 queue = self._resolve_queue(context, queue_url=queue_url)992 self._assert_permission(context, queue)993 for k in tag_keys:994 if k in queue.tags:995 del queue.tags[k]996 def add_permission(997 self,998 context: RequestContext,999 queue_url: String,1000 label: String,1001 aws_account_ids: AWSAccountIdList,1002 actions: ActionNameList,1003 ) -> None:1004 queue = self._resolve_queue(context, queue_url=queue_url)1005 self._assert_permission(context, queue)1006 self._validate_actions(actions)1007 for account_id in aws_account_ids:1008 for action in actions:1009 queue.permissions.add(Permission(label, account_id, action))1010 def remove_permission(self, context: RequestContext, queue_url: String, label: String) -> None:1011 queue = self._resolve_queue(context, queue_url=queue_url)1012 self._assert_permission(context, queue)1013 candidates = [p for p in queue.permissions if p.label == label]1014 if candidates:1015 queue.permissions.remove(candidates[0])1016 def _create_message_attributes(1017 self,1018 context: RequestContext,1019 message_system_attributes: MessageBodySystemAttributeMap = None,1020 ) -> Dict[MessageSystemAttributeName, str]:1021 result: Dict[MessageSystemAttributeName, str] = {1022 MessageSystemAttributeName.SenderId: context.account_id,1023 MessageSystemAttributeName.SentTimestamp: str(now()),1024 }1025 if message_system_attributes is not None:1026 for attr in message_system_attributes:1027 result[attr] = message_system_attributes[attr]["StringValue"]1028 return result1029 def _validate_queue_attributes(self, attributes: QueueAttributeMap):1030 valid = [k[1] for k in inspect.getmembers(QueueAttributeName)]1031 for k in attributes.keys():1032 if k not in valid:1033 raise InvalidAttributeName("Unknown Attribute %s" % k)1034 def _validate_actions(self, actions: ActionNameList):1035 service = load_service(service=self.service, version=self.version)1036 # FIXME: this is a bit of a heuristic as it will also include actions like "ListQueues" which is not1037 # associated with an action on a queue1038 valid = list(service.operation_names)1039 valid.append("*")1040 for action in actions:1041 if action not in valid:1042 raise InvalidParameterValue(1043 f"Value SQS:{action} for parameter ActionName is invalid. Reason: Please refer to the appropriate "...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!