Best Python code snippet using sure_python
test_subscriptions.py
Source:test_subscriptions.py
1# -*- coding: utf-8 -*-2"""3 tests.core.test_subscriptions4 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~5 Tests for the subscription system.6 :copyright: 2009-2011 by the Inyoka Team, see AUTHORS for more details.7 :license: GNU GPL, see LICENSE for more details.8"""9import unittest10from operator import attrgetter11from inyoka.core.api import db, ctx12from inyoka.core.auth.models import User13from inyoka.core.subscriptions import SubscriptionType, SubscriptionAction14from inyoka.core.subscriptions.models import Subscription15from inyoka.core.test import *16# to keep it better understandable classes etc are named like in a blog17class TestSubscriptionCategory(db.Model):18 __tablename__ = '_test_subscription_category'19 manager = TestResourceManager20 id = db.Column(db.Integer, primary_key=True)21 name = db.Column(db.String(30))22class Wrapper(db.Model):23 __tablename__ = '_test_subscription_wrapper'24 manager = TestResourceManager25 id = db.Column(db.Integer, primary_key=True)26class TestSubscriptionEntry(db.Model):27 __tablename__ = '_test_subscription_entry'28 manager = TestResourceManager29 id = db.Column(db.Integer, primary_key=True)30 category_id = db.Column(db.ForeignKey(TestSubscriptionCategory.id))31 category = db.relationship(TestSubscriptionCategory)32 title = db.Column(db.String(30))33class TestSubscriptionComment(db.Model):34 __tablename__ = '_test_subscription_comment'35 manager = TestResourceManager36 id = db.Column(db.Integer, primary_key=True)37 entry_id = db.Column(db.ForeignKey(TestSubscriptionEntry.id), nullable=False)38 entry = db.relationship(TestSubscriptionEntry)39_entry_tag = db.Table('_test_subscription_entry_tag', db.metadata,40 db.Column('entry_id', db.Integer, db.ForeignKey(TestSubscriptionEntry.id)),41 db.Column('tag_id', db.Integer,42 db.ForeignKey('_test_subscription_tag.id')),43)44TestResourceManager.register_models(_entry_tag)45class TestSubscriptionTag(db.Model):46 __tablename__ = '_test_subscription_tag'47 manager = TestResourceManager48 id = db.Column(db.Integer, primary_key=True)49 entries = db.relationship(TestSubscriptionEntry, secondary=_entry_tag,50 backref='tags', lazy='joined')51class Other(db.Model):52 __tablename__ = '_test_subscription_other'53 manager = TestResourceManager54 id = db.Column(db.Integer, primary_key=True)55 wrapper_id = db.Column(db.ForeignKey(Wrapper.id))56 wrapper = db.relationship(Wrapper)57class NotifyTrackerMixin(object):58 tracker = []59 epoch = 060 @classmethod61 def notify(cls, user, object, subjects):62 NotifyTrackerMixin.tracker.append((NotifyTrackerMixin.epoch, cls.name,63 user, object, subjects))64class NewEntrySubscriptionAction(NotifyTrackerMixin, SubscriptionAction):65 name = u'__test_new_entry'66class NewCommentSubscriptionAction(NotifyTrackerMixin, SubscriptionAction):67 name = u'__test_new_comment'68class NewOtherSubscriptionAction(SubscriptionAction):69 name = u'__test_new_other'70 @classmethod71 def notify(cls, user, subscriptions):72 raise RuntimeError('This should not have been called')73class CategorySubscriptionType(SubscriptionType):74 name = u'__test_category'75 subject_type = TestSubscriptionCategory76 object_type = TestSubscriptionEntry77 mode = u'multiple'78 actions = [u'__test_new_entry']79 get_subject = attrgetter('category')80class BlogSubscriptionType(SubscriptionType):81 name = u'__test_blog'82 subject_type = None83 object_type = TestSubscriptionEntry84 mode = u'multiple'85 actions = [u'__test_new_entry']86class BadImplementedType(SubscriptionType):87 name = u'__test_something'88 subject_type = Wrapper89 object_type = Other90 mode = u'multiple'91 actions = [u'__test_new_other']92class CommentsSubscriptionType(SubscriptionType):93 name = u'__test_comments'94 subject_type = TestSubscriptionEntry95 object_type = TestSubscriptionComment96 mode = u'sequent'97 actions = [u'__test_new_comment']98 get_subject = attrgetter('entry')99class TagSubscriptionType(SubscriptionType):100 name = u'__test_tag'101 subject_type = TestSubscriptionTag102 object_type = TestSubscriptionEntry103 mode = u'multiple'104 actions = [u'__test_new_entry']105 get_subjects = attrgetter('tags')106class TestSubscriptions(DatabaseTestCase):107 fixtures = [108 {'User': [{'username': 'one', 'email': 'one@example.com'},109 {'username': 'two', 'email': 'two@example.com'},110 {'username': 'three', 'email': 'three@example.com'},111 {'username': 'four', 'email': 'four@example.com'}]},112 {TestSubscriptionCategory: [{'id': '&c1', 'name': 'cat1'},113 {'id': '&c2', 'name': 'cat2'}]},114 {TestSubscriptionEntry: [{'&e1': {'category_id': '*c1'}},115 {'&e2': {'category_id': '*c2'}},116 {'&e3': {'category_id': '*c1'}},117 {'&e4': {'category_id': '*c1'}}]},118 {TestSubscriptionTag: [{'entries': ['*e1']}, {'entries': ['*e1', '*e2', '*e4']},119 {'entries': ['*e4']}]},120 {TestSubscriptionComment: [{'entry': '*e1'}, {'entry': '*e2'}, {'entry': '*e1'},121 {'entry': '*e1'}, {'entry': '*e1'}, {'entry': '*e2'}]}122 ]123 custom_cleanup_factories = [124 lambda: Subscription.query.all()125 ]126 def _check_sequent_state(self, user, type_name, subject_id, first_unread, count):127 """128 Check if values of `first_unread_object_id` and `count` attributes of129 Subscription with given parameters are equal to the given values.130 """131 s = Subscription.query.filter_by(user=user, type_name=type_name,132 subject_id=subject_id).one()133 eq_(s.first_unread_object_id, first_unread)134 eq_(s.count, count)135 def _check_multiple_state(self, user, type_name, subject_id, unread_object_ids, count):136 """137 Check if values of `unread_object_ids` and `count` attributes of138 Subscription with given parameters are equal to the given values.139 """140 s = Subscription.query.filter_by(user=user, type_name=type_name,141 subject_id=subject_id).one()142 eq_(s.unread_object_ids, unread_object_ids)143 eq_(s.count, count)144 def test_subscriptiontype(self):145 eq_(SubscriptionType.by_name(u'__test_comments'), CommentsSubscriptionType)146 eq_(SubscriptionType.by_object_type(TestSubscriptionComment), [CommentsSubscriptionType])147 eq_(SubscriptionType.by_subject_type(TestSubscriptionCategory), [CategorySubscriptionType])148 eq_(sorted(SubscriptionType.by_action(NewEntrySubscriptionAction)),149 sorted([CategorySubscriptionType, BlogSubscriptionType, TagSubscriptionType]))150 eq_(SubscriptionType.by_action(u'__test_new_comment'), [CommentsSubscriptionType])151 def test_subscription(self):152 one = self.data['User'][0]153 cat1, cat2 = self.data['TestSubscriptionCategory']154 eq_(Subscription.subscribe(one, u'__test_category', cat1), True)155 eq_(Subscription.subscribe(one, u'__test_category', cat1), False)156 eq_(Subscription.subscribe(one, u'__test_blog'), True)157 eq_(Subscription.subscribe(one, u'__test_blog'), False)158 s = Subscription.query.filter_by(user_id=one.id,159 type_name=u'__test_category',160 subject_id=cat1.id).one()161 eq_(s.type, CategorySubscriptionType)162 eq_(s.subject, cat1)163 s = Subscription.query.filter_by(user_id=one.id,164 type_name=u'__test_blog').one()165 eq_(s.type, BlogSubscriptionType)166 eq_(s.subject, None)167 def test_multiple_subscriptions(self):168 """169 Test (with multiple mode) whether the subscription count and the unread170 information is accurate and whether notifications are sent correctly.171 """172 cat1, cat2 = self.data['TestSubscriptionCategory']173 one, two, three, four = self.data['User']174 entries = self.data['TestSubscriptionEntry']175 NotifyTrackerMixin.tracker = []176 #: ensure this does not fail but passes silently177 Subscription.accessed(one, object=entries[0])178 Subscription.subscribe(one, u'__test_category', cat1)179 Subscription.subscribe(two, u'__test_blog')180 NotifyTrackerMixin.epoch = 1181 Subscription.new(entries[0], u'__test_new_entry')182 Subscription.new(entries[1], u'__test_new_entry')183 self._check_multiple_state(one, u'__test_category', cat1.id,184 set((entries[0].id,)), 1)185 self._check_multiple_state(two, u'__test_blog', None,186 set((entries[0].id, entries[1].id)), 2)187 NotifyTrackerMixin.epoch = 2188 Subscription.new(entries[2], u'__test_new_entry')189 Subscription.new(entries[3], u'__test_new_entry')190 self._check_multiple_state(one, u'__test_category', cat1.id,191 set((entries[0].id, entries[2].id, entries[3].id)), 3)192 self._check_multiple_state(two, u'__test_blog', None,193 set((entries[0].id, entries[1].id,194 entries[2].id, entries[3].id)), 4)195 Subscription.accessed(two, object=entries[2])196 Subscription.accessed(two, object=entries[1])197 self._check_multiple_state(two, u'__test_blog', None,198 set((entries[0].id, entries[3].id)), 2)199 Subscription.accessed(one, subject=cat1)200 Subscription.accessed(one, subject=cat1) # must work also if not unread201 self._check_multiple_state(one, u'__test_category', cat1.id,202 set(), 0)203 eq_(sorted(NotifyTrackerMixin.tracker), sorted([204 (1, u'__test_new_entry', one, entries[0], {'__test_category': [cat1]}),205 (1, u'__test_new_entry', two, entries[0], {'__test_blog': [None]}),206 (1, u'__test_new_entry', two, entries[1], {'__test_blog': [None]}),207 (2, u'__test_new_entry', one, entries[2], {'__test_category': [cat1]}),208 (2, u'__test_new_entry', one, entries[3], {'__test_category': [cat1]}),209 (2, u'__test_new_entry', two, entries[2], {'__test_blog': [None]}),210 (2, u'__test_new_entry', two, entries[3], {'__test_blog': [None]}),211 ]))212 def test_multiple_multiple_subscriptions(self):213 """214 Test (with multiple mode and a SubscriptionType where there is more215 then one subject per object) whether the subscription count and the216 unread information is accurate and whether notifications are sent217 correctly.218 """219 entries = self.data['TestSubscriptionEntry']220 cat1, cat2 = self.data['TestSubscriptionCategory']221 one, two, three, four = self.data['User']222 t1, t2, t3 = self.data['TestSubscriptionTag']223 NotifyTrackerMixin.tracker = []224 Subscription.subscribe(three, u'__test_tag', t1)225 Subscription.subscribe(three, u'__test_tag', t2)226 Subscription.subscribe(three, u'__test_category', cat2)227 Subscription.subscribe(four, u'__test_tag', t3)228 self._check_multiple_state(three, u'__test_tag', t1.id,229 set(), 0)230 self._check_multiple_state(three, u'__test_tag', t2.id,231 set(), 0)232 self._check_multiple_state(four, u'__test_tag', t3.id,233 set(), 0)234 NotifyTrackerMixin.epoch = 1235 Subscription.new(entries[0], u'__test_new_entry')236 NotifyTrackerMixin.epoch = 2237 Subscription.new(entries[2], u'__test_new_entry')238 self._check_multiple_state(three, u'__test_tag', t1.id,239 set((entries[0].id,)), 1)240 self._check_multiple_state(three, u'__test_tag', t2.id,241 set((entries[0].id,)), 1)242 NotifyTrackerMixin.epoch = 3243 Subscription.new(entries[1], u'__test_new_entry')244 self._check_multiple_state(three, u'__test_tag', t2.id,245 set((entries[0].id, entries[1].id)), 2)246 self._check_multiple_state(three, u'__test_category', cat2.id,247 set((entries[1].id,)), 1)248 Subscription.accessed(three, object=entries[1])249 Subscription.accessed(three, object=entries[1])250 self._check_multiple_state(three, u'__test_category', cat2.id,251 set(), 0)252 NotifyTrackerMixin.epoch = 4253 Subscription.new(entries[3], u'__test_new_entry')254 self._check_multiple_state(three, u'__test_tag', t1.id,255 set((entries[0].id,)), 1)256 self._check_multiple_state(three, u'__test_tag', t2.id,257 set((entries[0].id, entries[3].id)), 2)258 self._check_multiple_state(four, u'__test_tag', t3.id,259 set((entries[3].id,)), 1)260 eq_(sorted(NotifyTrackerMixin.tracker), sorted([261 (1, u'__test_new_entry', three, entries[0], {'__test_tag': [t1, t2]}),262 (3, u'__test_new_entry', three, entries[1], {'__test_tag': [t2],263 '__test_category': [cat2]}),264 (4, u'__test_new_entry', three, entries[3], {'__test_tag': [t2]}),265 (4, u'__test_new_entry', four, entries[3], {'__test_tag': [t3]}),266 ]))267 def test_sequent_subscriptions(self):268 """269 Test (with sequent mode) whether the subscription count and the unread270 information is accurate and whether notifications are sent correctly.271 """272 cat1, cat2 = self.data['TestSubscriptionCategory']273 one, two, three, four = self.data['User']274 e1, e2 = self.data['TestSubscriptionEntry'][:2]275 comments = self.data['TestSubscriptionComment']276 NotifyTrackerMixin.tracker = []277 Subscription.subscribe(three, u'__test_comments', e1)278 Subscription.subscribe(three, u'__test_comments', e2)279 Subscription.subscribe(four, u'__test_comments', e1)280 self._check_sequent_state(three, u'__test_comments', e1.id,281 None, 0)282 self._check_sequent_state(three, u'__test_comments', e2.id,283 None, 0)284 self._check_sequent_state(four, u'__test_comments', e1.id,285 None, 0)286 NotifyTrackerMixin.epoch = 1287 Subscription.new(comments[0], u'__test_new_comment') # e1288 Subscription.new(comments[1], u'__test_new_comment') # e2289 self._check_sequent_state(three, u'__test_comments', e1.id,290 comments[0].id, 1)291 self._check_sequent_state(three, u'__test_comments', e2.id,292 comments[1].id, 1)293 self._check_sequent_state(four, u'__test_comments', e1.id,294 comments[0].id, 1)295 Subscription.accessed(three, subject=e1)296 self._check_sequent_state(three, u'__test_comments', e1.id,297 None, 0)298 NotifyTrackerMixin.epoch = 2299 Subscription.new(comments[2], u'__test_new_comment') # e1300 self._check_sequent_state(three, u'__test_comments', e1.id,301 comments[2].id, 1)302 NotifyTrackerMixin.epoch = 3303 Subscription.new(comments[3], u'__test_new_comment') # e1304 self._check_sequent_state(three, u'__test_comments', e1.id,305 comments[2].id, 2)306 self._check_sequent_state(four, u'__test_comments', e1.id,307 comments[0].id, 3)308 Subscription.accessed(four, object=comments[3])309 self._check_sequent_state(four, u'__test_comments', e1.id,310 None, 0)311 NotifyTrackerMixin.epoch = 4312 Subscription.new(comments[4], u'__test_new_comment') # e1313 Subscription.new(comments[5], u'__test_new_comment') # e2314 self._check_sequent_state(three, u'__test_comments', e1.id,315 comments[2].id, 3)316 self._check_sequent_state(three, u'__test_comments', e2.id,317 comments[1].id, 2)318 self._check_sequent_state(four, u'__test_comments', e1.id,319 comments[4].id, 1)320 eq_(sorted(NotifyTrackerMixin.tracker), sorted([321 (1, u'__test_new_comment', three, comments[0], {'__test_comments': [e1]}),322 (1, u'__test_new_comment', four, comments[0], {'__test_comments': [e1]}),323 (1, u'__test_new_comment', three, comments[1], {'__test_comments': [e2]}),324 # here three accesses entry 1325 (2, u'__test_new_comment', three, comments[2], {'__test_comments': [e1]}),326 # here four accesses entry 1327 (4, u'__test_new_comment', four, comments[4], {'__test_comments': [e1]}),328 ]))329 def check_bad_implemented_type(self):330 """check for not well implemented subscription types"""331 w1 = Wrapper()332 db.session.commit()333 assert_raises(NotImplementedError,...
test_ensure_ctxmgr.py
Source:test_ensure_ctxmgr.py
1# -*- coding: utf-8 -*-2import sure3def test_ensure_simple_assertion():4 """Test ensure simple assertion"""5 def __test_something():6 # same calculated value7 name = 'Hodor'8 with sure.ensure('the return value actually looks like: {0}', name):9 sure.expect(name).should.contain('whatever')10 # check if the test function raises the custom AssertionError11 sure.expect(__test_something).when.called_with().should.throw(AssertionError, \12 'the return value actually looks like: Hodor')13def test_ensure_just_assertion_error():14 """Test that ensure only captures AssertionErrors"""15 def __test_something():16 # same calculated value17 with sure.ensure('neverused'):18 raise Exception('This is not an AssertionError')19 # check if the test function does not override the original exception20 # if it is not an AssertionError exception21 sure.expect(__test_something).when.called_with().should.throw(Exception, \...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!