Best Python code snippet using localstack_python
provider.py
Source:provider.py
...172 inflight: Set[SqsMessage]173 receipts: Dict[str, SqsMessage]174 def __init__(self, key: QueueKey, attributes=None, tags=None) -> None:175 super().__init__()176 self._assert_queue_name(key.name)177 self.key = key178 self.tags = tags or {}179 self.visible = PriorityQueue()180 self.inflight = set()181 self.receipts = {}182 self.attributes = self.default_attributes()183 if attributes:184 self.attributes.update(attributes)185 self.purge_in_progress = False186 self.permissions = set()187 self.mutex = threading.RLock()188 def default_attributes(self) -> QueueAttributeMap:189 return {190 QueueAttributeName.QueueArn: self.arn,191 QueueAttributeName.ApproximateNumberOfMessages: self.visible._qsize,192 QueueAttributeName.ApproximateNumberOfMessagesNotVisible: lambda: len(self.inflight),193 QueueAttributeName.ApproximateNumberOfMessagesDelayed: "0", # FIXME: this should also be callable194 QueueAttributeName.CreatedTimestamp: str(now()),195 QueueAttributeName.LastModifiedTimestamp: str(now()),196 QueueAttributeName.VisibilityTimeout: "30",197 QueueAttributeName.MaximumMessageSize: "262144",198 QueueAttributeName.MessageRetentionPeriod: "345600",199 QueueAttributeName.DelaySeconds: "0",200 QueueAttributeName.ReceiveMessageWaitTimeSeconds: "0",201 }202 def update_last_modified(self, timestamp: int = None):203 if timestamp is None:204 timestamp = now()205 self.attributes[QueueAttributeName.LastModifiedTimestamp] = str(timestamp)206 @property207 def name(self):208 return self.key.name209 @property210 def owner(self):211 return self.key.account_id212 @property213 def arn(self) -> str:214 return f"arn:aws:sqs:{self.key.region}:{self.key.account_id}:{self.key.name}"215 def url(self, context: RequestContext) -> str:216 """Return queue URL using either SQS_PORT_EXTERNAL (if configured), or based on the 'Host' request header"""217 host_url = context.request.host_url218 if config.SQS_PORT_EXTERNAL:219 host_url = external_service_url("sqs")220 return 
"{host}/{account_id}/{name}".format(221 host=host_url.rstrip("/"),222 account_id=self.key.account_id,223 name=self.key.name,224 )225 @property226 def visibility_timeout(self) -> int:227 return int(self.attributes[QueueAttributeName.VisibilityTimeout])228 def update_visibility_timeout(self, receipt_handle: str, visibility_timeout: int):229 with self.mutex:230 if receipt_handle not in self.receipts:231 raise ReceiptHandleIsInvalid(232 f'The input receipt handle "{receipt_handle}" is not a valid receipt handle.'233 )234 standard_message = self.receipts[receipt_handle]235 if standard_message not in self.inflight:236 raise MessageNotInflight()237 standard_message.visibility_timeout = visibility_timeout238 if visibility_timeout == 0:239 LOG.info(240 "terminating the visibility timeout of %s",241 standard_message.message["MessageId"],242 )243 # Terminating the visibility timeout for a message244 # https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html#terminating-message-visibility-timeout245 self.inflight.remove(standard_message)246 self.visible.put_nowait(standard_message)247 def remove(self, receipt_handle: str):248 with self.mutex:249 if receipt_handle not in self.receipts:250 LOG.debug(251 "no in-flight message found for receipt handle %s in queue %s",252 receipt_handle,253 self.arn,254 )255 return256 standard_message = self.receipts[receipt_handle]257 standard_message.deleted = True258 LOG.debug(259 "deleting message %s from queue %s", standard_message.message["MessageId"], self.arn260 )261 # remove all handles262 for handle in standard_message.receipt_handles:263 del self.receipts[handle]264 standard_message.receipt_handles.clear()265 # remove in-flight message266 try:267 self.inflight.remove(standard_message)268 except KeyError:269 # this means the message was re-queued in the meantime270 # TODO: remove this message from the visible queue if it exists: a message can be removed with an old271 # receipt handle that was 
issued before the message was put back in the visible queue.272 self.visible.queue.remove(standard_message)273 heapq.heapify(self.visible.queue)274 pass275 def put(276 self,277 message: Message,278 visibility_timeout: int = None,279 message_deduplication_id: str = None,280 message_group_id: str = None,281 ):282 raise NotImplementedError283 def get(self, block=True, timeout=None, visibility_timeout: int = None) -> SqsMessage:284 start = time.time()285 while True:286 standard_message: SqsMessage = self.visible.get(block=block, timeout=timeout)287 LOG.debug(288 "de-queued message %s from %s", standard_message.message["MessageId"], self.arn289 )290 with self.mutex:291 if standard_message.deleted:292 # TODO: check what the behavior of AWS is here. should we return a deleted message?293 timeout -= time.time() - start294 if timeout < 0:295 timeout = 0296 continue297 # update message attributes298 standard_message.visibility_timeout = (299 self.visibility_timeout if visibility_timeout is None else visibility_timeout300 )301 standard_message.receive_times += 1302 standard_message.last_received = time.time()303 if standard_message.first_received is None:304 standard_message.first_received = standard_message.last_received305 # create and manage receipt handle306 receipt_handle = generate_receipt_handle()307 standard_message.receipt_handles.add(receipt_handle)308 self.receipts[receipt_handle] = standard_message309 if standard_message.visibility_timeout == 0:310 self.visible.put_nowait(standard_message)311 else:312 self.inflight.add(standard_message)313 # prepare message for receiver314 # TODO: update message attributes (ApproximateFirstReceiveTimestamp, ApproximateReceiveCount)315 copied_message = copy.deepcopy(standard_message)316 copied_message.message["ReceiptHandle"] = receipt_handle317 return copied_message318 def requeue_inflight_messages(self):319 if not self.inflight:320 return321 with self.mutex:322 messages = list(self.inflight)323 for standard_message in 
messages:324 if standard_message.is_visible:325 LOG.debug(326 "re-queueing inflight messages %s into queue %s",327 standard_message.message["MessageId"],328 self.arn,329 )330 self.inflight.remove(standard_message)331 self.visible.put_nowait(standard_message)332 def _assert_queue_name(self, name):333 if not re.match(r"^[a-zA-Z0-9_-]{1,80}$", name):334 raise InvalidParameterValue(335 "Can only include alphanumeric characters, hyphens, or underscores. 1 to 80 in length"336 )337 def validate_queue_attributes(self, attributes):338 valid = [k[1] for k in inspect.getmembers(QueueAttributeName)]339 del valid[valid.index(QueueAttributeName.FifoQueue)]340 for k in attributes.keys():341 if k not in valid:342 raise InvalidAttributeName(f"Unknown attribute name {k}")343 def generate_sequence_number(self):344 return None345class StandardQueue(SqsQueue):346 def put(347 self,348 message: Message,349 visibility_timeout: int = None,350 message_deduplication_id: str = None,351 message_group_id: str = None,352 ):353 if message_deduplication_id:354 raise InvalidParameterValue(355 f"Value {message_deduplication_id} for parameter MessageDeduplicationId is invalid. Reason: The "356 f"request includes a parameter that is not valid for this queue type. "357 )358 if message_group_id:359 raise InvalidParameterValue(360 f"Value {message_group_id} for parameter MessageGroupId is invalid. Reason: The request includes a "361 f"parameter that is not valid for this queue type. 
"362 )363 standard_message = SqsMessage(time.time(), message)364 if visibility_timeout is not None:365 standard_message.visibility_timeout = visibility_timeout366 else:367 # use the attribute from the queue368 standard_message.visibility_timeout = self.visibility_timeout369 self.visible.put_nowait(standard_message)370class FifoQueue(SqsQueue):371 visible: PriorityQueue372 inflight: Set[SqsMessage]373 receipts: Dict[str, SqsMessage]374 deduplication: Dict[str, Dict[str, SqsMessage]]375 def __init__(self, key: QueueKey, attributes=None, tags=None) -> None:376 super().__init__(key, attributes, tags)377 self.deduplication = {}378 def put(379 self,380 message: Message,381 visibility_timeout: int = None,382 message_deduplication_id: str = None,383 message_group_id: str = None,384 ):385 if not message_group_id:386 raise MissingParameter("The request must contain the parameter MessageGroupId.")387 dedup_id = message_deduplication_id388 content_based_deduplication = (389 "true"390 == (self.attributes.get(QueueAttributeName.ContentBasedDeduplication, "false")).lower()391 )392 if not dedup_id and content_based_deduplication:393 dedup_id = hashlib.sha256(message.get("Body").encode("utf-8")).hexdigest()394 if not dedup_id:395 raise InvalidParameterValue(396 "The Queue should either have ContentBasedDeduplication enabled or MessageDeduplicationId provided "397 "explicitly "398 )399 qm = SqsMessage(400 time.time(),401 message,402 message_deduplication_id=dedup_id,403 message_group_id=message_group_id,404 )405 if visibility_timeout is not None:406 qm.visibility_timeout = visibility_timeout407 else:408 # use the attribute from the queue409 qm.visibility_timeout = self.visibility_timeout410 original_message = None411 original_message_group = self.deduplication.get(message_group_id)412 if original_message_group:413 original_message = original_message_group.get(dedup_id)414 if (415 original_message416 and not original_message.deleted417 and original_message.priority + 
DEDUPLICATION_INTERVAL_IN_SEC > qm.priority418 ):419 message["MessageId"] = original_message.message["MessageId"]420 else:421 self.visible.put_nowait(qm)422 if not original_message_group:423 self.deduplication[message_group_id] = {}424 self.deduplication[message_group_id][message_deduplication_id] = qm425 def _assert_queue_name(self, name):426 if not name.endswith(".fifo"):427 raise InvalidParameterValue(428 "The name of a FIFO queue can only include alphanumeric characters, hyphens, or underscores, "429 "must end with .fifo suffix and be 1 to 80 in length"430 )431 # The .fifo suffix counts towards the 80-character queue name quota.432 queue_name = name[:-5] + "_fifo"433 super()._assert_queue_name(queue_name)434 def validate_queue_attributes(self, attributes):435 valid = [k[1] for k in inspect.getmembers(QueueAttributeName)]436 for k in attributes.keys():437 if k not in valid:438 raise InvalidAttributeName(f"Unknown attribute name {k}")439 # Special Cases440 fifo = attributes.get(QueueAttributeName.FifoQueue)441 if fifo and fifo.lower() != "true":442 raise InvalidAttributeValue(443 "Invalid value for the parameter FifoQueue. Reason: Modifying queue type is not supported."444 )445 # TODO: If we ever actually need to do something with this number, it needs to be part of446 # SQSMessage. This means changing all *put*() signatures to return the saved message.447 def generate_sequence_number(self):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!