Best Python code snippet using localstack_python
provider.py
Source:provider.py
...131 # but also add some randomness s.t. the generated receipt handles look like the ones from AWS132 handle = f"{long_uid()} {queue_arn} {message.message.get('MessageId')} {message.last_received}"133 encoded = base64.b64encode(handle.encode("utf-8"))134 return encoded.decode("utf-8")135def decode_receipt_handle(receipt_handle: str) -> str:136 try:137 handle = base64.b64decode(receipt_handle).decode("utf-8")138 _, queue_arn, message_id, last_received = handle.split(" ")139 parse_arn(queue_arn) # raises a ValueError if it is not an arn140 return queue_arn141 except (IndexError, ValueError):142 raise ReceiptHandleIsInvalid(143 f'The input receipt handle "{receipt_handle}" is not a valid receipt handle.'144 )145class Permission(NamedTuple):146 # TODO: just a placeholder for real policies147 label: str148 account_id: str149 action: str150class SqsMessage:151 message: Message152 created: float153 visibility_timeout: int154 receive_times: int155 delay_seconds: Optional[int]156 receipt_handles: Set[str]157 last_received: Optional[float]158 first_received: Optional[float]159 visibility_deadline: Optional[float]160 deleted: bool161 priority: float162 message_deduplication_id: str163 message_group_id: str164 def __init__(165 self,166 priority: float,167 message: Message,168 message_deduplication_id: str = None,169 message_group_id: str = None,170 ) -> None:171 self.created = time.time()172 self.message = message173 self.receive_times = 0174 self.receipt_handles = set()175 self.delay_seconds = None176 self.last_received = None177 self.first_received = None178 self.deleted = False179 self.priority = priority180 attributes = {}181 if message_group_id is not None:182 attributes["MessageGroupId"] = message_group_id183 if message_deduplication_id is not None:184 attributes["MessageDeduplicationId"] = message_deduplication_id185 if self.message.get("Attributes"):186 self.message["Attributes"].update(attributes)187 else:188 self.message["Attributes"] = attributes189 @property190 def message_group_id(self) -> Optional[str]:191 return self.message["Attributes"].get("MessageGroupId")192 @property193 def message_deduplication_id(self) -> Optional[str]:194 return self.message["Attributes"].get("MessageDeduplicationId")195 def set_last_received(self, timestamp: float):196 """197 Sets the last received timestamp of the message to the given value, and updates the visibility deadline198 accordingly.199 :param timestamp: the last time the message was received200 """201 self.last_received = timestamp202 self.visibility_deadline = timestamp + self.visibility_timeout203 def update_visibility_timeout(self, timeout: int):204 """205 Sets the visibility timeout of the message to the given value, and updates the visibility deadline accordingly.206 :param timeout: the timeout value in seconds207 """208 self.visibility_timeout = timeout209 self.visibility_deadline = time.time() + timeout210 @property211 def is_visible(self) -> bool:212 """213 Returns false if the message has a visibility deadline that is in the future.214 :return: whether the message is visibile or not.215 """216 if self.visibility_deadline is None:217 return True218 if time.time() >= self.visibility_deadline:219 return True220 return False221 @property222 def is_delayed(self) -> bool:223 if self.delay_seconds is None:224 return False225 return time.time() <= self.created + self.delay_seconds226 def __gt__(self, other):227 return self.priority > other.priority228 def __ge__(self, other):229 return self.priority >= other.priority230 def __lt__(self, other):231 return self.priority < other.priority232 def __le__(self, other):233 return self.priority <= other.priority234 def __eq__(self, other):235 return self.message["MessageId"] == other.message["MessageId"]236 def __hash__(self):237 return self.message["MessageId"].__hash__()238class SqsQueue:239 name: str240 region: str241 account_id: str242 attributes: QueueAttributeMap243 tags: TagMap244 permissions: Set[Permission]245 purge_in_progress: bool246 visible: PriorityQueue247 delayed: Set[SqsMessage]248 inflight: Set[SqsMessage]249 receipts: Dict[str, SqsMessage]250 def __init__(self, name: str, region: str, account_id: str, attributes=None, tags=None) -> None:251 self.name = name252 self.region = region253 self.account_id = account_id254 self._assert_queue_name(name)255 self.tags = tags or {}256 self.visible = PriorityQueue()257 self.delayed = set()258 self.inflight = set()259 self.receipts = {}260 self.attributes = self.default_attributes()261 if attributes:262 self.attributes.update(attributes)263 self.purge_in_progress = False264 self.permissions = set()265 self.mutex = threading.RLock()266 def default_attributes(self) -> QueueAttributeMap:267 return {268 QueueAttributeName.ApproximateNumberOfMessages: self.visible._qsize,269 QueueAttributeName.ApproximateNumberOfMessagesNotVisible: lambda: len(self.inflight),270 QueueAttributeName.ApproximateNumberOfMessagesDelayed: lambda: len(self.delayed),271 QueueAttributeName.CreatedTimestamp: str(now()),272 QueueAttributeName.DelaySeconds: "0",273 QueueAttributeName.LastModifiedTimestamp: str(now()),274 QueueAttributeName.MaximumMessageSize: "262144",275 QueueAttributeName.MessageRetentionPeriod: "345600",276 QueueAttributeName.QueueArn: self.arn,277 QueueAttributeName.ReceiveMessageWaitTimeSeconds: "0",278 QueueAttributeName.VisibilityTimeout: "30",279 QueueAttributeName.SqsManagedSseEnabled: "false",280 }281 def update_delay_seconds(self, value: int):282 """283 For standard queues, the per-queue delay setting is not retroactiveâchanging the setting doesn't affect the delay of messages already in the queue.284 For FIFO queues, the per-queue delay setting is retroactiveâchanging the setting affects the delay of messages already in the queue.285 https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-delay-queues.html286 :param value: the number of seconds287 """288 self.attributes[QueueAttributeName.DelaySeconds] = str(value)289 def update_last_modified(self, timestamp: int = None):290 if timestamp is None:291 timestamp = now()292 self.attributes[QueueAttributeName.LastModifiedTimestamp] = str(timestamp)293 @property294 def arn(self) -> str:295 return f"arn:aws:sqs:{self.region}:{self.account_id}:{self.name}"296 def url(self, context: RequestContext) -> str:297 """Return queue URL using either SQS_PORT_EXTERNAL (if configured), the SQS_ENDPOINT_STRATEGY (if configured)298 or based on the 'Host' request header"""299 host_url = context.request.host_url300 if config.SQS_ENDPOINT_STRATEGY == "domain":301 # queue.localhost.localstack.cloud:4566/000000000000/my-queue (us-east-1)302 # or us-east-2.queue.localhost.localstack.cloud:4566/000000000000/my-queue303 region = "" if self.region == "us-east-1" else self.region + "."304 scheme = context.request.scheme305 host_url = f"{scheme}://{region}queue.{constants.LOCALHOST_HOSTNAME}:{config.EDGE_PORT}"306 elif config.SQS_ENDPOINT_STRATEGY == "path":307 # https?://localhost:4566/queue/us-east-1/00000000000/my-queue (us-east-1)308 host_url = f"{context.request.host_url}/queue/{self.region}"309 else:310 if config.SQS_PORT_EXTERNAL:311 host_url = external_service_url("sqs")312 return "{host}/{account_id}/{name}".format(313 host=host_url.rstrip("/"),314 account_id=self.account_id,315 name=self.name,316 )317 @property318 def visibility_timeout(self) -> int:319 return int(self.attributes[QueueAttributeName.VisibilityTimeout])320 @property321 def delay_seconds(self) -> int:322 return int(self.attributes[QueueAttributeName.DelaySeconds])323 @property324 def wait_time_seconds(self) -> int:325 return int(self.attributes[QueueAttributeName.ReceiveMessageWaitTimeSeconds])326 def validate_receipt_handle(self, receipt_handle: str):327 if self.arn != decode_receipt_handle(receipt_handle):328 raise ReceiptHandleIsInvalid(329 f'The input receipt handle "{receipt_handle}" is not a valid receipt handle.'330 )331 def update_visibility_timeout(self, receipt_handle: str, visibility_timeout: int):332 with self.mutex:333 self.validate_receipt_handle(receipt_handle)334 if receipt_handle not in self.receipts:335 raise InvalidParameterValue(336 f"Value {receipt_handle} for parameter ReceiptHandle is invalid. Reason: Message does not exist "337 f"or is not available for visibility timeout change."338 )339 standard_message = self.receipts[receipt_handle]340 if standard_message not in self.inflight:341 raise MessageNotInflight()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!