How to use get_subscriber_client method in testcontainers-python

Best Python code snippets using testcontainers-python

pubsubutil.py

Source: pubsubutil.py Github

...
def get_publisher_client(config):
    # access_secret_version is a helper that is not shown in this excerpt.
    credential_path = access_secret_version(config['ProjectId'], config['ProjectId'], version_id=1)
    # from_service_account_json is a classmethod, so call it on the class itself.
    client = pubsub_v1.PublisherClient.from_service_account_json(credential_path)
    return client


def get_subscriber_client(config):
    credential_path = access_secret_version(config['ProjectId'], config['ProjectId'], version_id=1)
    client = pubsub_v1.SubscriberClient.from_service_account_json(credential_path)
    return client


def createpub(config):
    from google.cloud import pubsub_v1
    publisher = get_publisher_client(config)
    topic_path = publisher.topic_path(config['ProjectId'], config['TopicName'])
    # An empty or missing Encryption_Key means no customer-managed key.
    kms_key_name = None
    if 'Encryption_Key' in config:
        kms_key_name = None if config['Encryption_Key'] == '' else config['Encryption_Key']
    try:
        topic = Topic(name=topic_path, kms_key_name=kms_key_name)
        topic = publisher.create_topic(topic)
        print("Topic created: {}".format(topic))
    except Exception as e:
        print(e)


def createsubscription(config):
    push_config = None
    expiration_policy = None
    ack_deadline_seconds = None
    message_retention_duration = None
    retain_acked_messages = None
    enable_message_ordering = None
    dead_letter_policy = None
    retry = None
    subscriber = get_subscriber_client(config)
    topic_name = 'projects/{project_id}/topics/{topic}'.format(
        project_id=config['ProjectId'],
        topic=config['TopicName'],  # Set this to something appropriate.
    )
    subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
        project_id=config['ProjectId'],
        sub=config['SubscritonName'],  # Set this to something appropriate.
    )
    # Each optional setting is applied only when all of its keys are present.
    # The original `all([...]) in config.keys()` checks were always False.
    if all(key in config for key in ('Delivery_type', 'push_endpoint')):
        push_config = None if config['Delivery_type'] == 'Pull' else {
            'push_endpoint': config['push_endpoint'],
            'attributes': config['attributes']}
    if 'Subscription_expiration_duration' in config:
        expiration_policy = None if config['Subscription_expiration_duration'] == '' else {
            'ttl': config['Subscription_expiration_duration']}
    if 'Acknowledgement_deadline' in config:
        ack_deadline_seconds = None if config['Acknowledgement_deadline'] == '' else config['Acknowledgement_deadline']
    if all(key in config for key in ('Retain_acknowledged_messages', 'Message_retention_duration')):
        retain_acked_messages = config['Retain_acknowledged_messages'] == 'Yes'
        message_retention_duration = config['Message_retention_duration'] if retain_acked_messages else None
    if 'Message_ordering' in config:
        enable_message_ordering = config['Message_ordering'] == 'Yes'
    if all(key in config for key in ('Dead_letter_policy_enable', 'dead_letter_topic', 'max_delivery_attempts')):
        dead_letter_policy = None if config['Dead_letter_policy_enable'] == 'No' else {
            'dead_letter_topic': config['dead_letter_topic'],
            'max_delivery_attempts': config['max_delivery_attempts']}
    if all(key in config for key in ('minimum_backoff', 'maximum_backoff', 'Retry_policy_enable')):
        retry = Retry(initial=config['minimum_backoff'], maximum=config['maximum_backoff']) if config['Retry_policy_enable'] == 'Yes' else None
    try:
        subscription = Subscription(
            name=subscription_name, topic=topic_name, push_config=push_config, expiration_policy=expiration_policy,
            ack_deadline_seconds=ack_deadline_seconds, retain_acked_messages=retain_acked_messages,
            message_retention_duration=message_retention_duration, enable_message_ordering=enable_message_ordering,
            dead_letter_policy=dead_letter_policy, retry_policy=retry
        )
        subscriber.create_subscription(subscription)
        print("Subscription created: {}".format(subscription_name))
    except Exception as e:
        print(e)


def checktopicavailable(config):
    print('Verifying the topic')
    try:
        client = get_publisher_client(config)
        topic = client.topic_path(config['ProjectId'], config['TopicName'])
        response = client.get_topic(topic=topic)
        return response
    except google.api_core.exceptions.NotFound:
        print('Topic does not exist. Creating new topic: {}'.format(config['TopicName']))
        createpub(config)


def checksubscriptionavailable(config):
    print('Verifying the subscription')
    client = get_subscriber_client(config)
    try:
        subscription = client.subscription_path(config['ProjectId'], config['SubscritonName'])
        response = client.get_subscription(subscription=subscription)
        return response
    except google.api_core.exceptions.NotFound:
        print('Subscription does not exist. Creating new subscription: {}'.format(subscription))
        createsubscription(config)


def publishmessage(config, message):
    checktopicavailable(config)
    batch_settings = ()
    if config['Publish_type'] == 'Recurring':
        max_messages = int(config['Number_of_messages'])  # default 100
        max_latency = int(config['Message_interval'])  # default 10 ms
        batch_settings = pubsub_v1.types.BatchSettings(...
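
The optional settings in createsubscription are each guarded by a test that several configuration keys are present. The following is a minimal sketch of such a test using only the standard library. The helper name `has_all_keys` and the sample `config` values are hypothetical and are not part of pubsubutil.py. It also shows why writing `all([...]) in config.keys()` does not perform that test.

```python
from typing import Any, Mapping


def has_all_keys(config: Mapping[str, Any], *keys: str) -> bool:
    """Return True only if every name in `keys` is present in `config`."""
    return all(key in config for key in keys)


# Hypothetical configuration, for illustration only.
config = {"Delivery_type": "Push", "push_endpoint": "https://example.com/push"}

# all() on a list of non-empty strings is simply True, so this expression
# asks whether the value True is a key of the dict, which it is not.
naive = all(["Delivery_type", "push_endpoint"]) in config.keys()

# Checking each key individually gives the intended answer.
checked = has_all_keys(config, "Delivery_type", "push_endpoint")
missing = has_all_keys(config, "Delivery_type", "attributes")

print(naive, checked, missing)
```

With a guard of this shape, a setting is skipped whenever one of its keys is absent, instead of being skipped unconditionally.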

subscriber.py

Source: subscriber.py Github

...
            The logger that will be used to send messages to stdout.
        """
        super().__init__(logger)
        # get subscriber
        self.subscriber = self.get_subscriber_client(credentials_file)
        self.subscription_path = self.get_subscription_path(project, subscription_id)

    @staticmethod
    def get_subscriber_client(credentials_file: str) -> pubsub.subscriber.Client:
        """Get a subscriber client.

        Parameters
        ----------
        credentials_file : str
            Path to credentials file.

        Returns
        -------
        pubsub.subscriber.Client
            Instance of subscriber client object created with the provided key.
        """
        return pubsub.subscriber.Client.from_service_account_file(credentials_file)  # noqa: E501

    def get_subscription_path(self, project: str, subscription_id: str) -> str:
        """Get the subscription path.

        Parameters
...
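
Because the constructor in this excerpt obtains its client through `self.get_subscriber_client(...)`, a subclass can swap in a different client for tests. The sketch below illustrates that pattern with the standard library only. Every class name in it (`Subscriber`, `StubClient`, `StubbedSubscriber`) is a hypothetical stand-in, and the base class omits real client creation entirely.

```python
class Subscriber:
    """Simplified stand-in for a class that builds its client in __init__."""

    def __init__(self, credentials_file: str) -> None:
        # Looked up through self, so a subclass override is honoured.
        self.subscriber = self.get_subscriber_client(credentials_file)

    @staticmethod
    def get_subscriber_client(credentials_file: str):
        # A real implementation would construct a Pub/Sub client here.
        raise NotImplementedError("client creation is out of scope for this sketch")


class StubClient:
    """Test double that only records the credentials path it was given."""

    def __init__(self, credentials_file: str) -> None:
        self.credentials_file = credentials_file


class StubbedSubscriber(Subscriber):
    @staticmethod
    def get_subscriber_client(credentials_file: str) -> StubClient:
        # Return the stub instead of touching any real service.
        return StubClient(credentials_file)


sub = StubbedSubscriber("unused.json")
print(type(sub.subscriber).__name__, sub.subscriber.credentials_file)
```

Keeping client construction in one overridable method is what makes this substitution possible without patching module globals.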

create_subscription.py

Source: create_subscription.py Github

...
@click.argument("subscription")
def main(project: str, endpoint: str, topic: str, subscription: str) -> None:
    # Create PubSub clients
    publisher_client = get_publisher_client(endpoint)
    subscriber_client = get_subscriber_client(endpoint)
    # Create subscriptions
    topic_path = publisher_client.topic_path(project, topic)
    subscription_path = subscriber_client.subscription_path(project, subscription)
    subscription_obj = subscriber_client.create_subscription(
        request={"name": subscription_path, "topic": topic_path}
    )
    print(f"Created subscription {subscription_obj.name}")


if __name__ == "__main__":
...
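
The `name` and `topic` fields of the request are fully qualified resource names of the form `projects/{project}/subscriptions/{subscription}` and `projects/{project}/topics/{topic}`, the same pattern that pubsubutil.py formats by hand. Here is a minimal sketch of how such names are assembled, with no Pub/Sub dependency. The two functions are hypothetical stand-ins written for illustration, not the client library's implementation, and the project and resource IDs are made up.

```python
def topic_path(project: str, topic: str) -> str:
    """Build a fully qualified topic name."""
    return f"projects/{project}/topics/{topic}"


def subscription_path(project: str, subscription: str) -> str:
    """Build a fully qualified subscription name."""
    return f"projects/{project}/subscriptions/{subscription}"


# Hypothetical identifiers, for illustration only.
request = {
    "name": subscription_path("demo-project", "demo-sub"),
    "topic": topic_path("demo-project", "demo-topic"),
}
print(request["name"])
print(request["topic"])
```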
