How to use global_message_sequence method in localstack

Best Python code snippet using localstack_python

provider.py

Source:provider.py Github

copy

Full Screen

# ... (excerpt of provider.py; the part of the file before this point is not shown)


class MissingParameter(CommonServiceException):
    """Service error raised with code ``MissingParameter`` and HTTP status 400."""

    def __init__(self, message):
        super().__init__("MissingParameter", message, 400, True)


@singleton_factory
def global_message_sequence():
    """
    Create the counter that backs FIFO sequence numbers.

    NOTE(review): ``FifoQueue.next_sequence_number`` calls ``next()`` on the result of every invocation, so this
    presumably relies on ``singleton_factory`` returning the same iterator each time - confirm.

    :return: an ``itertools.count`` starting at ``int(time.time()) << 33``
    """
    # creates a 20-digit number used as the start for the global sequence
    start = int(time.time()) << 33
    # itertools.count is thread safe over the GIL since its getAndIncrement operation is a single python bytecode op
    return itertools.count(start)


def generate_message_id():
    """Return a new message id (delegates to ``long_uid``)."""
    return long_uid()


def assert_queue_name(queue_name: str, fifo: bool = False):
    """
    Validate a queue name.

    :param queue_name: the name to check
    :param fifo: whether the name belongs to a FIFO queue (only then is a ``.fifo`` suffix accepted)
    :raises InvalidParameterValue: if the name has a ``.fifo`` suffix on a non-FIFO queue, or does not consist of
        1 to 80 of the allowed characters
    """
    if queue_name.endswith(".fifo"):
        if not fifo:
            # Standard queues with .fifo suffix are not allowed
            raise InvalidParameterValue(
                "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"
            )
        # The .fifo suffix counts towards the 80-character queue name quota.
        queue_name = queue_name[:-5] + "_fifo"

    # slashes are actually not allowed, but we've allowed it explicitly in localstack
    # fullmatch (rather than match with "$") so that a trailing newline is rejected as well
    if not re.fullmatch(r"[a-zA-Z0-9/_-]{1,80}", queue_name):
        raise InvalidParameterValue(
            "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"
        )


def check_message_size(message_body: str, max_message_size: int):
    """
    Check that the UTF-8 encoded message body does not exceed ``max_message_size`` bytes.

    :param message_body: the message body
    :param max_message_size: the maximum number of bytes allowed
    :raises InvalidParameterValue: if the encoded body is larger than ``max_message_size``
    """
    # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-messages.html
    error = "One or more parameters are invalid. "
    error += f"Reason: Message must be shorter than {max_message_size} bytes."
    # must encode as utf8 to get correct bytes with len
    if len(message_body.encode("utf8")) > max_message_size:
        raise InvalidParameterValue(error)


def check_message_content(message_body: str):
    """
    Check the message body against ``MSG_CONTENT_REGEX``.

    :param message_body: the message body
    :raises InvalidMessageContents: if the body does not match the regex
    """
    error = (
        "Invalid characters found. Valid unicode characters are "
        "#x9 | #xA | #xD | #x20 to #xD7FF | #xE000 to #xFFFD | #x10000 to #x10FFFF"
    )
    if not re.match(MSG_CONTENT_REGEX, message_body):
        raise InvalidMessageContents(error)


def encode_receipt_handle(queue_arn, message: "SqsMessage") -> str:
    """
    Build a receipt handle for the given message.

    :param queue_arn: the ARN of the queue the message was received from
    :param message: the received message
    :return: base64 encoding of ``"<uid> <queue_arn> <MessageId> <last_received>"``
    """
    # http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/ImportantIdentifiers.html#ImportantIdentifiers-receipt-handles
    # encode the queue arn in the receipt handle, so we can later check if it belongs to the queue
    # but also add some randomness s.t. the generated receipt handles look like the ones from AWS
    handle = f"{long_uid()} {queue_arn} {message.message.get('MessageId')} {message.last_received}"
    encoded = base64.b64encode(handle.encode("utf-8"))
    return encoded.decode("utf-8")


def decode_receipt_handle(receipt_handle: str) -> str:
    """
    Extract the queue ARN from a receipt handle created by ``encode_receipt_handle``.

    :param receipt_handle: the base64 encoded receipt handle
    :return: the queue ARN contained in the handle
    :raises ReceiptHandleIsInvalid: if the handle cannot be decoded, does not have exactly four space-separated
        parts, or its second part is not accepted by ``parse_arn``
    """
    try:
        handle = base64.b64decode(receipt_handle).decode("utf-8")
        # only the ARN is needed; the uid, message id and timestamp are ignored
        _, queue_arn, _, _ = handle.split(" ")
        parse_arn(queue_arn)  # raises a ValueError if it is not an arn
        return queue_arn
    except (IndexError, ValueError):
        raise ReceiptHandleIsInvalid(
            f'The input receipt handle "{receipt_handle}" is not a valid receipt handle.'
        )


class Permission(NamedTuple):
    # TODO: just a placeholder for real policies
    label: str
    account_id: str
    action: str


class SqsMessage:
    """
    A message held by an ``SqsQueue`` together with its bookkeeping state (receive count, receipt handles,
    visibility deadline, delay). Ordering comparisons use ``priority``; equality and hashing use the ``MessageId``.
    """

    message: Message
    created: float
    visibility_timeout: int
    receive_times: int
    delay_seconds: Optional[int]
    receipt_handles: Set[str]
    last_received: Optional[float]
    first_received: Optional[float]
    visibility_deadline: Optional[float]
    deleted: bool
    priority: float
    message_deduplication_id: str
    message_group_id: str
    sequence_number: str

    def __init__(
        self,
        priority: float,
        message: Message,
        message_deduplication_id: Optional[str] = None,
        message_group_id: Optional[str] = None,
        sequence_number: Optional[str] = None,
    ) -> None:
        """
        :param priority: value used to order messages in the queue
        :param message: the message; its ``"Attributes"`` entry is created or updated in place
        :param message_deduplication_id: stored as the ``MessageDeduplicationId`` attribute if not None
        :param message_group_id: stored as the ``MessageGroupId`` attribute if not None
        :param sequence_number: stored as the ``SequenceNumber`` attribute if not None
        """
        self.created = time.time()
        self.message = message
        self.receive_times = 0
        self.receipt_handles = set()

        self.delay_seconds = None
        self.last_received = None
        self.first_received = None
        # initialised so that ``is_visible`` works on a message that has never been received
        self.visibility_deadline = None
        self.deleted = False
        self.priority = priority
        self.sequence_number = sequence_number

        attributes = {}
        if message_group_id is not None:
            attributes["MessageGroupId"] = message_group_id
        if message_deduplication_id is not None:
            attributes["MessageDeduplicationId"] = message_deduplication_id
        if sequence_number is not None:
            attributes["SequenceNumber"] = sequence_number

        if self.message.get("Attributes"):
            self.message["Attributes"].update(attributes)
        else:
            self.message["Attributes"] = attributes

    @property
    def message_group_id(self) -> Optional[str]:
        return self.message["Attributes"].get("MessageGroupId")

    @property
    def message_deduplication_id(self) -> Optional[str]:
        return self.message["Attributes"].get("MessageDeduplicationId")

    def set_last_received(self, timestamp: float):
        """
        Sets the last received timestamp of the message to the given value, and updates the visibility deadline
        accordingly.

        :param timestamp: the last time the message was received
        """
        self.last_received = timestamp
        self.visibility_deadline = timestamp + self.visibility_timeout

    def update_visibility_timeout(self, timeout: int):
        """
        Sets the visibility timeout of the message to the given value, and updates the visibility deadline accordingly.

        :param timeout: the timeout value in seconds
        """
        self.visibility_timeout = timeout
        self.visibility_deadline = time.time() + timeout

    @property
    def is_visible(self) -> bool:
        """
        Returns false if the message has a visibility deadline that is in the future.

        :return: whether the message is visible or not.
        """
        if self.visibility_deadline is None:
            return True
        return time.time() >= self.visibility_deadline

    @property
    def is_delayed(self) -> bool:
        """
        :return: False if no delay is set, otherwise whether ``created + delay_seconds`` has not yet passed
        """
        if self.delay_seconds is None:
            return False
        return time.time() <= self.created + self.delay_seconds

    def __gt__(self, other):
        return self.priority > other.priority

    def __ge__(self, other):
        return self.priority >= other.priority

    def __lt__(self, other):
        return self.priority < other.priority

    def __le__(self, other):
        return self.priority <= other.priority

    def __eq__(self, other):
        # let Python fall back to its default handling instead of failing on unrelated types
        if not isinstance(other, SqsMessage):
            return NotImplemented
        return self.message["MessageId"] == other.message["MessageId"]

    def __hash__(self):
        return self.message["MessageId"].__hash__()


class SqsQueue:
    """
    Base class of the queue implementations. Holds visible messages in a ``PriorityQueue``, delayed and in-flight
    messages in sets, and a mapping from receipt handle to message. ``put`` must be provided by subclasses.
    """

    name: str
    region: str
    account_id: str

    attributes: QueueAttributeMap
    tags: TagMap
    permissions: Set[Permission]

    purge_in_progress: bool
    purge_timestamp: Optional[float]

    visible: PriorityQueue
    delayed: Set[SqsMessage]
    inflight: Set[SqsMessage]
    receipts: Dict[str, SqsMessage]

    def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:
        """
        :param name: the queue name (validated with ``_assert_queue_name``)
        :param region: the region used in the queue ARN
        :param account_id: the account id used in the queue ARN and URL
        :param attributes: optional attributes that override the defaults
        :param tags: optional queue tags
        :raises InvalidParameterValue: if the name is not valid
        """
        self.name = name
        self.region = region
        self.account_id = account_id

        self._assert_queue_name(name)
        self.tags = tags or {}

        self.visible = PriorityQueue()
        self.delayed = set()
        self.inflight = set()
        self.receipts = {}

        self.attributes = self.default_attributes()
        if attributes:
            self.attributes.update(attributes)

        self.purge_in_progress = False
        self.purge_timestamp = None

        self.permissions = set()
        self.mutex = threading.RLock()

    def default_attributes(self) -> QueueAttributeMap:
        """
        :return: the default attribute map; the three ``ApproximateNumberOf...`` entries are callables that compute
            the current value when invoked
        """
        return {
            QueueAttributeName.ApproximateNumberOfMessages: lambda: self.visible._qsize(),
            QueueAttributeName.ApproximateNumberOfMessagesNotVisible: lambda: len(self.inflight),
            QueueAttributeName.ApproximateNumberOfMessagesDelayed: lambda: len(self.delayed),
            QueueAttributeName.CreatedTimestamp: str(now()),
            QueueAttributeName.DelaySeconds: "0",
            QueueAttributeName.LastModifiedTimestamp: str(now()),
            QueueAttributeName.MaximumMessageSize: str(DEFAULT_MAXIMUM_MESSAGE_SIZE),
            QueueAttributeName.MessageRetentionPeriod: "345600",
            QueueAttributeName.QueueArn: self.arn,
            QueueAttributeName.ReceiveMessageWaitTimeSeconds: "0",
            QueueAttributeName.VisibilityTimeout: "30",
            QueueAttributeName.SqsManagedSseEnabled: "false",
        }

    def update_delay_seconds(self, value: int):
        """
        For standard queues, the per-queue delay setting is not retroactive—changing the setting doesn't affect the delay of messages already in the queue.
        For FIFO queues, the per-queue delay setting is retroactive—changing the setting affects the delay of messages already in the queue.

        https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-delay-queues.html

        :param value: the number of seconds
        """
        self.attributes[QueueAttributeName.DelaySeconds] = str(value)

    def update_last_modified(self, timestamp: Optional[int] = None):
        """
        Set the ``LastModifiedTimestamp`` attribute.

        :param timestamp: the timestamp to store; defaults to ``now()``
        """
        if timestamp is None:
            timestamp = now()
        self.attributes[QueueAttributeName.LastModifiedTimestamp] = str(timestamp)

    @property
    def arn(self) -> str:
        return f"arn:aws:sqs:{self.region}:{self.account_id}:{self.name}"

    def url(self, context: RequestContext) -> str:
        """Return queue URL using either SQS_PORT_EXTERNAL (if configured), the SQS_ENDPOINT_STRATEGY (if configured)
        or based on the 'Host' request header"""

        host_url = context.request.host_url

        if config.SQS_ENDPOINT_STRATEGY == "domain":
            # queue.localhost.localstack.cloud:4566/000000000000/my-queue (us-east-1)
            # or us-east-2.queue.localhost.localstack.cloud:4566/000000000000/my-queue
            region = "" if self.region == "us-east-1" else self.region + "."
            scheme = context.request.scheme
            host_url = f"{scheme}://{region}queue.{constants.LOCALHOST_HOSTNAME}:{config.EDGE_PORT}"
        elif config.SQS_ENDPOINT_STRATEGY == "path":
            # https?://localhost:4566/queue/us-east-1/00000000000/my-queue (us-east-1)
            # strip a trailing slash first, otherwise the result would contain "//queue/"
            host_url = f"{host_url.rstrip('/')}/queue/{self.region}"
        else:
            if config.SQS_PORT_EXTERNAL:
                host_url = external_service_url("sqs")

        return "{host}/{account_id}/{name}".format(
            host=host_url.rstrip("/"),
            account_id=self.account_id,
            name=self.name,
        )

    @property
    def visibility_timeout(self) -> int:
        return int(self.attributes[QueueAttributeName.VisibilityTimeout])

    @property
    def delay_seconds(self) -> int:
        return int(self.attributes[QueueAttributeName.DelaySeconds])

    @property
    def wait_time_seconds(self) -> int:
        return int(self.attributes[QueueAttributeName.ReceiveMessageWaitTimeSeconds])

    @property
    def maximum_message_size(self):
        return int(self.attributes[QueueAttributeName.MaximumMessageSize])

    def validate_receipt_handle(self, receipt_handle: str):
        """
        Check that the receipt handle was issued for this queue.

        :param receipt_handle: the receipt handle to check
        :raises ReceiptHandleIsInvalid: if the handle cannot be decoded or encodes a different queue ARN
        """
        if self.arn != decode_receipt_handle(receipt_handle):
            raise ReceiptHandleIsInvalid(
                f'The input receipt handle "{receipt_handle}" is not a valid receipt handle.'
            )

    def update_visibility_timeout(self, receipt_handle: str, visibility_timeout: int):
        """
        Change the visibility timeout of the in-flight message identified by the receipt handle. A timeout of 0
        moves the message back into the visible queue.

        :param receipt_handle: receipt handle of the message
        :param visibility_timeout: the new timeout in seconds
        :raises ReceiptHandleIsInvalid: if the handle does not belong to this queue
        :raises InvalidParameterValue: if the handle is not known to this queue
        :raises MessageNotInflight: if the message is not currently in flight
        """
        with self.mutex:
            self.validate_receipt_handle(receipt_handle)

            if receipt_handle not in self.receipts:
                raise InvalidParameterValue(
                    f"Value {receipt_handle} for parameter ReceiptHandle is invalid. Reason: Message does not exist "
                    f"or is not available for visibility timeout change."
                )

            standard_message = self.receipts[receipt_handle]

            if standard_message not in self.inflight:
                raise MessageNotInflight()

            standard_message.update_visibility_timeout(visibility_timeout)

            if visibility_timeout == 0:
                LOG.info(
                    "terminating the visibility timeout of %s",
                    standard_message.message["MessageId"],
                )
                # Terminating the visibility timeout for a message
                # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html#terminating-message-visibility-timeout
                self.inflight.remove(standard_message)
                self.visible.put_nowait(standard_message)

    def remove(self, receipt_handle: str):
        """
        Delete the message identified by the receipt handle: mark it deleted, drop all of its receipt handles and
        remove it from the in-flight set (or from the visible queue if it was re-queued). Unknown handles are
        logged and ignored.

        :param receipt_handle: receipt handle of the message
        :raises ReceiptHandleIsInvalid: if the handle does not belong to this queue
        """
        with self.mutex:
            self.validate_receipt_handle(receipt_handle)

            if receipt_handle not in self.receipts:
                LOG.debug(
                    "no in-flight message found for receipt handle %s in queue %s",
                    receipt_handle,
                    self.arn,
                )
                return

            standard_message = self.receipts[receipt_handle]
            standard_message.deleted = True
            LOG.debug(
                "deleting message %s from queue %s",
                standard_message.message["MessageId"],
                self.arn,
            )

            # remove all handles
            for handle in standard_message.receipt_handles:
                del self.receipts[handle]
            standard_message.receipt_handles.clear()

            # remove in-flight message
            try:
                self.inflight.remove(standard_message)
            except KeyError:
                # this means the message was re-queued in the meantime
                # TODO: remove this message from the visible queue if it exists: a message can be removed with an old
                #  receipt handle that was issued before the message was put back in the visible queue.
                try:
                    self.visible.queue.remove(standard_message)
                except ValueError:
                    # not in the visible heap either (e.g. a consumer has just de-queued it); the message is
                    # already flagged as deleted, so ``get`` will skip it
                    LOG.debug(
                        "message %s is neither in-flight nor visible in queue %s",
                        standard_message.message["MessageId"],
                        self.arn,
                    )
                else:
                    # restore the heap invariant after removing an arbitrary element
                    heapq.heapify(self.visible.queue)

    def put(
        self,
        message: Message,
        visibility_timeout: Optional[int] = None,
        message_deduplication_id: Optional[str] = None,
        message_group_id: Optional[str] = None,
        delay_seconds: Optional[int] = None,
    ) -> SqsMessage:
        """Add a message to the queue; not implemented in the base class."""
        raise NotImplementedError

    def get(self, block=True, timeout=None, visibility_timeout: Optional[int] = None) -> SqsMessage:
        """
        Receive the next visible message, skipping messages that have been flagged as deleted.

        :param block: forwarded to ``PriorityQueue.get``
        :param timeout: forwarded to ``PriorityQueue.get``; if given, it bounds the total waiting time across
            skipped (deleted) messages
        :param visibility_timeout: overrides the queue's visibility timeout for the received message
        :return: a deep copy of the stored message, extended with the ``ReceiptHandle`` and the
            ``ApproximateReceiveCount`` / ``ApproximateFirstReceiveTimestamp`` attributes
        :raises queue.Empty: propagated from ``PriorityQueue.get`` when no message is available
        """
        # a fixed deadline avoids subtracting the same elapsed time more than once, and a ``None`` timeout
        # (wait forever) stays ``None`` instead of failing on arithmetic
        deadline = None if timeout is None else time.time() + timeout

        while True:
            standard_message: SqsMessage = self.visible.get(block=block, timeout=timeout)
            LOG.debug(
                "de-queued message %s from %s", standard_message.message["MessageId"], self.arn
            )

            with self.mutex:
                if standard_message.deleted:
                    # TODO: check what the behavior of AWS is here. should we return a deleted message?
                    if deadline is not None:
                        timeout = max(0, deadline - time.time())
                    continue

                # update message attributes
                standard_message.visibility_timeout = (
                    self.visibility_timeout if visibility_timeout is None else visibility_timeout
                )
                standard_message.receive_times += 1
                standard_message.set_last_received(time.time())
                if standard_message.first_received is None:
                    standard_message.first_received = standard_message.last_received

                # create and manage receipt handle
                receipt_handle = self.create_receipt_handle(standard_message)
                standard_message.receipt_handles.add(receipt_handle)
                self.receipts[receipt_handle] = standard_message

                if standard_message.visibility_timeout == 0:
                    self.visible.put_nowait(standard_message)
                else:
                    self.inflight.add(standard_message)

            # prepare message for receiver
            copied_message = copy.deepcopy(standard_message)
            copied_message.message["Attributes"][
                MessageSystemAttributeName.ApproximateReceiveCount
            ] = str(standard_message.receive_times)
            copied_message.message["Attributes"][
                MessageSystemAttributeName.ApproximateFirstReceiveTimestamp
            ] = str(int(standard_message.first_received * 1000))
            copied_message.message["ReceiptHandle"] = receipt_handle

            return copied_message

    def clear(self):
        """
        Calls clear on all internal datastructures that hold messages and data related to them.
        """
        with self.mutex:
            self.visible.queue.clear()
            self.inflight.clear()
            self.delayed.clear()
            self.receipts.clear()

    def create_receipt_handle(self, message: SqsMessage) -> str:
        """Return a new receipt handle for the message, encoding this queue's ARN."""
        return encode_receipt_handle(self.arn, message)

    def requeue_inflight_messages(self):
        """Move every in-flight message whose ``is_visible`` is true back into the visible queue."""
        if not self.inflight:
            return

        with self.mutex:
            # collect first: the set must not be modified while iterating over it
            messages = [message for message in self.inflight if message.is_visible]
            for standard_message in messages:
                LOG.debug(
                    "re-queueing inflight messages %s into queue %s",
                    standard_message.message["MessageId"],
                    self.arn,
                )
                self.inflight.remove(standard_message)
                self.visible.put_nowait(standard_message)

    def enqueue_delayed_messages(self):
        """Move every delayed message whose ``is_delayed`` is false into the visible queue."""
        if not self.delayed:
            return

        with self.mutex:
            # collect first: the set must not be modified while iterating over it
            messages = [message for message in self.delayed if not message.is_delayed]
            for standard_message in messages:
                LOG.debug(
                    "enqueueing delayed messages %s into queue %s",
                    standard_message.message["MessageId"],
                    self.arn,
                )
                self.delayed.remove(standard_message)
                self.visible.put_nowait(standard_message)

    def _assert_queue_name(self, name):
        """
        :param name: the queue name to validate
        :raises InvalidParameterValue: if the name is not 1 to 80 alphanumeric, hyphen or underscore characters
        """
        # fullmatch (rather than match with "$") so that a trailing newline is rejected as well
        if not re.fullmatch(r"[a-zA-Z0-9_-]{1,80}", name):
            raise InvalidParameterValue(
                "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"
            )

    def validate_queue_attributes(self, attributes):
        """
        :param attributes: mapping of attribute names to validate
        :raises InvalidAttributeName: if a key is not a member value of ``QueueAttributeName`` or is ``FifoQueue``
        """
        # NOTE(review): ``k`` is a (name, value) tuple from inspect.getmembers, so ``k not in
        # INTERNAL_QUEUE_ATTRIBUTES`` only filters if that collection holds tuples - confirm whether ``k[1]``
        # was intended.
        valid = [
            k[1]
            for k in inspect.getmembers(QueueAttributeName)
            if k not in INTERNAL_QUEUE_ATTRIBUTES
        ]
        del valid[valid.index(QueueAttributeName.FifoQueue)]

        for k in attributes.keys():
            if k not in valid:
                raise InvalidAttributeName(f"Unknown Attribute {k}.")


class StandardQueue(SqsQueue):
    """Queue that rejects deduplication ids and group ids and orders messages by their enqueue time."""

    def put(
        self,
        message: Message,
        visibility_timeout: Optional[int] = None,
        message_deduplication_id: Optional[str] = None,
        message_group_id: Optional[str] = None,
        delay_seconds: Optional[int] = None,
    ):
        """
        Add a message to the queue, either to the delayed set or directly to the visible queue.

        :param message: the message to add
        :param visibility_timeout: per-message visibility timeout; defaults to the queue attribute
        :param message_deduplication_id: must not be set for this queue type
        :param message_group_id: must not be set for this queue type
        :param delay_seconds: per-message delay; defaults to the queue attribute
        :return: the created ``SqsMessage``
        :raises InvalidParameterValue: if a deduplication id or group id is given
        """
        if message_deduplication_id:
            raise InvalidParameterValue(
                f"Value {message_deduplication_id} for parameter MessageDeduplicationId is invalid. Reason: The "
                f"request includes a parameter that is not valid for this queue type. "
            )
        if message_group_id:
            raise InvalidParameterValue(
                f"Value {message_group_id} for parameter MessageGroupId is invalid. Reason: The request includes a "
                f"parameter that is not valid for this queue type. "
            )

        standard_message = SqsMessage(time.time(), message)

        if visibility_timeout is not None:
            standard_message.visibility_timeout = visibility_timeout
        else:
            # use the attribute from the queue
            standard_message.visibility_timeout = self.visibility_timeout

        if delay_seconds is not None:
            standard_message.delay_seconds = delay_seconds
        else:
            standard_message.delay_seconds = self.delay_seconds

        if standard_message.is_delayed:
            self.delayed.add(standard_message)
        else:
            self.visible.put_nowait(standard_message)

        return standard_message


class FifoQueue(SqsQueue):
    """Queue that requires a message group id and de-duplicates messages per group by deduplication id."""

    # message group id -> deduplication id -> last message stored under that id
    deduplication: Dict[str, Dict[str, SqsMessage]]

    def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:
        super().__init__(name, region, account_id, attributes, tags)
        self.deduplication = {}

    def default_attributes(self) -> QueueAttributeMap:
        return {
            **super().default_attributes(),
            QueueAttributeName.ContentBasedDeduplication: "false",
            QueueAttributeName.DeduplicationScope: "queue",
            QueueAttributeName.FifoThroughputLimit: "perQueue",
        }

    def update_delay_seconds(self, value: int):
        """
        Update the queue attribute and apply the new delay to all currently delayed messages.

        :param value: the number of seconds
        """
        super(FifoQueue, self).update_delay_seconds(value)
        for message in self.delayed:
            message.delay_seconds = value

    def put(
        self,
        message: Message,
        visibility_timeout: Optional[int] = None,
        message_deduplication_id: Optional[str] = None,
        message_group_id: Optional[str] = None,
        delay_seconds: Optional[int] = None,
    ):
        """
        Add a message to the queue unless a non-deleted message with the same group and deduplication id was
        stored less than ``DEDUPLICATION_INTERVAL_IN_SEC`` before; in that case the given message only receives
        the ``MessageId`` of the earlier message and is not enqueued.

        :param message: the message to add
        :param visibility_timeout: per-message visibility timeout; defaults to the queue attribute
        :param message_deduplication_id: deduplication id; if missing and ContentBasedDeduplication is "true",
            the SHA-256 hex digest of the body is used
        :param message_group_id: the (mandatory) message group id
        :param delay_seconds: must be None or 0
        :return: the ``SqsMessage`` created for this call
        :raises InvalidParameterValue: if a non-zero delay is given or no deduplication id can be determined
        :raises MissingParameter: if no message group id is given
        """
        if delay_seconds:
            # in fifo queues, delay is only applied on queue level. However, explicitly setting delay_seconds=0 is valid
            raise InvalidParameterValue(
                f"Value {delay_seconds} for parameter DelaySeconds is invalid. Reason: The request include parameter "
                f"that is not valid for this queue type."
            )

        if not message_group_id:
            raise MissingParameter("The request must contain the parameter MessageGroupId.")

        dedup_id = message_deduplication_id
        content_based_deduplication = (
            "true"
            == (self.attributes.get(QueueAttributeName.ContentBasedDeduplication, "false")).lower()
        )
        if not dedup_id and content_based_deduplication:
            dedup_id = hashlib.sha256(message.get("Body").encode("utf-8")).hexdigest()
        if not dedup_id:
            raise InvalidParameterValue(
                "The Queue should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided "
                "explicitly "
            )

        fifo_message = SqsMessage(
            time.time(),
            message,
            message_deduplication_id=dedup_id,
            message_group_id=message_group_id,
            sequence_number=str(self.next_sequence_number()),
        )

        if visibility_timeout is not None:
            fifo_message.visibility_timeout = visibility_timeout
        else:
            # use the attribute from the queue
            fifo_message.visibility_timeout = self.visibility_timeout

        if delay_seconds is not None:
            fifo_message.delay_seconds = delay_seconds
        else:
            fifo_message.delay_seconds = self.delay_seconds

        original_message = None
        original_message_group = self.deduplication.get(message_group_id)
        if original_message_group:
            original_message = original_message_group.get(dedup_id)

        if (
            original_message
            and not original_message.deleted
            and original_message.priority + DEDUPLICATION_INTERVAL_IN_SEC > fifo_message.priority
        ):
            # duplicate within the deduplication interval: report the id of the message already stored
            message["MessageId"] = original_message.message["MessageId"]
        else:
            if fifo_message.is_delayed:
                self.delayed.add(fifo_message)
            else:
                self.visible.put_nowait(fifo_message)

            self.deduplication.setdefault(message_group_id, {})[dedup_id] = fifo_message

        return fifo_message

    def _assert_queue_name(self, name):
        """
        :param name: the queue name to validate
        :raises InvalidParameterValue: if the name does not end with ``.fifo`` or is otherwise not valid
        """
        if not name.endswith(".fifo"):
            raise InvalidParameterValue(
                "The name of a FIFO queue can only include alphanumeric characters, hyphens, or underscores, "
                "must end with .fifo suffix and be 1 to 80 in length"
            )
        # The .fifo suffix counts towards the 80-character queue name quota.
        queue_name = name[:-5] + "_fifo"
        super()._assert_queue_name(queue_name)

    def validate_queue_attributes(self, attributes):
        """
        :param attributes: mapping of attribute names to values to validate
        :raises InvalidAttributeName: if a key is not a member value of ``QueueAttributeName``
        :raises InvalidAttributeValue: if ``FifoQueue`` is given with a value other than "true"
        """
        # NOTE(review): ``k`` is a (name, value) tuple from inspect.getmembers, so ``k not in
        # INTERNAL_QUEUE_ATTRIBUTES`` only filters if that collection holds tuples - confirm whether ``k[1]``
        # was intended.
        valid = [
            k[1]
            for k in inspect.getmembers(QueueAttributeName)
            if k not in INTERNAL_QUEUE_ATTRIBUTES
        ]
        for k in attributes.keys():
            if k not in valid:
                raise InvalidAttributeName(f"Unknown Attribute {k}.")
        # Special Cases
        fifo = attributes.get(QueueAttributeName.FifoQueue)
        if fifo and fifo.lower() != "true":
            raise InvalidAttributeValue(
                "Invalid value for the parameter FifoQueue. Reason: Modifying queue type is not supported."
            )

    def next_sequence_number(self):
        """Return the next value of the counter provided by ``global_message_sequence``."""
        return next(global_message_sequence())


class QueueUpdateWorker:
    """
    Regularly re-queues inflight and delayed messages whose visibility timeout has expired or delay deadline has been
    reached.
    """

    def __init__(self) -> None:
        super().__init__()
        self.scheduler = Scheduler()
        self.thread: Optional[FuncThread] = None
        self.mutex = threading.RLock()

    def do_update_all_queues(self):
        for region in SqsBackend.regions().keys():
            backend = SqsBackend.get(region)
            for queue in backend.queues.values():
                ...  # the excerpt is truncated here; the loop body is not shown

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful