Best Python code snippet using autotest_python
__init__.py
Source:__init__.py
...7from pcapi.core.search.backends import base8from pcapi.repository import offer_queries9from pcapi.utils.module_loading import import_string10logger = logging.getLogger(__name__)11def _get_backend() -> base.SearchBackend:12 backend_class = import_string(settings.SEARCH_BACKEND)13 return backend_class()14def async_index_offer_ids(offer_ids: Iterable[int]) -> None:15 """Ask for an asynchronous reindexation of the given list of16 ``Offer.id``.17 This function returns quickly. The "real" reindexation will be18 done later through a cron job.19 """20 backend = _get_backend()21 try:22 backend.enqueue_offer_ids(offer_ids)23 except Exception: # pylint: disable=broad-except24 if settings.IS_RUNNING_TESTS:25 raise26 logger.exception("Could not enqueue offer ids to index", extra={"offers": offer_ids})27def async_index_venue_ids(venue_ids: Iterable[int]) -> None:28 """Ask for an asynchronous reindexation of the given list of29 permanent ``Venue`` ids.30 This function returns quickly. The "real" reindexation will be31 done later through a cron job.32 """33 backend = _get_backend()34 try:35 backend.enqueue_venue_ids(venue_ids)36 except Exception: # pylint: disable=broad-except37 if settings.IS_RUNNING_TESTS:38 raise39 logger.exception("Could not enqueue venue ids to index", extra={"venues": venue_ids})40def async_index_offers_of_venue_ids(venue_ids: Iterable[int]) -> None:41 """Ask for an asynchronous reindexation of the offers attached to venues42 from the list of ``Venue.id``.43 This function returns quickly. The "real" reindexation will be44 done later through a cron job.45 """46 backend = _get_backend()47 try:48 backend.enqueue_venue_ids_for_offers(venue_ids)49 except Exception: # pylint: disable=broad-except50 if settings.IS_RUNNING_TESTS:51 raise52 logger.exception(53 "Could not enqueue venue ids to index their offers",54 extra={"venues": venue_ids},55 )56def index_offers_in_queue(stop_only_when_empty: bool = False, from_error_queue: bool = False) -> None:57 """Pop offers from indexation queue and reindex them.58 If ``from_error_queue`` is True, pop offers from the error queue59 instead of the "standard" indexation queue.60 If ``stop_only_when_empty`` is False (i.e. if called as a cron61 command), we pop from the queue at least once, and stop when there62 is less than REDIS_OFFER_IDS_CHUNK_SIZE in the queue (otherwise63 the cron job may never stop). It means that a cron job may run for64 a long time if the queue has many items. In fact, a subsequent65 cron job may run in parallel if the previous one has not finished.66 It's fine because they both pop from the queue.67 If ``stop_only_when_empty`` is True (i.e. if called from the68 ``process_offers`` Flask command), we pop from the queue and stop69 only when the queue is empty.70 """71 backend = _get_backend()72 while True:73 # We must pop and not get-and-delete. Otherwise two concurrent74 # cron jobs could delete the wrong offers from the queue:75 # 1. Cron job 1 gets the first 1.000 offers from the queue.76 # 2. Cron job 2 gets the same 1.000 offers from the queue.77 # 3. Cron job 1 finishes processing the batch and deletes the78 # first 1.000 offers from the queue. OK.79 # 4. Cron job 2 finishes processing the batch and also deletes80 # the first 1.000 offers from the queue. Not OK, these are81 # not the same offers it just processed!82 offer_ids = backend.pop_offer_ids_from_queue(83 count=settings.REDIS_OFFER_IDS_CHUNK_SIZE, from_error_queue=from_error_queue84 )85 if not offer_ids:86 break87 logger.info("Fetched offers from indexation queue", extra={"count": len(offer_ids)})88 try:89 reindex_offer_ids(offer_ids)90 except Exception as exc: # pylint: disable=broad-except91 if settings.IS_RUNNING_TESTS:92 raise93 logger.exception(94 "Exception while reindexing offers, must fix manually",95 extra={"exc": str(exc), "offers": offer_ids},96 )97 else:98 logger.info(99 "Reindexed offers from queue",100 extra={"count": len(offer_ids), "from_error_queue": from_error_queue},101 )102 left_to_process = backend.count_offers_to_index_from_queue(from_error_queue=from_error_queue)103 if not stop_only_when_empty and left_to_process < settings.REDIS_OFFER_IDS_CHUNK_SIZE:104 break105def index_venues_in_queue(from_error_queue: bool = False) -> None:106 """Pop venues from indexation queue and reindex them."""107 backend = _get_backend()108 try:109 chunk_size = settings.REDIS_VENUE_IDS_CHUNK_SIZE110 venue_ids = backend.pop_venue_ids_from_queue(count=chunk_size, from_error_queue=from_error_queue)111 if not venue_ids:112 return113 _reindex_venue_ids(backend, venue_ids)114 except Exception as exc: # pylint: disable=broad-except115 if settings.IS_RUNNING_TESTS:116 raise117 logger.exception("Could not index venues from queue", extra={"exc": str(exc)})118def _reindex_venue_ids(backend: base.SearchBackend, venue_ids: Iterable[int]) -> None:119 logger.info("Starting to index venues", extra={"count": len(venue_ids)})120 venues = Venue.query.filter(Venue.id.in_(venue_ids)).options(joinedload(Venue.managingOfferer))121 to_add = [venue for venue in venues if venue.is_eligible_for_search]122 to_add_ids = [venue.id for venue in to_add]123 to_delete_ids = [venue.id for venue in venues if not venue.is_eligible_for_search]124 try:125 backend.index_venues(to_add)126 except Exception as exc: # pylint: disable=broad-except127 backend.enqueue_venue_ids_in_error(to_add_ids)128 logger.warning(129 "Could not reindex venues, will automatically retry",130 extra={"exc": str(exc), "venues": to_add_ids},131 exc_info=True,132 )133 else:134 logger.info("Finished indexing venues", extra={"count": len(to_add)})135 if to_delete_ids:136 unindex_venue_ids(to_delete_ids)137 logger.info("Finished unindexing venues", extra={"count": len(to_delete_ids)})138def index_offers_of_venues_in_queue() -> None:139 """Pop venues from indexation queue and reindex their offers."""140 backend = _get_backend()141 try:142 venue_ids = backend.pop_venue_ids_for_offers_from_queue(count=settings.REDIS_VENUE_IDS_FOR_OFFERS_CHUNK_SIZE)143 for venue_id in venue_ids:144 page = 0145 logger.info("Starting to index offers of venue", extra={"venue": venue_id})146 while True:147 offer_ids = offer_queries.get_paginated_offer_ids_by_venue_id(148 limit=settings.ALGOLIA_OFFERS_BY_VENUE_CHUNK_SIZE, page=page, venue_id=venue_id149 )150 if not offer_ids:151 break152 reindex_offer_ids(offer_ids)153 page += 1154 logger.info("Finished indexing offers of venue", extra={"venue": venue_id})155 except Exception: # pylint: disable=broad-except156 if settings.IS_RUNNING_TESTS:157 raise158 logger.exception("Could not index offers of venues from queue")159def reindex_offer_ids(offer_ids: Iterable[int]) -> None:160 """Given a list of `Offer.id`, reindex or unindex each offer161 (i.e. request the external indexation service an update or a162 removal).163 This function calls the external indexation service and may thus164 be slow. It should not be called by usual code. You should rather165 call `async_index_offer_ids()` instead to return quickly.166 """167 backend = _get_backend()168 to_add = []169 to_delete = []170 # FIXME (dbaty, 2021-07-05): join-load Stock, Venue, Offerer,171 # etc. to avoid N+1 queries on each offer.172 offers = Offer.query.filter(Offer.id.in_(offer_ids))173 for offer in offers:174 if offer and offer.is_eligible_for_search:175 to_add.append(offer)176 elif backend.check_offer_is_indexed(offer):177 to_delete.append(offer)178 else:179 # FIXME (dbaty, 2021-06-24). I think we could safely do180 # without the hashmap in Redis. Check the logs and see if181 # I am right!182 logger.info(183 "Redis 'indexed_offers' set avoided unnecessary request to indexation service",184 extra={"source": "reindex_offer_ids", "offer": offer.id},185 )186 # Handle new or updated available offers187 try:188 backend.index_offers(to_add)189 except Exception as exc: # pylint: disable=broad-except190 if settings.IS_RUNNING_TESTS:191 raise192 logger.warning(193 "Could not reindex offers, will automatically retry",194 extra={"exc": str(exc), "offers": [offer.id for offer in to_add]},195 exc_info=True,196 )197 backend.enqueue_offer_ids_in_error([offer.id for offer in to_add])198 # Handle unavailable offers (deleted, expired, sold out, etc.)199 try:200 backend.unindex_offer_ids([offer.id for offer in to_delete])201 except Exception as exc: # pylint: disable=broad-except202 if settings.IS_RUNNING_TESTS:203 raise204 logger.warning(205 "Could not unindex offers, will automatically retry",206 extra={"exc": str(exc), "offers": [offer.id for offer in to_delete]},207 exc_info=True,208 )209 backend.enqueue_offer_ids_in_error([offer.id for offer in to_delete])210def unindex_offer_ids(offer_ids: Iterable[int]) -> None:211 backend = _get_backend()212 try:213 backend.unindex_offer_ids(offer_ids)214 except Exception: # pylint: disable=broad-except215 if settings.IS_RUNNING_TESTS:216 raise217 logger.exception("Could not unindex offers", extra={"offers": offer_ids})218def unindex_all_offers() -> None:219 if settings.IS_PROD:220 raise ValueError("It is forbidden to unindex all offers on this environment")221 backend = _get_backend()222 try:223 backend.unindex_all_offers()224 except Exception: # pylint: disable=broad-except225 if settings.IS_RUNNING_TESTS:226 raise227 logger.exception("Could not unindex all offers")228def reindex_venue_ids(venue_ids: Iterable[int]) -> None:229 """Given a list of `Venue.id`, reindex or unindex each venue230 (i.e. request the external indexation service an update or a231 removal).232 This function calls the external indexation service and may thus233 be slow. It should not be called by usual code. You should rather234 call `async_index_venue_ids()` instead to return quickly.235 """236 backend = _get_backend()237 try:238 _reindex_venue_ids(backend, venue_ids)239 except Exception: # pylint: disable=broad-except240 if settings.IS_RUNNING_TESTS:241 raise242 logger.exception("Could not reindex venues", extra={"venues": venue_ids})243def unindex_venue_ids(venue_ids: Iterable[int]) -> None:244 if not venue_ids:245 return246 backend = _get_backend()247 try:248 backend.unindex_venue_ids(venue_ids)249 except Exception: # pylint: disable=broad-except250 if settings.IS_RUNNING_TESTS:251 raise252 logger.exception("Could not unindex venues", extra={"venues": venue_ids})253def unindex_all_venues() -> None:254 if settings.IS_PROD:255 raise ValueError("It is forbidden to unindex all venues on this environment")256 backend = _get_backend()257 try:258 backend.unindex_all_venues()259 except Exception: # pylint: disable=broad-except260 if settings.IS_RUNNING_TESTS:261 raise...
tests.py
Source:tests.py
...58 "env": "-development",59 },60 "MJ-TemplateErrorReporting": "dev@example.com",61 }62 def _get_backend(self):63 return pcapi.core.mails.backends.mailjet.MailjetBackend()64 def test_send_mail(self):65 backend = self._get_backend()66 with requests_mock.Mocker() as mock:67 posted = mock.post("https://api.mailjet.com/v3/send")68 result = backend.send_mail(recipients=self.recipients, data=self.data)69 assert posted.last_request.json() == self.expected_sent_data70 assert result.successful71 def test_send_mail_with_error_response(self):72 backend = self._get_backend()73 with requests_mock.Mocker() as mock:74 posted = mock.post("https://api.mailjet.com/v3/send", status_code=400)75 result = backend.send_mail(recipients=self.recipients, data=self.data)76 assert posted.last_request.json() == self.expected_sent_data77 assert not result.successful78 def test_send_mail_with_timeout(self):79 backend = self._get_backend()80 with requests_mock.Mocker() as mock:81 mock.post("https://api.mailjet.com/v3/send", exc=requests.exceptions.ConnectTimeout)82 result = backend.send_mail(recipients=self.recipients, data=self.data)83 assert not result.successful84 @patch("pcapi.utils.requests.logger.info")85 def test_use_our_requests_wrapper_that_logs(self, mocked_logger):86 backend = self._get_backend()87 with requests_mock.Mocker() as mock:88 posted = mock.post("https://api.mailjet.com/v3/send")89 backend.send_mail(recipients=self.recipients, data=self.data)90 assert posted.last_request.json() == self.expected_sent_data91 mocked_logger.assert_called_once()92class ToDevMailjetBackendTest:93 recipients = ["real1@example.com", "real2@example.com"]94 data = {"key": "value"}95 expected_sent_data = {96 "FromEmail": "support@example.com",97 "To": ", ".join(recipients),98 "key": "value",99 "MJ-TemplateErrorReporting": "dev@example.com",100 }101 def _get_backend(self):102 return pcapi.core.mails.backends.mailjet.ToDevMailjetBackend()103 def test_send_mail_overrides_recipients(self):104 backend = self._get_backend()105 with requests_mock.Mocker() as mock:106 posted = mock.post("https://api.mailjet.com/v3/send")107 result = backend.send_mail(recipients=self.recipients, data=self.data)108 expected = copy.deepcopy(self.expected_sent_data)109 expected["To"] = "dev@example.com"110 assert posted.last_request.json() == expected111 assert result.successful112 @override_settings(WHITELISTED_EMAIL_RECIPIENTS=["false1@example.com", "real2@example.com"])113 def test_send_mail_if_any_recipient_is_whitelisted(self):114 backend = self._get_backend()115 with requests_mock.Mocker() as mock:116 posted = mock.post("https://api.mailjet.com/v3/send")117 result = backend.send_mail(recipients=self.recipients, data=self.data)118 assert posted.last_request.json() == self.expected_sent_data119 assert result.successful120 def test_send_mail_inject_preamble_in_html(self):121 backend = self._get_backend()122 data = copy.deepcopy(self.data)123 data["Html-part"] = "<div>some HTML...<div>"124 with requests_mock.Mocker() as mock:125 posted = mock.post("https://api.mailjet.com/v3/send")126 result = backend.send_mail(recipients=self.recipients, data=data)127 expected = copy.deepcopy(self.expected_sent_data)128 expected["To"] = "dev@example.com"129 posted_json = posted.last_request.json()130 assert posted_json["Html-part"].endswith("<div>some HTML...<div>")131 assert "would have been sent to real1@example.com, real2@example.com" in posted_json["Html-part"]132 assert result.successful133class SendinblueBackendTest:134 recipients = ["lucy.ellingson@example.com", "avery.kelly@example.com"]135 mock_template = sendinblue_models.Template(136 id_prod=1, id_not_prod=10, tags=["this_is_such_a_great_tag", "it_would_be_a_pity_if_anything_happened_to_it"]137 )138 mock_reply_to = sendinblue_models.EmailInfo(email="reply_to@example.com", name="Tom S.")139 params = {"Name": "Lucy", "City": "Kennet", "OtherCharacteristics": "Awsomeness"}140 data = sendinblue_models.SendinblueTransactionalEmailData(141 template=mock_template, params=params, reply_to=mock_reply_to142 )143 expected_sent_data = sendinblue_tasks.SendTransactionalEmailRequest(144 recipients=recipients,145 params=params,146 template_id=data.template.id,147 tags=data.template.tags,148 sender=dataclasses.asdict(sendinblue_models.SendinblueTransactionalSender.SUPPORT.value),149 reply_to={"email": "reply_to@example.com", "name": "Tom S."},150 )151 def _get_backend(self):152 return pcapi.core.mails.backends.sendinblue.SendinblueBackend()153 @patch("pcapi.core.mails.backends.sendinblue.send_transactional_email_secondary_task.delay")154 def test_send_mail(self, mock_send_transactional_email_secondary_task):155 backend = self._get_backend()156 result = backend.send_mail(recipients=self.recipients, data=self.data)157 assert mock_send_transactional_email_secondary_task.call_count == 1158 task_param = mock_send_transactional_email_secondary_task.call_args[0][0]159 assert list(task_param.recipients) == list(self.expected_sent_data.recipients)160 assert task_param.params == self.expected_sent_data.params161 assert task_param.template_id == self.expected_sent_data.template_id162 assert task_param.tags == self.expected_sent_data.tags163 assert task_param.sender == self.expected_sent_data.sender164 assert task_param.reply_to == self.expected_sent_data.reply_to165 assert result.successful166 @patch("pcapi.core.mails.backends.sendinblue.send_transactional_email_secondary_task.delay")167 def test_send_mail_with_no_reply_equal_overrided_by_sender(self, mock_send_transactional_email_secondary_task):168 self.data = sendinblue_models.SendinblueTransactionalEmailData(169 template=self.mock_template, params=self.params, reply_to=None170 )171 expected_sent_data = sendinblue_tasks.SendTransactionalEmailRequest(172 recipients=self.recipients,173 params=SendinblueBackendTest.params,174 template_id=SendinblueBackendTest.data.template.id,175 tags=SendinblueBackendTest.data.template.tags,176 sender=dataclasses.asdict(sendinblue_models.SendinblueTransactionalSender.SUPPORT.value),177 reply_to=dataclasses.asdict(sendinblue_models.SendinblueTransactionalSender.SUPPORT.value),178 )179 backend = self._get_backend()180 result = backend.send_mail(recipients=self.recipients, data=self.data)181 assert mock_send_transactional_email_secondary_task.call_count == 1182 task_param = mock_send_transactional_email_secondary_task.call_args[0][0]183 assert list(task_param.recipients) == list(self.expected_sent_data.recipients)184 assert task_param.params == expected_sent_data.params185 assert task_param.template_id == expected_sent_data.template_id186 assert task_param.tags == expected_sent_data.tags187 assert task_param.sender == expected_sent_data.sender188 assert task_param.reply_to == expected_sent_data.reply_to189 assert result.successful190class ToDevSendinblueBackendTest(SendinblueBackendTest):191 expected_sent_data = sendinblue_tasks.SendTransactionalEmailRequest(192 recipients=["dev@example.com"],193 params=SendinblueBackendTest.params,194 template_id=SendinblueBackendTest.data.template.id,195 tags=SendinblueBackendTest.data.template.tags,196 sender=dataclasses.asdict(sendinblue_models.SendinblueTransactionalSender.SUPPORT.value),197 reply_to={"email": "reply_to@example.com", "name": "Tom S."},198 )199 def _get_backend(self):...
configurator.py
Source:configurator.py
...7 """ Initialize config. """8 config = self._read_config(filename)9 self._logger = logging.getLogger(__name__)10 self._init_logger(config)11 self.backend = self._get_backend(config)12 self.protectors = self._get_protectors(config)13 self.virtualServer = self._get_virtualServer(config)14 self.sources = self._get_sources(config)15 def _read_config(self, filename):16 try:17 with open(filename) as json_data_file:18 data = json.load(json_data_file)19 except IOError as err:20 print(err)21 sys.exit(1)22 return data23 def _init_logger(self, config):24 """ Initialize logger. """25 config_log_level = config["logs"]["level"].upper()26 log_level = getattr(logging, config_log_level)27 logging.basicConfig(28 filename=config["logs"]["file"],29 level=log_level,30 format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'31 )32 def _get_virtualServer(self, config):33 """ Read and retun configuration. """34 return {35 "interface": config["virtual"]["interface"] or "localhost",36 "port": int(config["virtual"]["port"]) or 808037 }38 def _get_protectors(self, config):39 """ Read and retun configuration. """40 protectors = {}41 for item, value in config.get("protectors").items():42 protectors[item] = value43 return protectors44 def _get_backend(self, config):45 return {46 "host": config["backend"]["host"],47 "port": int(config["backend"]["port"]) or 8048 }49 def _get_sources(self, config):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!