How to use _on_complete method in autotest

Best Python code snippet using autotest_python

test_11_con.py

Source:test_11_con.py Github

copy

Full Screen

...26 self.progress_counter = 027 self.exit_counter = 3 # we expect 3 complete cb calls28 self.test_url = 'http://www.google.com'29 self.received_data = []30 def _on_complete(event, add=1):31 self.assertIsInstance(event, ecore_con.EventUrlComplete)32 self.assertEqual(event.status, 200) # assume net is ok33 self.assertEqual(event.url.url, self.test_url)34 self.complete_counter += add35 self.exit_counter -= 136 if self.exit_counter == 0:37 ecore.main_loop_quit()38 def _on_progress(event, param1, two, one):39 self.assertIsInstance(event, ecore_con.EventUrlProgress)40 self.assertEqual(param1, 'param1')41 self.assertEqual(one, 1)42 self.assertEqual(two, 2)43 self.progress_counter += 144 def _on_data(event):45 self.assertIsInstance(event, ecore_con.EventUrlData)46 self.received_data.append(event.data)47 u = ecore_con.Url(self.test_url)48 self.assertIsInstance(u, ecore_con.Url)49 u.on_complete_event_add(_on_complete, add=100)50 u.on_complete_event_add(_on_complete, add=10)51 u.on_complete_event_del(_on_complete, add=100) #test event_del52 u.on_complete_event_add(_on_complete, add=1)53 u.on_complete_event_add(_on_complete, add=5)54 u.on_progress_event_add(_on_progress, 'param1', one=1, two=2)55 u.on_data_event_add(_on_data)56 self.assertTrue(u.get()) #perform the GET request57 t = ecore.Timer(TIMEOUT, ecore.main_loop_quit)58 ecore.main_loop_begin()59 t.delete()60 self.assertEqual(u.status_code, 200) # assume net is ok61 self.assertEqual(self.complete_counter, 16)62 self.assertTrue(self.progress_counter > 0)63 data = b''.join(self.received_data)64 self.assertEqual(len(data), u.received_bytes)65 u.delete()66 def testUrlDelete(self):67 self.test_url1 = 'http://www.facebook.com'68 self.test_url2 = 'http://www.google.com'69 self.complete_counter = 070 def _on_complete1(event):71 self.assertIsInstance(event, ecore_con.EventUrlComplete)72 self.assertEqual(event.url.url, self.test_url1)73 self.complete_counter += 174 if self.complete_counter >= 11:75 ecore.main_loop_quit()76 def _on_complete2(event):77 self.assertIsInstance(event, ecore_con.EventUrlComplete)78 self.assertEqual(event.url.url, self.test_url2)79 self.complete_counter += 1080 if self.complete_counter >= 11:81 ecore.main_loop_quit()82 83 u1 = ecore_con.Url(self.test_url1)84 u1.on_complete_event_add(_on_complete1)85 u2 = ecore_con.Url(self.test_url2)86 u2.on_complete_event_add(_on_complete2)87 u3 = ecore_con.Url(self.test_url1)88 u3.on_complete_event_add(_on_complete1)89 u3.get() 90 u3.delete() # the previous get will not run91 self.assertTrue(u1.get()) #perform the GET request92 self.assertTrue(u2.get()) #perform the GET request93 t = ecore.Timer(TIMEOUT, ecore.main_loop_quit)94 ecore.main_loop_begin()95 t.delete()96 self.assertEqual(u1.status_code, 200) # assume net is ok97 self.assertEqual(u2.status_code, 200) # assume net is ok98 self.assertEqual(self.complete_counter, 11)99 u1.delete()100 u2.delete()101 def testUrlToFile(self):102 self.test_url = 'http://www.google.com'103 self.complete = False104 def _on_complete(event):105 self.complete = True106 ecore.main_loop_quit()107 fd, path = tempfile.mkstemp()108 u = ecore_con.Url(self.test_url, fd=fd)109 u.on_complete_event_add(_on_complete)110 u.get()111 t = ecore.Timer(TIMEOUT, ecore.main_loop_quit)112 ecore.main_loop_begin()113 t.delete()114 self.assertEqual(self.complete, True)115 self.assertEqual(u.status_code, 200) # assume net is ok116 self.assertEqual(os.path.getsize(path), u.received_bytes)117 os.unlink(path)118 u.delete()119 # def testFtp(self):120 # pass #TODO121 def testPost(self):122 self.test_url = 'https://httpbin.org/post'123 self.data_to_post = b'my data to post'124 def _on_complete(event):125 ecore.main_loop_quit()126 u = ecore_con.Url(self.test_url)127 u.on_complete_event_add(_on_complete)128 # u.post(self.data_to_post, "multipart/form-data")129 u.post(self.data_to_post, "text/txt")130 t = ecore.Timer(TIMEOUT, ecore.main_loop_quit)131 ecore.main_loop_begin()132 t.delete()133 self.assertEqual(u.status_code, 200) # assume net is ok134 u.delete()135if __name__ == '__main__':136 formatter = logging.Formatter("[%(levelname)s] %(name)s (%(filename)s: %(lineno)d) --- %(message)s")137 handler = logging.StreamHandler()138 handler.setFormatter(formatter)...

Full Screen

Full Screen

route_setup.py

Source:route_setup.py Github

copy

Full Screen

...26 try:27 task, name = self._task_iter.__next__()28 self._process_task(task, name)29 except StopIteration:30 self._on_complete()31 def _process_task(self, element, name):32 if element == "exchange":33 self._setup_exchange(name)34 elif element == "queue":35 self._setup_queue(name)36 else:37 raise NotImplementedError("")38 def _setup_exchange(self, exc, exc_type="direct"):39 self._channel.exchange_declare(self._next_task, exc, exc_type)40 def _setup_queue(self, queue, durable=True, passive=False):41 self._channel.queue_declare(self._next_task, queue, durable=durable, passive=passive)42 def setup(self):...

Full Screen

Full Screen

preprocess.py

Source:preprocess.py Github

copy

Full Screen

...7 def process(self):8 self._on_start()9 print("Start processing data for task '{}'.".format(self._get_dataset_node().task))10 self._on_next(**self._get_dataset_node().__dict__)11 self._on_complete()12 def _on_start(self):13 pass14 def _on_next(self, src: str, dst: str, task: str):15 pass16 def _on_complete(self):17 pass18 def load_processed(self):19 self.dataset = self._load_dataset()20 def _load_dataset(self):21 raise NotImplementedError22 def check(self):23 if not Path(self._get_dataset_node().dst).is_dir():24 raise FileNotFoundError("Processed task '{}' does not exist".format(task))25class BasicPreprocessor(AbstractPreprocessor):26 def finished(self) -> bool:27 return Path(self._get_dataset_node().dst).is_dir()28 def _load_dataset(self):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful