Best Python code snippet using autotest_python
events.py
Source:events.py
...54 """55 def __init__(self, request):56 self.request = request57 self.sailthru = Sailthru(request)58 def _sendmail(self, template_name, to_email, context):59 """60 Use SailThru to deliver a templated e-mail61 :param template_name: SailThru template name62 :param to_email: string E-mail address63 :param context: Dictionary of parameters for the template64 """65 assert isinstance(template_name, basestring)66 assert isinstance(to_email, basestring)67 assert type(context) == dict68 if self.sailthru.enabled():69 response = self.sailthru.send_template(template_name, to_email, context)70 LOG.info("Sent e-mail to %s (SailThru send ID: %s)",71 response['email'], response['send_id'])72 return response73 else:74 LOG.warn(75 'Not sending template %s to %s because SailThru disabled',76 template_name, to_email77 )78 return None79 def logged_in(self, user):80 """81 The user has logged in to the site82 :param user: User object83 """84 # Integration with sailthru85 if self.sailthru.enabled():86 self.sailthru.login(user)87 # Internal LifetimeTrack88 try:89 lt = LifetimeTrack.objects.get(user=user)90 merge_lifetime_track(self.request.lifetime_track, lt)91 except Exception as e:92 print "=-"*34, e93 # Integration with mixpanel94 try:95 mixpanel_track(96 self.request,97 "logged in",98 {99 "$last_seen": datetime.datetime.now()100 }101 )102 mixpanel_engage(self.request, {'$add': {'Times Logged In': 1}})103 try:104 has_stall = user.stall is not None105 except:106 has_stall = False107 if has_stall:108 member_type = 'Regular'109 else:110 member_type = 'Stall Owner'111 mixpanel_engage(self.request, {'$set': {112 'last_login': datetime.datetime.now().isoformat(),113 '$email': user.email,114 '$username': user.username,115 '$created': user.date_joined.isoformat(),116 "$last_seen": datetime.datetime.now().isoformat(),117 '$first_name': user.first_name,118 '$last_name': user.last_name,119 'gender': user.user_profile.get_gender_display(),120 'mp_name_tag': user.get_full_name(),121 'Member 
Type': member_type,122 }})123 except:124 LOG.warn('Could not send login event to MixPanel', exc_info=True)125 def user_changed_email(self, user, old_email, new_email):126 """127 The user has changed their e-mail address128 :param user: User object129 :param old_email: Old e-mail address130 :param new_email: New e-mail address131 """132 from mailing_lists.models import MailingListSignup133 self.sailthru.change_email(old_email, new_email)134 old_mls = MailingListSignup.objects.filter(user=user)135 # Multiple mailing list signup objects for the same user136 # This shouldn't happen, but we can safely clean up.137 if len(old_mls) > 1:138 MailingListSignup.objects.filter(user=user).exclude(email_address=old_email).delete()139 old_mls = MailingListSignup.objects.filter(user=user) 140 if MailingListSignup.objects.filter(email_address=new_email).count() != 0:141 LOG.warn('Cant update MailingListSignup from %s to %s '142 'because MLS with new email already exists', old_email, new_email)143 return False 144 if len(old_mls) == 1: 145 old_mls[0].email_address = new_email146 old_mls[0].save()147 def cart_updated(self, cart):148 """149 Syncs contents of cart with SailThru and updates MixPanel properties150 for # of items in cart.151 :param cart: Cart object152 """153 try:154 self.sailthru.cart_updated(cart)155 mixpanel_engage(self.request, {'$set': {156 'Items in Cart': cart.num_items()157 }})158 except:159 LOG.error("cart_updated failed", exc_info=True)160 def user_followed(self, follow):161 """162 :param follow: UserFollow object which has just been created163 """164 mixpanel_track(self.request,165 "Followed User",166 {"followed_user": follow.target.username})167 mixpanel_engage(self.request,168 get_mixpanel_info(follow.user),169 follow.user.id)170 mixpanel_engage(self.request,171 get_mixpanel_info(follow.target, ignore_time=True),172 follow.target.id)173 template_name = 'new-follower-notification'174 context = {175 'USER_USERNAME': follow.user.username,176 'USER_PROFILE_URL': 
absolute_uri(follow.user.get_profile().get_absolute_url()),177 'TARGET_USER_USERNAME': follow.target.username,178 'TARGET_USER_PROFILE_URL': absolute_uri(follow.target.get_profile().get_absolute_url())179 }180 print absolute_uri(follow.user.get_profile().get_absolute_url()), \181 absolute_uri(follow.target.get_profile().get_absolute_url())182 email_notification = follow.target.email_notification183 if email_notification.follower_notifications:184 self._sendmail(template_name, follow.target.email, context)185 else:186 LOG.info(187 "Email not send to %s, since follower_notifications is set to OFF",188 follow.target.email189 )190 def user_unfollowed(self, unfollow):191 """192 :param unfollow: UserFollow object which is about to be deleted193 """194 mixpanel_track(self.request,195 "Unfollowed User",196 {"followed_user": unfollow.target.username})197 mixpanel_engage(self.request,198 get_mixpanel_info(unfollow.user),199 unfollow.user.id)200 mixpanel_engage(self.request,201 get_mixpanel_info(unfollow.target, ignore_time=True),202 unfollow.target.id)203 def message_sent(self, msg):204 """205 A new message has been sent, notify the recipient via e-mail206 :param msg: Message object207 """208 if not msg.parent_msg:209 template_name = "new-message-received"210 else:211 template_name = "message-reply-received"212 # TODO: notification settings213 context = {214 "SENDER_USERNAME": msg.sender.username,215 "MESSAGE_SUBJECT": msg.subject,216 "MESSAGE_BODY": msg.body,217 'MESSAGE_VIEW_URL': absolute_uri(reverse('messaging_inbox'))218 }219 # send mail to hipchat220 template_context = Context({221 'request': self.request,222 'message': msg,223 })224 template = loader.get_template("messaging/fragments/hipchat_message.html")225 output = template.render(template_context)226 send_to_hipchat(227 output,228 room_id=settings.HIPCHAT_MAIL_ROOM,229 notify=1,230 )231 mixpanel_track(self.request, "Sent Message", {})232 mixpanel_engage(self.request, {'$add': {'Messages Sent': 1}})233 
self._sendmail(template_name, msg.recipient.email, context)234 def seller_paypal_error(self, payment_attempt):235 """236 The payment attempt failed because the Seller has an invalid Paypal237 address238 :param payment_attempt: PaymentAttempt object239 """240 try:241 stall = payment_attempt.cart_stall.stall242 stall_url = reverse("my_stall", kwargs={"slug": stall.slug})243 settings_url = reverse("account_email_notifications")244 context = {245 'stall': {246 'title': stall.title,247 'url': absolute_uri(stall_url),248 },249 'PAYPAL_SETTINGS_URL': absolute_uri(settings_url)250 }251 to_email = stall.user.email252 self._sendmail("seller-erroneous-paypal-account",253 to_email, context)254 except:255 LOG.error('seller_paypal_error failed', exc_info=True)256 def _order_item_context(self, item):257 """258 XXX: move somewhere else?259 """260 product = item.product261 return {262 'qty': item.quantity,263 'title': product.title,264 'price': str(item.price),265 'total': str(item.price * item.quantity),266 'id': product.id,267 'url': absolute_uri(product.get_absolute_url())268 }269 def _order_context(self, order):270 """271 XXX: move somewhere else?272 """273 items = [self._order_item_context(line_item)274 for line_item in order.line_items.all()] 275 address = order.address276 delta = datetime.datetime.now().date() - order.created277 context = {278 'ORDER': {279 'items': items,280 'subtotal': str(order.subtotal()),281 'shipping': str(order.delivery_charge),282 'total': str(order.total()),283 'note': order.note,284 'address': {285 'name': address.name,286 'city': address.city,287 'country': address.country.title,288 'line1': address.line1,289 'line2': address.line2,290 'postal_code': address.postal_code,291 'state': address.state,292 },293 },294 "FNAME": order.user.first_name,295 "ORDER_DATE": custom_strftime('{S} %B %Y', order.created),296 "CUSTOMER_FIRST_NAME": order.user.first_name,297 "CUSTOMER_FULL_NAME": order.user.get_profile().full_name,298 "CUSTOMER_USERNAME": 
order.user.username,299 "CUSTOMER_PROFILE_URL": absolute_uri(300 reverse("public_profile",301 kwargs={"username": order.user.username})),302 "ORDER_ID": order.id,303 "INVOICE_URL": absolute_uri(reverse("invoice",304 kwargs={"order_id": order.id})),305 "STALL_TITLE": order.stall.title,306 "STALL_URL": absolute_uri(reverse("my_stall",307 kwargs={"slug": order.stall.slug})),308 "STALL_OWNER_USERNAME": order.stall.user.username,309 'STALL_OWNER_URL': order.stall.user.get_profile().get_absolute_url(),310 "NUMBER_OF_DAYS_SINCE_ORDER": delta.days,311 "MESSAGE_CUSTOMER_URL": absolute_uri(312 reverse("messaging_compose_to", kwargs={313 "recipient": order.user.username,314 })),315 "MESSAGE_STALL_URL": absolute_uri(316 reverse("messaging_compose_to", kwargs={317 "recipient": order.stall.user.username,318 })319 ),320 "UPDATE_PROFILE": absolute_uri(reverse("account_email_notifications")),321 }322 return context323 def _mixpanel_order_info(self, order):324 order_info = {325 "Order ID": order.id,326 "Order Date": order.created.isoformat(),327 "Total Order Value": str(order.total().amount),328 "Total Shipping Value": str(order.delivery_charge.amount),329 "No of Products": order.line_items.all().count(),330 "No of Items": order.num_items(),331 "Delivery Country": order.address.country.title332 }333 return order_info334 def order_reminder(self, order):335 """336 The seller needs to be reminded about an order.337 """338 days_map = {339 3: "3-day-dispatch-chase-up-stall-owner",340 7: "7-day-dispatch-chase-up-stall-owner",341 13: "13-day-dispatch-chase-up-stall-owner",342 14: "14-day-dispatch-chase-up-and-refund-warning",343 }344 days = (datetime.datetime.now().date() - order.created).days345 allowed_days = days_map.keys()346 if days not in allowed_days:347 return348 context = self._order_context(order)349 self._sendmail(days_map[days], order.stall.user.email, context)350 def order_refunded(self, order):351 """352 The seller has refunded an order353 """354 context = 
self._order_context(order)355 self._sendmail(356 'order-refunded-to-customer',357 order.user.email, context358 )359 self._sendmail(360 'order-refunded-to-stall-owner',361 order.stall.user.email,362 context363 )364 try:365 # Track the Seller refunding the Order366 order_info = self._mixpanel_order_info(order)367 mixpanel_track(self.request, "Clicked Refund Button", order_info)368 mixpanel_engage(self.request, {369 '$add': {370 'Refunds Made': 1371 }372 })373 # Update the Buyers properties, reducing their GMV374 # XXX: we can't delete the transaction after it's refunded!!!375 # adding the transaction needs to happen on Dispatch376 user = order.user377 stall = order.stall378 mixpanel_engage(None, {379 '$set': {380 'Orders': user.orders.completed().count(),381 'Total GMV to Date': str(user.get_profile().total_gmv.amount),382 },383 '$ignore_time': True384 }, distinct_id=user.id)385 except: 386 LOG.warning("Couldn't notify MixPanel of refund", exc_info=True)387 def order_dispatched(self, order):388 """389 The seller has marked an order as dispatched390 """391 try:392 context = self._order_context(order)393 self._sendmail(394 'order-dispatched-to-customer',395 order.user.email,396 context397 )398 except:399 LOG.warning("Couldn't send Order Dispatched email to Customer", exc_info=True)400 try:401 order_info = self._mixpanel_order_info(order)402 mixpanel_track(self.request, 'Clicked Mark Order as Dispatched', order_info)403 mixpanel_engage(self.request, {404 '$add': {405 'Orders Dispatched': 1406 }407 }, distinct_id=order.stall.user.id)408 except:409 LOG.warning("Couldn't update MixPanel after Order Dispatch", exc_info=True)410 def order_placed(self, order):411 """412 1) Syncs cart with SailThru413 2) Sends 'Order Completed' e-mail to customer via Sailthru414 3) Syncs order information with MixPanel415 """416 # Technically the transaction isn't complete yet because the seller hasn't417 # marked the order as complete so could potentially refund the money.418 try:419 context 
= self._order_context(order)420 self._sendmail(421 'order-placed-to-customer',422 order.user.email,423 context424 )425 self._sendmail(426 'order-placed-to-stall-owner',427 order.stall.user.email,428 context429 )430 except:431 LOG.error("Events.order_placed failed to send email", exc_info=True)432 # Send completed order to SailThru, this updates the users 'Total Revenue'433 try:434 self.sailthru.order_purchased(order)435 except:436 LOG.error("Events.order_placed failed to sync order with SailThru", exc_info=True)437 # Sync user properties with MixPanel438 try:439 user = order.user440 mixpanel_engage(self.request, {441 '$set': {442 'Orders': user.orders.completed().count(),443 'Total GMV to Date': str(user.get_profile().total_gmv.amount),444 }445 }, distinct_id=user.id)446 mixpanel_engage(self.request, {447 '$append': {448 '$transactions': {449 '$time': order.created.isoformat(),450 '$amount': str(order.total().amount),451 'Order ID': order.id452 }453 }454 }, distinct_id=user.id)455 if self.request is not None:456 order_info = self._mixpanel_order_info(order)457 mixpanel_track(self.request, "Purchased Order", order_info)458 except: 459 LOG.warning("Couldn't notify MixPanel of new order", exc_info=True)460 def forgot_password(self, user, token_generator=None):461 """462 Sends 'Forgot Username or Password?' 
with their username and a link463 to the password reset form.464 """465 from django.utils.http import int_to_base36466 if token_generator is None:467 from django.contrib.auth.tokens import default_token_generator 468 token_generator = default_token_generator469 token = token_generator.make_token(user)470 uid = int_to_base36(user.id)471 ctx = dict(472 PASSWORD_RESET_URL=absolute_uri(473 reverse("password_reset_confirm", kwargs={474 "uidb36": uid,475 "token": token476 })477 )478 )479 self._sendmail('forgot-username-or-password', user.email, ctx)480 def stall_opened(self, stall):481 """482 A new stall has been opened, welcome the user to the site and give them483 information about how to add products & optimize their stuff.484 """485 user = stall.user486 context = {487 "STALL_URL": absolute_uri(reverse("my_stall", kwargs={488 "slug": stall.slug,489 }))490 }491 self._sendmail("stall-owner-welcome", user.email, context)492 def user_signup(self, user, requires_activation=True):493 """494 Welcomes the user to the site495 Includes URL for them to activate their account / verify their e-mail.496 """497 if requires_activation:498 user_profile = user.user_profile499 verify_url = absolute_uri(reverse("verify", args=[user_profile.activation_key]))500 ctx = dict(ACTIVATION_URL=verify_url)501 self._sendmail('regular-user-welcome-email', user.email, ctx)502 profile = user.get_profile()503 # Fix up their MailingLists objects504 from mailing_lists.models import MailingListSignup505 mls = MailingListSignup.objects.create_from_user(user)506 mls.marketing_optin = profile.send_newsletters507 if self.request is not None:508 mls.set_ip_address(self.request)509 mls.save()510 # Apply defaults to EmailNotification preferences511 email_notification = user.email_notification512 email_notification.site_updates_features = profile.send_newsletters513 email_notification.stall_owner_tips = True514 email_notification.product_inspirations = profile.send_newsletters515 
email_notification.blogs_you_might_like = profile.send_newsletters516 email_notification.save()517 # Integration with sailthru518 if self.sailthru.enabled():519 self.sailthru.signup(user=user)520 def newsletter_signup(self, email):521 """522 This is a newsletter signup that is available from the homepage modal,523 and at the bottom of the blog.524 """525 # Integration with sailthru526 # XXX: we should be doing the mailinglistsignup thing here too!!!527 if self.sailthru.enabled():528 self.sailthru.signup(email=email)529 def stall_stockcheck(self, stall):530 user = stall.user531 context = {532 "STALL_URL": absolute_uri(reverse("my_stall", kwargs={533 "slug": stall.slug,534 })),535 "DAYS_LEFT": stall.days_to_next_stockcheck536 }537 self._sendmail("stock-check-reminder-1", user.email, context)538 def stall_stockcheck_urgent(self, stall):539 user = stall.user540 context = {541 "STALL_URL": absolute_uri(reverse("my_stall", kwargs={542 "slug": stall.slug,543 })),544 "DAYS_LEFT": stall.days_to_next_stockcheck545 }546 self._sendmail("stock-check-reminder-2", user.email, context)547 def stall_suspended(self, stall):548 user = stall.user549 context = {550 "STALL_URL": absolute_uri(reverse("my_stall", kwargs={551 "slug": stall.slug,552 }))553 }...
test_mail.py
Source:test_mail.py
1# -*- coding: utf-8 -*-2# Copyright 2019 Open End AB3#4# Licensed under the Apache License, Version 2.0 (the "License");5# you may not use this file except in compliance with the License.6# You may obtain a copy of the License at7#8# http://www.apache.org/licenses/LICENSE-2.09#10# Unless required by applicable law or agreed to in writing, software11# distributed under the License is distributed on an "AS IS" BASIS,12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.13# See the License for the specific language governing permissions and14# limitations under the License.15try:16 import ConfigParser #py217except ImportError:18 import configparser as ConfigParser #py319 20import dkim21import email.message22import os23from accounting import config24from .. import mail, smtppipe25import codecs26try:27 unicode #py228 py23txt = lambda t, c = 'us-ascii': t29 py23txtc = lambda t, c = 'us-ascii': t.encode(c)30 py23txtu = lambda t, c = 'us-ascii': unicode(t, c)31except NameError: #py332 py23txt = lambda t, c = 'us-ascii': str(t, c)33 py23txtc = lambda t, c = 'us-ascii': bytes(t,c)34 py23txtu = lambda t, c = 'us-ascii': t35class SMTP(object):36 local_hostname = 'test'37 _quit = False38 def __init__(self, *args, **kw):39 self._args = args40 self._kw = kw41 self._sendmail = []42 def sendmail(self, *args):43 self._sendmail.append(args)44 def quit(self):45 self._quit = True46class TestMail(object):47 def setup_method(self, method):48 self._smtp = smtppipe.SMTP49 smtppipe.SMTP = self.SMTP50 self.smtps = []51 self.orig_config = config.config52 self.config = config.save()53 config.config.set('accounting', 'smtp_domain', 'example.com')54 def teardown_method(self, method):55 smtppipe.SMTP = self._smtp56 config.restore(self.config)57 def SMTP(self, *args, **kw):58 smtp = SMTP(*args, **kw)59 self.smtps.append(smtp)60 return smtp61 def test_sendmail_dkim(self):62 config.config.set('accounting', 'dkim_privkey', os.path.join(63 os.path.dirname(__file__), 
'dkim_test.private'))64 config.config.set('accounting', 'dkim_domain', 'example.org')65 config.config.set('accounting', 'smtp_domain', 'test.example.org')66 config.config.set('accounting', 'smtp_to_filter', '.*@example')67 pubkey = open(os.path.join(68 os.path.dirname(__file__), 'dkim_test.txt'),'rb').read()69 body = u'räksmörgÃ¥s'70 subject = u'RäksmörgÃ¥sar!'71 to = u'"Mr. RäksmörgÃ¥s" <foo@example>'72 fromaddr, all_rcpts, message = mail.makemail(73 body, subject=subject, to=to)74 mail.sendmail(fromaddr, all_rcpts, message)75 smtp, = self.smtps76 message = smtp._sendmail[0][2]77 assert dkim.verify(message, dnsfunc=lambda *_: pubkey)78 assert b'i=@test.example.org' in message79 fromaddr, all_rcpts, message = mail.makemail(80 body, subject=subject, to=to)81 mail.sendmail(fromaddr, all_rcpts, message, identity='foo')82 smtp = self.smtps[1]83 message = smtp._sendmail[0][2]84 assert dkim.verify(message, dnsfunc=lambda *_: pubkey)85 assert b'i=foo@test.example.org' in message86 def test_makemail(self):87 body = u'räksmörgÃ¥s'88 subject = u'RäksmörgÃ¥sar!'89 to = u'"Mr. RäksmörgÃ¥s" <foo@example>'90 args = mail.makemail(body, subject=subject, to=to)91 assert len(args) == 392 assert args[:2] == ('<>', ['foo@example'])93 msg = args[2]94 assert 'foo@example' in msg95 # subject utf-8 and base64 encoded96 assert '=?utf-8?b?UsOka3Ntw7ZyZ8Olc2FyIQ==?=' in msg97 #assert msg.strip().endswith(py23txtc(body, 'utf-8').encode('base64').strip())98 assert msg.strip().endswith(py23txt(codecs.encode(py23txtc(body, 'utf-8'), 'base64').strip()))99 def test_makemail_bcc(self):100 body = u'räksmörgÃ¥s'101 subject = u'RäksmörgÃ¥sar!'102 bcc = u'"Mr. 
RäksmörgÃ¥s" <foo@example>'103 args = mail.makemail(body, envfrom='some@place',104 subject=subject, bcc=bcc)105 assert len(args) == 3106 assert args[:2] == ('some@place', ['foo@example'])107 msg = args[2]108 assert 'foo@example' not in msg109 assert 'Bcc' in msg110 def test_makemail_envfrom(self):111 body = u'räksmörgÃ¥s'112 subject = u'RäksmörgÃ¥sar!'113 to = u'"Mr. RäksmörgÃ¥s" <foo@example>'114 _from = u'test@other'115 args = mail.makemail(body, envfrom='some@place', _from=_from,116 subject=subject, to=to)117 assert len(args) == 3118 assert args[:2] == ('some@place', ['foo@example'])119 msg = args[2]120 #import pdb;pdb.set_trace()121 assert 'From: <test@other>' in msg122 #assert 'From: <test@other>' in msg123 def test_address_filtering(self, monkeypatch):124 config.config.set('accounting', 'smtp_to_filter', '')125 log = []126 monkeypatch.setattr(mail.log, 'warn', lambda *args: log.append(args))127 mail.sendmail('from@test', ['to@test'], 'mail')128 assert self.smtps == []129 assert 'to@test' in log[0]130 log[:] = []131 config.config.set('accounting', 'smtp_to_filter', '.*@openend.se')132 mail.sendmail('from@test', ['to@test'], 'mail')133 assert self.smtps == []134 assert 'to@test' in log[0]135 log[:] = []136 msg = email.message.Message()137 msg['from'] = 'foo@test'138 config.config.set('accounting', 'smtp_to_filter', '.*')139 # xxx hack to make dkim signing work, since the dkim key is for140 # openend.se141 config.config.set('accounting', 'smtp_domain', 'admin.eutaxia.eu')142 mail.sendmail('from@test', ['to@test'], msg.as_string())143 args, = self.smtps[0]._sendmail144 assert args[:2] == ('from@test', ['to@test'])145 assert log == []146 def test_fromaddr_domain(self):147 config.config.set('accounting', 'dkim_privkey', os.path.join(148 os.path.dirname(__file__), 'dkim_test.private'))149 config.config.set('accounting', 'dkim_domain', 'example.org')150 config.config.set('accounting', 'smtp_domain', 'test.example.org')151 config.config.set('accounting', 
'smtp_to_filter', '.*')152 msg = email.message.Message()153 msg['from'] = 'foo@test'154 mail.sendmail('from', ['to@test'], msg.as_string())155 args, = self.smtps[-1]._sendmail156 assert args[:2] == ('from@test.example.org', ['to@test'])157 msg = email.message.Message()158 msg['from'] = 'foo@test'159 mail.sendmail('from@bar', ['to@test'], msg.as_string())160 args, = self.smtps[-1]._sendmail161 assert args[:2] == ('from@bar', ['to@test'])162 msg = email.message.Message()163 msg['from'] = 'foo@test'164 mail.sendmail('<>', ['to@test'], msg.as_string())165 args, = self.smtps[-1]._sendmail166 assert args[:2] == ('<>', ['to@test'])167def test_makeAddressHeader():168 addrs = [(u'kalle anka', 'kalle@anka.se'),169 (u'foo@bar', 'foo@bar.com'),170 (u'<>foo,bar', 'foobar@baz.com'),171 (u'räksmörgÃ¥s', 'raksmorgas@mat.se'),172 (u'é»å', 'dentaku@foo.org')]173 tohdr = mail.makeAddressHeader('To', addrs)174 try:175 unicode176 assert tohdr == 'kalle anka <kalle@anka.se>, "foo@bar" <foo@bar.com>, "<>foo,bar" <foobar@baz.com>,' \177 ' =?iso-8859-1?q?r=E4ksm=F6rg=E5s?= <raksmorgas@mat.se>, =?utf-8?b?6Zu75Y2T?= <dentaku@foo.org>'178 except NameError:179 assert tohdr == 'kalle anka <kalle@anka.se>, "foo@bar" <foo@bar.com>, "<>foo,bar" <foobar@baz.com>,' \...
sendmail.py
Source:sendmail.py
# sendmail.py -- rate-limited notification e-mails sent from a worker thread.
#
# Callers enqueue a message with send(); the daemon thread started by start()
# takes it off the queue and mails it.  Identical messages are rate limited:
# the message text is hashed and used as a key into ``_messages``, whose value
# records when the message was first seen and how many times it has been
# submitted.  A second daemon thread periodically requests a flush of that
# cache so it cannot grow without bound.
import hashlib
import logging
import queue
import smtplib
import threading
import time

import config

log = logging.getLogger(__name__)

# sha1 hex digest of the full message text -> MessageLimiting record.
# Only the mail worker thread reads or writes this dict.
_messages = {}
# Queue of Message instances consumed by mailWorker().
_messageQueue = queue.Queue()

# NOTE(security): the account name and password below were hard-coded in the
# original listing and are kept only so behaviour is unchanged.  Credentials
# must not live in source control -- move them to configuration or a secret
# store and rotate the exposed password.
_SMTP_HOST = 'smtp.gmail.com'
_SMTP_PORT = 587
_SMTP_USER = "automatisatie.borowski@gmail.com"
_SMTP_PASSWORD = "AuGoo20ToGle16Matisatie"
_MAIL_TO = "emmanuel.borowski@gmail.com"


class Message:
    """Base class for everything placed on the worker queue."""
    pass


class MessageSend(Message):
    """Queue item asking the worker thread to send one e-mail."""

    def __init__(self, subject, body):
        self.subject = subject
        self.body = body


class MessageFlush(Message):
    """Queue item asking the worker thread to empty the rate-limit cache."""
    pass


def start():
    """Start the mail worker and the periodic cache-flush thread.

    Both threads are daemon threads, so they do not keep the process alive.
    """
    log.info("starting")
    # daemon=True replaces the deprecated Thread.setDaemon(True).
    worker = threading.Thread(
        target=mailWorker,
        args=(_messageQueue,),
        daemon=True)
    worker.start()

    flusher = threading.Thread(target=flushWorker, daemon=True)
    flusher.start()


def flushWorker():
    """Request a cache flush every ``config.SM_WINDOW + 5`` seconds, forever."""
    while True:
        time.sleep(config.SM_WINDOW + 5)
        flush()
        log.info('flush message cache')


class MessageLimiting:
    """Rate-limit record for one distinct message.

    ``time`` is the epoch second at which the record was created or last
    reset; ``messageCount`` counts how often the message has been submitted.
    """

    def __init__(self):
        self.time = int(time.time())
        self.messageCount = 1


def send(subject, body):
    """Queue an e-mail with the given subject and body for the worker."""
    _messageQueue.put(MessageSend(subject, body))


def flush():
    """Queue a request to empty the rate-limit cache."""
    _messageQueue.put(MessageFlush())


def mailWorker(q):
    """Consume queue ``q`` forever, sending mail and handling flush requests.

    :param q: queue.Queue yielding Message instances
    """
    while True:
        m = q.get()
        try:
            if isinstance(m, MessageSend):
                _send(m.subject, m.body)
            elif isinstance(m, MessageFlush):
                # Clear in place rather than rebinding the module global so
                # any other reference to the dict sees the flush as well.
                _messages.clear()
        except Exception:
            # A failing send (network error, rejected login, encoding error)
            # must not terminate the worker: that would silently stop every
            # later notification.  Log it and carry on with the next item.
            log.exception("mail worker failed to process %s",
                          type(m).__name__)


def _sendMail(m, mc):
    """Log the message and, if sending is enabled, deliver it over SMTP.

    :param m: full message text, starting with the ``Subject:`` header line
    :param mc: submission counter appended to the message body
    """
    message = '{}\n\nMessage #{}\n\nGreetings HA'.format(m, mc)
    log.error(message)
    if not config.SM_ENABLE_SEND_MAIL:
        return
    # The context manager issues QUIT and closes the socket even when
    # starttls/login/sendmail raises; the original leaked the connection
    # on any error.
    with smtplib.SMTP(_SMTP_HOST, _SMTP_PORT) as server:
        server.starttls()
        server.login(_SMTP_USER, _SMTP_PASSWORD)
        server.sendmail(_SMTP_USER, _MAIL_TO, message)


def _send(subject, body):
    """Apply rate limiting to one message and send it when allowed.

    An empty subject or body is replaced by a default text.  A message seen
    for the first time is always sent.  A repeat is sent while its counter is
    below 3; after that it is sent only when more than ``config.SM_WINDOW``
    seconds have passed since the recorded time.
    """
    if subject == '':
        subject = 'heating automation message'
    if body == '':
        body = 'heating automation default message'

    msg = 'Subject: {}\n\n{}'.format(subject, body)
    hmsg = hashlib.sha1(msg.encode()).hexdigest()

    record = _messages.get(hmsg)
    if record is None:
        record = _messages[hmsg] = MessageLimiting()
        _sendMail(msg, record.messageCount)
        return

    # NOTE(review): the listing this was reconstructed from had lost its
    # indentation.  The nesting below (timestamp reset only after a windowed
    # send, counter incremented on every repeat) is the reading that matches
    # the rate-limiting description at the top of the file -- confirm against
    # the original source.
    if record.messageCount < 3:
        _sendMail(msg, record.messageCount)
    elif (int(time.time()) - int(record.time)) > config.SM_WINDOW:
        _sendMail(msg, record.messageCount)
        # Stored as int, consistent with MessageLimiting.__init__.
        record.time = int(time.time())
    record.messageCount += 1

# (The scraped listing ends with an elision marker here; anything that
# followed in the original file is not shown.)
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!