How to use spawn_task method in avocado

Best Python code snippet using avocado_python

test_tasks.py

Source:test_tasks.py Github

copy

Full Screen

...56 def test_http_error(self, status_code):57 with patch.object(tasks.lms, self.mocked_get_enrollments_method) as mock_get_enrollments:58 error = HTTPError(request=FakeRequest('registrar.edx.org'), response=FakeResponse(status_code))59 mock_get_enrollments.side_effect = error60 task = self.spawn_task() # pylint: disable=assignment-from-no-return61 task.wait()62 expected_msg = f"HTTP error {status_code} when getting enrollments at registrar.edx.org"63 self.assert_failed(expected_msg)64 def test_invalid_data(self):65 with patch.object(tasks.lms, self.mocked_get_enrollments_method) as mock_get_enrollments:66 mock_get_enrollments.side_effect = ValidationError()67 task = self.spawn_task() # pylint: disable=assignment-from-no-return68 task.wait()69 self.assert_failed("Invalid enrollment data from LMS")70 def test_invalid_format(self):71 with patch.object(tasks.lms, self.mocked_get_enrollments_method) as mock_get_enrollments:72 mock_get_enrollments.return_value = self.enrollment_data73 exception_raised = False74 try:75 task = self.spawn_task(file_format='invalid-format') # pylint: disable=assignment-from-no-return76 task.wait()77 except ValueError as e:78 self.assertIn('Invalid file_format', str(e))79 exception_raised = True80 self.assertTrue(exception_raised)81@patch_discovery_program_details({})82class ListProgramEnrollmentTaskTests(ListEnrollmentTaskTestMixin, TestCase):83 """ Tests for task error behavior. """84 enrollment_statuses = (85 PROGRAM_ENROLLMENT_ENROLLED,86 PROGRAM_ENROLLMENT_PENDING,87 )88 mocked_get_enrollments_method = 'get_program_enrollments'89 def spawn_task(self, program_key=None, **kwargs):90 return tasks.list_program_enrollments.apply_async(91 (92 self.job_id,93 self.user.id,94 kwargs.get('file_format', 'json'),95 program_key or self.program.key,96 ),97 task_id=self.job_id98 )99@patch_discovery_program_details({})100class ListCourseRunEnrollmentTaskTests(ListEnrollmentTaskTestMixin, TestCase):101 """ Tests for task error behavior. """102 enrollment_statuses = (103 COURSE_ENROLLMENT_ACTIVE,104 COURSE_ENROLLMENT_INACTIVE,105 )106 mocked_get_enrollments_method = 'get_course_run_enrollments'107 internal_course_key = 'course-1'108 external_course_key = 'external_course_key'109 @classmethod110 def setUpTestData(cls):111 super().setUpTestData()112 for enrollment in cls.enrollment_data:113 enrollment['course_id'] = cls.external_course_key114 def spawn_task(self, program_key=None, **kwargs):115 return tasks.list_course_run_enrollments.apply_async(116 (117 self.job_id,118 self.user.id,119 kwargs.get('file_format', 'json'),120 program_key or self.program.key,121 self.internal_course_key,122 self.external_course_key,123 ),124 task_id=self.job_id125 )126@patch_discovery_program_details({127 'curricula': [{128 'is_active': True,129 'courses': [{130 'course_runs': [{131 'key': 'course-key',132 'external_key': 'external-key',133 'title': 'title',134 'marketing_url': 'www',135 }],136 }],137 }],138})139class ListAllCourseRunEnrollmentTaskTests(ListEnrollmentTaskTestMixin, TestCase):140 """ Tests for task error behavior. """141 enrollment_statuses = (142 COURSE_ENROLLMENT_ACTIVE,143 COURSE_ENROLLMENT_INACTIVE,144 )145 mocked_get_enrollments_method = 'get_course_run_enrollments'146 course_key = 'course-1'147 external_course_key = 'external_course_key'148 default_course_run = {'key': 'course-key', 'external_key': 'external-key'}149 @classmethod150 def setUpTestData(cls):151 super().setUpTestData()152 for enrollment in cls.enrollment_data:153 enrollment['course_id'] = cls.external_course_key154 def spawn_task(self, program_key=None, **kwargs):155 return tasks.list_all_course_run_enrollments.apply_async(156 (157 self.job_id,158 self.user.id,159 kwargs.get('file_format', 'json'),160 program_key or self.program.key,161 ),162 task_id=self.job_id163 )164class WriteEnrollmentTaskTestMixin(BaseTaskTestMixin, S3MockEnvVarsMixin):165 """166 Tests common for both program and course-run enrollment writing tasks.167 """168 json_filepath = "testfile.csv"169 mock_function = None # Override in subclass170 expected_fieldnames = None # Override in subclass171 @classmethod172 def setUpClass(cls):173 # This is unfortunately duplicated from:174 # registrar.apps.api.v1.tests.test_views:S3MockMixin.175 # It would be ideal to move that mixin to a utilities file and re-use176 # it here, but moto seems to have a bug/"feature" where it only works177 # in modules that explicitly import it.178 super().setUpClass()179 cls._s3_mock = moto.mock_s3()180 cls._s3_mock.start()181 conn = boto3.resource('s3')182 conn.create_bucket(Bucket=settings.REGISTRAR_BUCKET)183 @classmethod184 def tearDownClass(cls):185 cls._s3_mock.stop()186 super().tearDownClass()187 def tearDown(self):188 super().tearDown()189 uploads_filestore.delete(self.json_filepath)190 def mock_write_enrollments(self, any_successes, any_failures):191 """192 Creates a mock function that returns results normally returned193 by an enrollment-writing function.194 """195 raise NotImplementedError() # pragma: no cover196 def test_empty_list_file(self):197 uploads_filestore.store(self.json_filepath, "[]")198 with patch.object(199 tasks.lms,200 self.mock_function,201 new=self.mock_write_enrollments(False, False),202 ):203 self.spawn_task().wait()204 self.assert_succeeded(','.join(self.expected_fieldnames) + "\r\n", "204")205 def test_no_such_file(self):206 self.spawn_task().wait()207 self.assert_failed(208 f"Enrollment file for program_key={self.program.key} not found at {self.json_filepath}"209 )210 def test_file_not_json(self):211 uploads_filestore.store(self.json_filepath, "this is not valid json")212 self.spawn_task().wait()213 self.assert_failed(214 f"Enrollment file for program_key={self.program.key} at {self.json_filepath} is not valid JSON"215 )216@ddt.ddt217@patch_discovery_program_details({})218class WriteProgramEnrollmentTaskTests(WriteEnrollmentTaskTestMixin, TestCase):219 """220 Tests for write_program_enrollments task.221 """222 mock_function = 'write_program_enrollments'223 expected_fieldnames = ('student_key', 'status')224 def spawn_task(self, program_key=None, **kwargs):225 return tasks.write_program_enrollments.apply_async(226 (227 self.job_id,228 self.user.id,229 self.json_filepath,230 program_key or self.program.key,231 ),232 task_id=self.job_id233 )234 def mock_write_enrollments(self, any_successes, any_failures):235 """236 Create mock for data.write_program_enrollments237 Mock will return `any_successes`, `any_failures`, and `enrollments`238 echoed back as a dictionary.239 """240 def inner(_method, program_uuid, enrollments):241 """ Mock for data.write_program_enrollments. """242 self.assertIsInstance(program_uuid, UUID)243 results = OrderedDict([244 (enrollment['student_key'], enrollment['status'])245 for enrollment in enrollments246 ])247 return any_successes, any_failures, results248 return inner249 @ddt.data(250 (True, False, "200"),251 (True, True, "207"),252 (False, True, "422"),253 )254 @ddt.unpack255 def test_success(self, any_successes, any_failures, expected_code_str):256 enrolls = [257 {'student_key': 'john', 'status': 'x'},258 {'student_key': 'bob', 'status': 'y'},259 {'student_key': 'serena', 'status': 'z'},260 ]261 uploads_filestore.store(self.json_filepath, json.dumps(enrolls))262 with patch.object(263 tasks.lms,264 self.mock_function,265 new=self.mock_write_enrollments(any_successes, any_failures),266 ):267 self.spawn_task().wait()268 self.assert_succeeded(269 "student_key,status\r\n"270 "john,x\r\n"271 "bob,y\r\n"272 "serena,z\r\n",273 expected_code_str,274 )275@ddt.ddt276@patch_discovery_program_details({})277class WriteCourseRunEnrollmentTaskTests(WriteEnrollmentTaskTestMixin, TestCase):278 """279 Tests for write_course_run_enrollments task.280 """281 mock_function = 'write_course_run_enrollments'282 expected_fieldnames = ('course_id', 'student_key', 'status')283 def spawn_task(self, program_key=None, **kwargs):284 return tasks.write_course_run_enrollments.apply_async(285 (286 self.job_id,287 self.user.id,288 self.json_filepath,289 program_key or self.program.key,290 ),291 task_id=self.job_id292 )293 # pylint: disable=arguments-differ294 def mock_write_enrollments(self, any_successes, any_failures, expected_enrolls_by_course_key=None):295 """296 Create mock for data.write_course_run_enrollments.297 Mock will return `any_successes`, `any_failures`, and `enrollments`298 echoed back as a dictionary.299 """300 def inner(_method, program_uuid, course_key, enrollments):301 """ Mock for data.write_course_run_enrollments. """302 self.assertIsInstance(program_uuid, UUID)303 if expected_enrolls_by_course_key is not None: # pragma: no branch304 self.assertListEqual(enrollments, expected_enrolls_by_course_key.get(course_key))305 results = OrderedDict([306 (enrollment['student_key'], enrollment['status'])307 for enrollment in enrollments308 ])309 return any_successes, any_failures, results310 return inner311 @ddt.data(312 (True, False, "200"),313 (True, True, "207"),314 (False, True, "422"),315 )316 @ddt.unpack317 @patch.object(318 ProgramDetails, 'get_course_key', lambda _self, x: x + '-internal'319 )320 def test_success_status(self, any_successes, any_failures, expected_code_str):321 enrolls = [322 {'student_key': 'john', 'status': 'x', 'course_id': 'course-1'},323 {'student_key': 'bob', 'status': 'y', 'course_id': 'course-1'},324 {'student_key': 'serena', 'status': 'z', 'course_id': 'course-2'},325 ]326 expected_enrolls_by_course_key = {327 "course-1-internal": [328 {'status': 'x', 'student_key': 'john'},329 {'status': 'y', 'student_key': 'bob'},330 ],331 "course-2-internal": [332 {'status': 'z', 'student_key': 'serena'},333 ]334 }335 uploads_filestore.store(self.json_filepath, json.dumps(enrolls))336 with patch.object(337 tasks.lms,338 self.mock_function,339 new=self.mock_write_enrollments(any_successes, any_failures, expected_enrolls_by_course_key),340 ):341 self.spawn_task().wait()342 self.assert_succeeded(343 "course_id,student_key,status\r\n"344 "course-1,john,x\r\n"345 "course-1,bob,y\r\n"346 "course-2,serena,z\r\n",347 expected_code_str,348 )349 @patch.object(350 ProgramDetails, 'get_course_key', lambda _self, x: x + '-internal'351 )352 def test_success_status_course_staff_included(self):353 enrolls = [354 {'student_key': 'john', 'status': 'x', 'course_id': 'course-1', 'course_staff': 'TRUE'},355 {'student_key': 'bob', 'status': 'y', 'course_id': 'course-1', 'course_staff': 'FALSE'},356 {'student_key': 'serena', 'status': 'z', 'course_id': 'course-2'},357 ]358 expected_enrolls_by_course_key = {359 "course-1-internal": [360 {'status': 'x', 'course_staff': 'TRUE', 'student_key': 'john'},361 {'status': 'y', 'course_staff': 'FALSE', 'student_key': 'bob'},362 ],363 "course-2-internal": [364 {'status': 'z', 'student_key': 'serena'},365 ]366 }367 uploads_filestore.store(self.json_filepath, json.dumps(enrolls))368 with patch.object(369 tasks.lms,370 self.mock_function,371 new=self.mock_write_enrollments(True, False, expected_enrolls_by_course_key),372 ):373 self.spawn_task().wait()374 expected_code_str = '200'375 self.assert_succeeded(376 "course_id,student_key,status,course_staff\r\n"377 "course-1,john,x,TRUE\r\n"378 "course-1,bob,y,FALSE\r\n"379 "course-2,serena,z,\r\n",380 expected_code_str,381 )382 @patch.object(383 ProgramDetails, 'get_course_key', {'course-1': 'course-1-internal'}.get384 )385 @patch_discovery_program_details({})386 def test_success_invalid_course_id(self):387 enrolls = [388 {'student_key': 'john', 'status': 'x', 'course_id': 'course-1'},389 {'student_key': 'bob', 'status': 'y', 'course_id': 'course-1'},390 {'student_key': 'serena', 'status': 'z', 'course_id': 'invalid-course'},391 ]392 expected_enrolls_by_course_key = {393 "course-1-internal": [394 {'status': 'x', 'student_key': 'john'},395 {'status': 'y', 'student_key': 'bob'},396 ],397 "invalid-course-internal": [398 {'status': 'z', 'student_key': 'serena'},399 ]400 }401 uploads_filestore.store(self.json_filepath, json.dumps(enrolls))402 with patch.object(403 tasks.lms,404 self.mock_function,405 new=self.mock_write_enrollments(True, False, expected_enrolls_by_course_key),406 ):407 self.spawn_task().wait()408 self.assert_succeeded(409 "course_id,student_key,status\r\n"410 "course-1,john,x\r\n"411 "course-1,bob,y\r\n"412 "invalid-course,serena,course-not-found\r\n",413 "207",...

Full Screen

Full Screen

session.py

Source:session.py Github

copy

Full Screen

...34 self.got_socket.set()35 def unset_socket(self):36 self.socket = None37 self.got_socket.clear()38 def spawn_task(self, coro):39 task = asyncio.ensure_future(self.exceptionlogger(coro))40 self.tasks.append(task)41 return task42 async def exceptionlogger(self,coro):43 try:44 await coro45 except asyncio.CancelledError:46 pass # so we can await a task being stopped47 except:48 log.error('{}: Exception in task: {}'.format(self.session_id, traceback.format_exc()))49 async def stop_tasks(self):50 waitfor = []51 for task in self.tasks:52 if not task.done() and not task is asyncio.Task.current_task():53 task.cancel()54 waitfor.append(task)55 if waitfor:56 await asyncio.wait(waitfor) # necessary for server close57 async def send(self,text):58 socket = await self.get_socket()59 await socket.send_str(text)60 log.debug('{}: > {}'.format(self.session_id, text[:20]+'...'))61 async def get_socket(self):62 await self.got_socket.wait()63 return self.socket64 async def handle_incoming(self,text):65 log.debug('{}: < {} [IGNORED]'.format(self.session_id,text))66 async def close(self):67 if self.open:68 self.open = False69 log.debug('{}: CLOSING'.format(self.session_id))70 if self.socket is not None:71 await self.socket.close()72 self.unset_socket()73 await self.stop_tasks()74 log.info('{}: CLOSED'.format(self.session_id))75class FeedSession(BaseSession):76 def __init__(self,session_id,app):77 super().__init__(session_id)78 self.app=app79 self.info = SessionInfo()80 self.sendq = FiniteQueue()81 self.recvq = FiniteQueue()82 self.spawn_task(self.process_sendq())83 self.spawn_task(self.run_protocol())84 self.spawn_task(self.listen_info())85 self.spawn_task(self.send_passphrase())86 self.subscriberlists = defaultdict(list) # message class -> [Future]87 async def process_sendq(self):88 async for msg in self.sendq:89 while True:90 try:91 await self.send(msg)92 break93 except RuntimeError as err: #connection drop94 log.info('{}: retry sending {} because of {}'.format(self.session_id, msg, err))95 async def handle_incoming(self,text):96 # dispatch to recvq so we can react immediately on connection drop/close97 await self.recvq.put(m.Message.parse(text))98 async def schedule_send(self,msg):99 await self.sendq.put(str(msg))100 async def run_protocol(self):101 await Redis.of(self.app).publish(self.session_id, m.TTY_Opened(ttyId=self.session_id))102 await self.schedule_banner()103 async for msg in self.recvq:104 subs = self.subscriberlists[msg.__class__]105 if subs:106 log.debug('{}: < {}'.format(self.session_id, msg.__class__.__name__))107 for sub in subs:108 # NB will raise InvalidStateError if prev future was not consumed!109 sub.set_result(msg)110 else:111 log.debug('{}: No subscribers for {}'.format(self.session_id,msg.__class__.__name__))112 async def schedule_banner(self):113 for line in banner.splitlines():114 await self.schedule_send(m.Line(line))115 async def close(self):116 if self.open:117 await Redis.of(self.app).publish(self.session_id, m.TTY_Closed(ttyId=self.session_id))118 await super().close()119 async def subscribe_once(self,mclass):120 fut = asyncio.Future()121 self.subscriberlists[mclass].append(fut)122 res = await fut123 self.subscriberlists[mclass].remove(fut)124 return res125 async def listen_info(self):126 async for msg in Subscriber(self.subscriberlists[m.Info]):127 for k, v in msg.__dict__.items():128 setattr(self.info, k, v)129 log.debug('{}: info acquired'.format(self.session_id))130 async def send_passphrase(self):131 msg = await self.subscribe_once(m.Info)132 text = 'Access this terminal with the following passphrase:'133 await self.schedule_send(m.Line(text))134 await self.schedule_send(m.Line(msg.passphrase))135 async def read_line_conv(self,socket,line=True,echo=False):136 rls = BaseSession(self.session_id+'_R')137 async def copy_from_feedclient():138 try:139 if line:140 await self.schedule_send(m.ReadLine())141 lr = await self.subscribe_once(m.LineRead)142 await rls.send(lr.text)143 else:144 await self.schedule_send(m.ReadKey(echo=echo))145 kr = await self.subscribe_once(m.KeyRead)146 await rls.send(kr.key)147 except asyncio.CancelledError:148 log.info('{} closing because parent session closes:'.format(rls.session_id))149 finally:150 await asyncio.shield(rls.close()) # otherwise it will indirectly cancel itself151 copy_task = self.spawn_task(copy_from_feedclient())152 await rls.run_socket(socket)153 await rls.close()154 if not copy_task.done():155 copy_task.cancel()156 await copy_task...

Full Screen

Full Screen

inteltool.py

Source:inteltool.py Github

copy

Full Screen

...31 mr1.kills = True32 mr1.losses = True33 mr1.after = None34 print("Running Blingy Tackle Report...")35 spawn_task(mr1)36 # Capital Pilots 30 Day Profile37 pr1 = pvpreport.ProfileReport(get_new_session())38 pr1.alliance_ids = self.alliance_ids39 pr1.corporation_ids = self.corporation_ids40 pr1.groupids = self.CAPITAL_GROUP41 pr1.kills = True42 pr1.losses = False43 pr1.after = after44 print("Running Capital Pilots 30 Day Profile...")45 spawn_task(pr1)46 # Subcap Activity Kills 30 Days47 ar1 = pvpreport.ActivityReport(get_new_session())48 ar1.alliance_ids = self.alliance_ids49 ar1.corporation_ids = self.corporation_ids50 ar1.groupids = self.CAPITAL_GROUP51 ar1.groupidsinvert = True52 ar1.kills = True53 ar1.after = after54 print("Running Subcaps 30 Day Kills...")55 spawn_task(ar1)56 # Subcap Activity Loss 30 Days57 ar2 = pvpreport.ActivityReport(get_new_session())58 ar2.alliance_ids = self.alliance_ids59 ar2.corporation_ids = self.corporation_ids60 ar2.groupids = self.CAPITAL_GROUP61 ar2.groupidsinvert = True62 ar2.kills = False63 ar2.losses = True64 ar2.after = after65 spawn_task(ar2)66 # Captial Activity Kills 30 Days67 ar3 = pvpreport.ActivityReport(get_new_session())68 ar3.alliance_ids = self.alliance_ids69 ar3.corporation_ids = self.corporation_ids70 ar3.groupids = self.CAPITAL_GROUP71 ar3.groupidsinvert = False72 ar3.kills = True73 ar3.after = after74 print("Running Subcaps 30 Day Losses...")75 spawn_task(ar3)76 # CapvSub Activity Kills 30 Days77 ar4 = pvpreport.ActivityReport(get_new_session())78 ar4.alliance_ids = self.alliance_ids79 ar4.corporation_ids = self.corporation_ids80 ar4.groupids = self.CAPITAL_GROUP81 ar4.groupidsinvert = False82 ar4.againstgroup_ids = self.ALL_SHIPS83 ar4.kills = True84 ar4.after = after85 print("Running CapvSub Activity 30 Days...")86 spawn_task(ar4)87 kr1 = pvpreport.KillMailReport(get_new_session())88 kr1.alliance_ids = self.alliance_ids89 kr1.corporation_ids = self.corporation_ids90 kr1.groupids = self.CAPITAL_GROUP91 kr1.groupidsinvert = False92 kr1.againstgroup_ids = self.ALL_SHIPS93 kr1.after = after94 print("Running CapvSub Killmails 30 Days...")95 spawn_task(kr1)96 # Capital Activity Loss 30 Days97 ar5 = pvpreport.ActivityReport(get_new_session())98 ar5.alliance_ids = self.alliance_ids99 ar5.corporation_ids = self.corporation_ids100 ar5.groupids = self.CAPITAL_GROUP101 ar5.groupidsinvert = False102 ar5.kills = False103 ar5.losses = True104 ar5.after = after105 print("Running Capitals 30 Day Kills...")106 spawn_task(ar5)107 # Gang Report 30 Days108 gr1 = pvpreport.GangSizeReport(get_new_session())109 gr1.alliance_ids = self.alliance_ids110 gr1.corporation_ids = self.corporation_ids111 print("Running Gang Size Report...")112 spawn_task(gr1)113 # Wait for reports to complete114 wait_tasks()115 # Write report results to excel116 util.write_excel("%s_intel_data.xlsx" % (self.name), "Blingy Tackle", mr1.arr_result())117 util.write_excel("%s_intel_data.xlsx" % (self.name), "Capital Pilots 30 Days", pr1.arr_result())118 util.write_excel("%s_intel_data.xlsx" % (self.name), "Subcap Kill Activity 30 Days", ar1.arr_result())119 util.write_excel("%s_intel_data.xlsx" % (self.name), "Subcap Loss Activity 30 Days", ar2.arr_result())120 util.write_excel("%s_intel_data.xlsx" % (self.name), "Capital Kill Activity 30 Days", ar3.arr_result())121 util.write_excel("%s_intel_data.xlsx" % (self.name), "CapvSub Activity 30 Days", ar4.arr_result())122 util.write_excel("%s_intel_data.xlsx" % (self.name), "CapvSub Killmails 30 Days", kr1.arr_result())123 util.write_excel("%s_intel_data.xlsx" % (self.name), "Capital Loss Activity 30 Days", ar5.arr_result())...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful