How to use shutdown_all method in localstack

Best Python code snippet using localstack_python

test_multikernelmanager.py

Source:test_multikernelmanager.py Github

copy

Full Screen

...71 cls._run_lifecycle(km)72 def test_tcp_lifecycle_with_kernel_id(self):73 km = self._get_tcp_km()74 self._run_lifecycle(km, test_kid=str(uuid.uuid4()))75 def test_shutdown_all(self):76 km = self._get_tcp_km()77 kid = km.start_kernel(stdout=PIPE, stderr=PIPE)78 self.assertIn(kid, km)79 km.shutdown_all()80 self.assertNotIn(kid, km)81 # shutdown again is okay, because we have no kernels82 km.shutdown_all()83 def test_tcp_cinfo(self):84 km = self._get_tcp_km()85 self._run_cinfo(km, 'tcp', localhost())86 @skip_win3287 def test_ipc_lifecycle(self):88 km = self._get_ipc_km()89 self._run_lifecycle(km)90 @skip_win3291 def test_ipc_cinfo(self):92 km = self._get_ipc_km()93 self._run_cinfo(km, 'ipc', 'test')94 def test_start_sequence_tcp_kernels(self):95 """Ensure that a sequence of kernel startups doesn't break anything."""96 self._run_lifecycle(self._get_tcp_km())97 self._run_lifecycle(self._get_tcp_km())98 self._run_lifecycle(self._get_tcp_km())99 @skip_win32100 def test_start_sequence_ipc_kernels(self):101 """Ensure that a sequence of kernel startups doesn't break anything."""102 self._run_lifecycle(self._get_ipc_km())103 self._run_lifecycle(self._get_ipc_km())104 self._run_lifecycle(self._get_ipc_km())105 def tcp_lifecycle_with_loop(self):106 # Ensure each thread has an event loop107 asyncio.set_event_loop(asyncio.new_event_loop())108 self.test_tcp_lifecycle()109 def test_start_parallel_thread_kernels(self):110 self.test_tcp_lifecycle()111 thread = threading.Thread(target=self.tcp_lifecycle_with_loop)112 thread2 = threading.Thread(target=self.tcp_lifecycle_with_loop)113 try:114 thread.start()115 thread2.start()116 finally:117 thread.join()118 thread2.join()119 def test_start_parallel_process_kernels(self):120 self.test_tcp_lifecycle()121 thread = threading.Thread(target=self.tcp_lifecycle_with_loop)122 # Windows tests needs this target to be picklable:123 proc = mp.Process(target=self.test_tcp_lifecycle)124 try:125 thread.start()126 proc.start()127 finally:128 thread.join()129 proc.join()130 assert proc.exitcode == 0131class TestAsyncKernelManager(AsyncTestCase):132 # static so picklable for multiprocessing on Windows133 @staticmethod134 def _get_tcp_km():135 c = Config()136 km = AsyncMultiKernelManager(config=c)137 return km138 # static so picklable for multiprocessing on Windows139 @staticmethod140 def _get_ipc_km():141 c = Config()142 c.KernelManager.transport = 'ipc'143 c.KernelManager.ip = 'test'144 km = AsyncMultiKernelManager(config=c)145 return km146 # static so picklable for multiprocessing on Windows147 @staticmethod148 async def _run_lifecycle(km, test_kid=None):149 if test_kid:150 kid = await km.start_kernel(stdout=PIPE, stderr=PIPE, kernel_id=test_kid)151 assert kid == test_kid152 else:153 kid = await km.start_kernel(stdout=PIPE, stderr=PIPE)154 assert await km.is_alive(kid)155 assert kid in km156 assert kid in km.list_kernel_ids()157 assert len(km) == 1, f'{len(km)} != {1}'158 await km.restart_kernel(kid, now=True)159 assert await km.is_alive(kid)160 assert kid in km.list_kernel_ids()161 await km.interrupt_kernel(kid)162 k = km.get_kernel(kid)163 assert isinstance(k, KernelManager)164 await km.shutdown_kernel(kid, now=True)165 assert kid not in km, f'{kid} not in {km}'166 async def _run_cinfo(self, km, transport, ip):167 kid = await km.start_kernel(stdout=PIPE, stderr=PIPE)168 k = km.get_kernel(kid)169 cinfo = km.get_connection_info(kid)170 self.assertEqual(transport, cinfo['transport'])171 self.assertEqual(ip, cinfo['ip'])172 self.assertTrue('stdin_port' in cinfo)173 self.assertTrue('iopub_port' in cinfo)174 stream = km.connect_iopub(kid)175 stream.close()176 self.assertTrue('shell_port' in cinfo)177 stream = km.connect_shell(kid)178 stream.close()179 self.assertTrue('hb_port' in cinfo)180 stream = km.connect_hb(kid)181 stream.close()182 await km.shutdown_kernel(kid, now=True)183 self.assertNotIn(kid, km)184 @gen_test185 async def test_tcp_lifecycle(self):186 await self.raw_tcp_lifecycle()187 @gen_test188 async def test_tcp_lifecycle_with_kernel_id(self):189 await self.raw_tcp_lifecycle(test_kid=str(uuid.uuid4()))190 @gen_test191 async def test_shutdown_all(self):192 km = self._get_tcp_km()193 kid = await km.start_kernel(stdout=PIPE, stderr=PIPE)194 self.assertIn(kid, km)195 await km.shutdown_all()196 self.assertNotIn(kid, km)197 # shutdown again is okay, because we have no kernels198 await km.shutdown_all()199 @gen_test(timeout=20)200 async def test_use_after_shutdown_all(self):201 km = self._get_tcp_km()202 kid = await km.start_kernel(stdout=PIPE, stderr=PIPE)203 self.assertIn(kid, km)204 await km.shutdown_all()205 self.assertNotIn(kid, km)206 # Start another kernel207 kid = await km.start_kernel(stdout=PIPE, stderr=PIPE)208 self.assertIn(kid, km)209 await km.shutdown_all()210 self.assertNotIn(kid, km)211 # shutdown again is okay, because we have no kernels212 await km.shutdown_all()213 @gen_test(timeout=20)214 async def test_shutdown_all_while_starting(self):215 km = self._get_tcp_km()216 kid_future = asyncio.ensure_future(km.start_kernel(stdout=PIPE, stderr=PIPE))217 # This is relying on the ordering of the asyncio queue, not sure if guaranteed or not:218 kid, _ = await asyncio.gather(kid_future, km.shutdown_all())219 self.assertNotIn(kid, km)220 # Start another kernel221 kid = await km.start_kernel(stdout=PIPE, stderr=PIPE)222 self.assertIn(kid, km)223 self.assertEqual(len(km), 1)224 await km.shutdown_all()225 self.assertNotIn(kid, km)226 # shutdown again is okay, because we have no kernels227 await km.shutdown_all()228 @gen_test229 async def test_tcp_cinfo(self):230 km = self._get_tcp_km()231 await self._run_cinfo(km, 'tcp', localhost())232 @skip_win32233 @gen_test234 async def test_ipc_lifecycle(self):235 km = self._get_ipc_km()236 await self._run_lifecycle(km)237 @skip_win32238 @gen_test239 async def test_ipc_cinfo(self):240 km = self._get_ipc_km()241 await self._run_cinfo(km, 'ipc', 'test')...

Full Screen

Full Screen

test_smm.py

Source:test_smm.py Github

copy

Full Screen

...37 """38 manager = DlgSharedMemoryManager()39 manager.register_session("session1")40 self.assertTrue(len(manager.drop_names), 1)41 manager.shutdown_all()42 def test_register_drop(self):43 """44 SMM should register a drop with a pre-existing session45 """46 manager = DlgSharedMemoryManager()47 manager.register_session("session1")48 manager.register_drop("A", "session1")49 self.assertTrue(len(manager.drop_names["session1"]), 1)50 manager.shutdown_all()51 def test_register_drop_nonsession(self):52 """53 SMM should register the session for a drop if it did not already exist54 """55 manager = DlgSharedMemoryManager()56 manager.register_drop("A", "session1")57 self.assertTrue((len(manager.drop_names["session1"]), 1))58 manager.shutdown_all()59 def test_shutdown_session_existing(self):60 """61 SMM should successfully remove a session when requested62 """63 manager = DlgSharedMemoryManager()64 manager.register_session("session1")65 manager.register_session("session2")66 self.assertTrue(len(manager.drop_names.keys()), 2)67 manager.shutdown_session("session1")68 self.assertTrue(len(manager.drop_names.keys()), 1)69 manager.shutdown_all()70 def test_shutdown_session_nonexistent(self):71 """72 SMM should not error when attempting to shutdown a non-existent session73 """74 manager = DlgSharedMemoryManager()75 try:76 manager.shutdown_session("session1")77 except KeyError:78 self.fail("Manager errored when shutting down nonexistent session")79 def test_shutdown_all(self):80 """81 SMM should be able to remove all sessions and drop references when shutdown82 """83 manager = DlgSharedMemoryManager()84 manager.register_session("session1")85 manager.register_session("session2")86 self.assertEqual(len(manager.drop_names.keys()), 2)87 manager.shutdown_all()88 self.assertEqual(len(manager.drop_names.keys()), 0)89 def test_shutdown_all_nosessions(self):90 """91 SMM should successfully shutdown without error when no sessions or drops are present92 """93 manager = DlgSharedMemoryManager()94 try:95 manager.shutdown_all()96 except KeyError:...

Full Screen

Full Screen

run.py

Source:run.py Github

copy

Full Screen

...5import ananas.default6# Add the cwd to the module search path so that we can load user bot classes7sys.path.append(os.getcwd())8bots = []9def shutdown_all(signum, frame):10 for bot in bots:11 if bot.state == PineappleBot.RUNNING: bot.shutdown()12 sys.exit("Shutdown complete")13def main():14 parser = argparse.ArgumentParser(description="Pineapple command line interface.", prog="ananas")15 parser.add_argument("config", help="A cfg file to read bot configuration from.")16 parser.add_argument("-v", "--verbose", action="store_true", help="Log more extensive messages for e.g. debugging purposes.")17 parser.add_argument("-i", "--interactive", action="store_true", help="Use interactive prompts for e.g. mastodon login")18 args = parser.parse_args()19 prog = sys.argv[0]20 cfg = configparser.ConfigParser()21 try: cfg.read(args.config)22 except FileNotFoundError:23 sys.exit("Couldn't open '{}', exiting.".format(args.config))24 for bot in cfg:25 if bot == "DEFAULT": continue26 if not "class" in cfg[bot]:27 print("{}: no class specified, skipping {}.".format(prog, bot))28 continue29 botclass = cfg[bot]["class"]30 module, _, botclass = botclass.rpartition(".")31 if module == "":32 print("{}: no module given in class name '{}', skipping {}.".format(prog, botclass, bot))33 try:34 exec("from {0} import {1}; bots.append({1}('{2}', name='{3}', interactive={4}, verbose={5}))"35 .format(module, botclass, args.config, bot, args.interactive, args.verbose))36 except ModuleNotFoundError as e:37 print("{}: encountered the following error loading module {}:".format(prog, module))38 print("{}: the error was: {}".format(prog, e))39 print("{}: skipping {}!".format(prog, bot))40 continue41 except Exception as e:42 print("{}: fatal exception loading bot {}: {}\n{}".format(prog, bot, repr(e), traceback.format_exc()))43 continue44 except KeyboardInterrupt:45 sys.exit()46 signal.signal(signal.SIGINT, shutdown_all)47 signal.signal(signal.SIGABRT, shutdown_all)48 signal.signal(signal.SIGTERM, shutdown_all)49 try:50 while(True): time.sleep(60)51 except KeyboardInterrupt:52 shutdown_all(None, None)53if __name__ == "__main__":...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful