Best Python code snippet using slash
jupyter_kernel.py
Source: jupyter_kernel.py
...
                    # NOTE(review): this excerpt was whitespace-collapsed by extraction; the
                    # indentation here is reconstructed from the statement order and should be
                    # confirmed against the upstream file. The lines down to the ``except``
                    # clauses are the tail of a definition that starts outside this excerpt
                    # (presumably ``housekeep_run``, the coroutine that ``session_start``
                    # schedules), handling a "register" message at this point.
                    #
                    # now a couple of things are connected, call the session_cleanup_callback
                    #
                    if self.task_cnt > 1 and self.session_cleanup_callback:
                        self.session_cleanup_callback()
                        # cleared so the callback runs at most once
                        self.session_cleanup_callback = None
                elif msg[0] == "unregister":
                    # msg is [command, task-group name, task object]
                    if msg[1] in self.tasks:
                        self.tasks[msg[1]].discard(msg[2])
                    # NOTE(review): assumed to sit outside the ``if`` above — confirm
                    self.task_cnt -= 1
                    #
                    # if there are no connection tasks left, then shutdown the kernel
                    #
                    # NOTE(review): the threshold 4 is not explained in this excerpt;
                    # presumably it requires that several kernel channels were connected
                    # at some point before an empty task set triggers shutdown — confirm.
                    if self.task_cnt == 0 and self.task_cnt_max >= 4:
                        asyncio.create_task(self.session_shutdown())
                        # park here while the shutdown task runs; presumably
                        # session_shutdown() cancels this task — confirm
                        await asyncio.sleep(10000)
                elif msg[0] == "shutdown":
                    asyncio.create_task(self.session_shutdown())
                    await asyncio.sleep(10000)
            except asyncio.CancelledError:
                # cancellation must propagate so the task actually stops
                raise
            except Exception:
                # format_exc(-1) limits the logged traceback to its last entry
                _LOGGER.error("housekeep task exception: %s", traceback.format_exc(-1))

    async def startup_timeout(self):
        """Shut down the session if nothing connects before the startup timeout.

        Registers itself with the housekeeping queue, sleeps for
        ``self.no_connect_timeout`` seconds, and then checks
        ``self.task_cnt_max``. If that is still <= 1 (no task other than this
        one was ever registered), it logs an error, invokes and clears any
        pending session cleanup callback, and queues a "shutdown" message.
        In every case it finally queues its own "unregister" message.

        NOTE(review): the original docstring said "after 30 seconds"; the
        actual delay is whatever ``self.no_connect_timeout`` holds.
        """
        await self.housekeep_q.put(["register", "startup_timeout", asyncio.current_task()])
        await asyncio.sleep(self.no_connect_timeout)
        if self.task_cnt_max <= 1:
            #
            # nothing started other than us, so shut down the session
            #
            _LOGGER.error("No connections to session %s; shutting down", self.global_ctx_name)
            if self.session_cleanup_callback:
                self.session_cleanup_callback()
                # cleared so the callback is not invoked a second time
                self.session_cleanup_callback = None
            await self.housekeep_q.put(["shutdown"])
        await self.housekeep_q.put(["unregister", "startup_timeout", asyncio.current_task()])

    async def start_one_server(self, callback):
        """Start a server by finding an available port.

        Tries ``asyncio.start_server`` on host "0.0.0.0" at
        ``self.avail_port``; each ``OSError`` advances ``self.avail_port`` by
        one and retries, for at most 2048 attempts.

        Args:
            callback: client-connected callback handed to
                ``asyncio.start_server``.

        Returns:
            ``(server, port)`` on success, or ``(None, None)`` after logging
            an error when all 2048 attempts failed.
        """
        first_port = self.avail_port
        for _ in range(2048):
            try:
                server = await asyncio.start_server(callback, "0.0.0.0", self.avail_port)
                # self.avail_port is not advanced on success, so the next call
                # begins by retrying the port that was just bound
                return server, self.avail_port
            except OSError:
                self.avail_port += 1
        _LOGGER.error(
            "unable to find an available port from %d to %d", first_port, self.avail_port - 1,
        )
        return None, None

    def get_ports(self):
        """Return a dict of the port numbers this kernel session is listening to.

        The keys are "iopub_port", "hb_port", "control_port", "stdin_port" and
        "shell_port"; note that the heartbeat port is reported as "hb_port".
        """
        return {
            "iopub_port": self.iopub_port,
            "hb_port": self.heartbeat_port,
            "control_port": self.control_port,
            "stdin_port": self.stdin_port,
            "shell_port": self.shell_port,
        }

    def set_session_cleanup_callback(self, callback):
        """Set a cleanup callback which is called right after the session has started.

        In the code visible here the stored callback is invoked with no
        arguments and then reset to ``None``, either once more than one task
        has registered or when ``startup_timeout`` gives up on the session.
        """
        self.session_cleanup_callback = callback

    async def session_start(self):
        """Start the kernel session."""
        # NOTE(review): this definition is cut off by the excerpt's trailing
        # "..."; only its visible prefix is shown.
        self.ast_ctx.add_logger_handler(self.console)
        _LOGGER.info("Starting session %s", self.global_ctx_name)
        # background tasks are tracked in self.tasks as sets keyed by group name
        self.tasks["housekeep"] = {asyncio.create_task(self.housekeep_run())}
        self.tasks["startup_timeout"] = {asyncio.create_task(self.startup_timeout())}
        # one listening server per channel; each pair is (None, None) if no port was found
        self.iopub_server, self.iopub_port = await self.start_one_server(self.iopub_listen)
        self.heartbeat_server, self.heartbeat_port = await self.start_one_server(self.heartbeat_listen)
        self.control_server, self.control_port = await self.start_one_server(self.control_listen)
        self.stdin_server, self.stdin_port = await self.start_one_server(self.stdin_listen)
        self.shell_server, self.shell_port = await self.start_one_server(self.shell_listen)
        #
...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!