Best Python code snippet using testcontainers-python_python
pubsubutil.py
Source:pubsubutil.py
# ... (earlier lines of pubsubutil.py, including its imports, are not shown in this excerpt)


def get_publisher_client(config):
    """Return a Pub/Sub publisher client built from a service-account key.

    Args:
        config: Mapping that provides at least ``'ProjectId'``.

    Returns:
        The client produced by ``PublisherClient.from_service_account_json``.
    """
    # NOTE(review): 'ProjectId' is passed for both positional arguments of
    # access_secret_version -- confirm the second one is really the project id.
    credential_path = access_secret_version(config['ProjectId'], config['ProjectId'], version_id=1)
    # from_service_account_json is a classmethod: call it on the class so no
    # throwaway client (which would need default credentials) is built first.
    return pubsub_v1.PublisherClient.from_service_account_json(credential_path)


def get_subscriber_client(config):
    """Return a Pub/Sub subscriber client built from a service-account key.

    Args:
        config: Mapping that provides at least ``'ProjectId'``.

    Returns:
        The client produced by ``SubscriberClient.from_service_account_json``.
    """
    # NOTE(review): same double use of 'ProjectId' as in get_publisher_client.
    credential_path = access_secret_version(config['ProjectId'], config['ProjectId'], version_id=1)
    # Classmethod call on the class, not on a default-constructed instance.
    return pubsub_v1.SubscriberClient.from_service_account_json(credential_path)


def createpub(config):
    """Create the topic named by ``config['TopicName']``.

    Errors raised while creating the topic are printed, not re-raised.

    Args:
        config: Mapping with ``'ProjectId'`` and ``'TopicName'``; an optional
            non-empty ``'Encryption_Key'`` is used as the topic's KMS key name.
    """
    publisher = get_publisher_client(config)
    topic_path = publisher.topic_path(config['ProjectId'], config['TopicName'])
    # A missing or empty 'Encryption_Key' means "no KMS key".
    kms_key_name = config.get('Encryption_Key') or None
    try:
        topic = Topic(name=topic_path, kms_key_name=kms_key_name)
        topic = publisher.create_topic(topic)
        print("Topic created: {}".format(topic))
    except Exception as e:
        # Best effort: report the failure but let Ctrl-C / SystemExit through.
        print(e)


def _has_keys(config, *keys):
    """Return True when every name in *keys* is present in *config*."""
    return all(key in config for key in keys)


def _subscription_options(config):
    """Translate the optional settings in *config* into Subscription kwargs.

    A setting is applied only when all the config keys it depends on are
    present; otherwise its value stays ``None``.

    Args:
        config: Mapping of user-supplied subscription settings.

    Returns:
        dict with the keys ``push_config``, ``expiration_policy``,
        ``ack_deadline_seconds``, ``retain_acked_messages``,
        ``message_retention_duration``, ``enable_message_ordering``,
        ``dead_letter_policy`` and ``retry_policy``.
    """
    options = {
        'push_config': None,
        'expiration_policy': None,
        'ack_deadline_seconds': None,
        'retain_acked_messages': None,
        'message_retention_duration': None,
        'enable_message_ordering': None,
        'dead_letter_policy': None,
        'retry_policy': None,
    }

    if _has_keys(config, 'Delivery_type', 'push_endpoint') and config['Delivery_type'] != 'Pull':
        options['push_config'] = {
            'push_endpoint': config['push_endpoint'],
            # 'attributes' is optional; fall back to an empty mapping.
            'attributes': config.get('attributes', {}),
        }

    if _has_keys(config, 'Subscription_expiration_duration'):
        ttl = config['Subscription_expiration_duration']
        options['expiration_policy'] = None if ttl == '' else {'ttl': ttl}

    if _has_keys(config, 'Acknowledgement_deadline'):
        deadline = config['Acknowledgement_deadline']
        options['ack_deadline_seconds'] = None if deadline == '' else deadline

    if _has_keys(config, 'Retain_acknowledged_messages', 'Message_retention_duration'):
        retain = config['Retain_acknowledged_messages'] == 'Yes'
        options['retain_acked_messages'] = retain
        # The retention duration is only sent when acked messages are retained.
        options['message_retention_duration'] = config['Message_retention_duration'] if retain else None

    if _has_keys(config, 'Message_ordering'):
        options['enable_message_ordering'] = config['Message_ordering'] == 'Yes'

    if (_has_keys(config, 'Dead_letter_policy_enable', 'dead_letter_topic', 'max_delivery_attempts')
            and config['Dead_letter_policy_enable'] != 'No'):
        options['dead_letter_policy'] = {
            'dead_letter_topic': config['dead_letter_topic'],
            'max_delivery_attempts': config['max_delivery_attempts'],
        }

    if (_has_keys(config, 'minimum_backoff', 'maximum_backoff', 'Retry_policy_enable')
            and config['Retry_policy_enable'] == 'Yes'):
        # The subscription's retry_policy field is described by
        # minimum_backoff / maximum_backoff, so pass those names directly.
        options['retry_policy'] = {
            'minimum_backoff': config['minimum_backoff'],
            'maximum_backoff': config['maximum_backoff'],
        }

    return options


def createsubscription(config):
    """Create the subscription named by ``config['SubscritonName']``.

    Optional settings (push delivery, expiration, ack deadline, retention,
    ordering, dead-lettering, retry back-off) are taken from *config* when
    their keys are present. Errors raised while creating the subscription
    are printed, not re-raised.

    Args:
        config: Mapping with ``'ProjectId'``, ``'TopicName'`` and
            ``'SubscritonName'`` plus any of the optional setting keys.
    """
    subscriber = get_subscriber_client(config)
    topic_name = 'projects/{project_id}/topics/{topic}'.format(
        project_id=config['ProjectId'],
        topic=config['TopicName'],
    )
    subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
        project_id=config['ProjectId'],
        sub=config['SubscritonName'],  # key spelling kept: it is what configs supply
    )
    options = _subscription_options(config)
    try:
        subscription = Subscription(name=subscription_name, topic=topic_name, **options)
        subscriber.create_subscription(subscription)
        print("Subscription created: {}".format(subscription_name))
    except Exception as e:
        # Best effort: report the failure but let Ctrl-C / SystemExit through.
        print(e)


def checktopicavailable(config):
    """Return the configured topic, creating it when it does not exist.

    Args:
        config: Mapping with ``'ProjectId'`` and ``'TopicName'``.

    Returns:
        The ``get_topic`` response when the topic exists; ``None`` when the
        topic was missing and ``createpub`` was called instead.
    """
    print('Verifying the topic')
    try:
        client = get_publisher_client(config)
        topic = client.topic_path(config['ProjectId'], config['TopicName'])
        response = client.get_topic(topic=topic)
        return response
    except google.api_core.exceptions.NotFound:
        print('Topic doesnot exist. Creating New Topic :{}'.format(config['TopicName']))
        createpub(config)


def checksubscriptionavailable(config):
    """Return the configured subscription, creating it when it does not exist.

    Args:
        config: Mapping with ``'ProjectId'`` and ``'SubscritonName'``.

    Returns:
        The ``get_subscription`` response when the subscription exists;
        ``None`` when it was missing and ``createsubscription`` was called.
    """
    print('Verifying the subsciption ')
    client = get_subscriber_client(config)
    try:
        subscription = client.subscription_path(config['ProjectId'], config['SubscritonName'])
        response = client.get_subscription(subscription=subscription)
        return response
    except google.api_core.exceptions.NotFound:
        print('Subscription doesnot exist. Creating New Subscription :{}'.format(subscription))
        createsubscription(config)


def publishmessage(config, message):
    # NOTE: the excerpt is cut off inside this function; only the statements
    # visible in the excerpt are reproduced here.
    checktopicavailable(config)
    batch_settings = ()
    if config['Publish_type'] == 'Recurring':
        max_messages = int(config['Number_of_messages'])  # default 100
        max_latency = int(config['Message_interval'])  # default 10 ms
        # batch_settings = pubsub_v1.types.BatchSettings(...
        # ... (remainder of pubsubutil.py is not shown in this excerpt)
subscriber.py
Source:subscriber.py
...31 The logger that will be used to send messages to stdout.32 """33 super().__init__(logger)34 # get subscriber35 self.subscriber = self.get_subscriber_client(credentials_file)36 self.subscription_path = self.get_subscription_path(project, subscription_id)37 @staticmethod38 def get_subscriber_client(credentials_file: str) -> pubsub.subscriber.Client:39 """Get a subscriber client.40 Parameters41 ----------42 credentials_file : str43 Path to credentials file.44 Returns45 -------46 pubsub.subscriber.Client47 Instance of subscriber client object created with the provided key.48 """49 return pubsub.subscriber.Client.from_service_account_file(credentials_file) # noqa: E50150 def get_subscription_path(self, project: str, subscription_id: str) -> str:51 """Get the subscription path.52 Parameters...
create_subscription.py
Source:create_subscription.py
...17@click.argument("subscription")18def main(project: str, endpoint: str, topic: str, subscription: str) -> None:19 # Create PubSub clients20 publisher_client = get_publisher_client(endpoint)21 subscriber_client = get_subscriber_client(endpoint)22 # Create subscriptions23 topic_path = publisher_client.topic_path(project, topic)24 subscription_path = subscriber_client.subscription_path(project, subscription)25 subscription_obj = subscriber_client.create_subscription(26 request={"name": subscription_path, "topic": topic_path}27 )28 print(f"Created subscription {subscription_obj.name}")29if __name__ == "__main__":...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!