Best Python code snippet using autotest_python
test_multikernelmanager.py
Source:test_multikernelmanager.py
...55 km.restart_kernel(kid, now=True)56 assert km.is_alive(kid)57 assert kid in km.list_kernel_ids()58 km.interrupt_kernel(kid)59 k = km.get_kernel(kid)60 assert isinstance(k, KernelManager)61 km.shutdown_kernel(kid, now=True)62 assert kid not in km, f"{kid} not in {km}"63 def _run_cinfo(self, km, transport, ip):64 kid = km.start_kernel(stdout=PIPE, stderr=PIPE)65 km.get_kernel(kid)66 cinfo = km.get_connection_info(kid)67 self.assertEqual(transport, cinfo["transport"])68 self.assertEqual(ip, cinfo["ip"])69 self.assertTrue("stdin_port" in cinfo)70 self.assertTrue("iopub_port" in cinfo)71 stream = km.connect_iopub(kid)72 stream.close()73 self.assertTrue("shell_port" in cinfo)74 stream = km.connect_shell(kid)75 stream.close()76 self.assertTrue("hb_port" in cinfo)77 stream = km.connect_hb(kid)78 stream.close()79 km.shutdown_kernel(kid, now=True)80 # static so picklable for multiprocessing on Windows81 @classmethod82 def test_tcp_lifecycle(cls):83 km = cls._get_tcp_km()84 cls._run_lifecycle(km)85 def test_tcp_lifecycle_with_kernel_id(self):86 km = self._get_tcp_km()87 self._run_lifecycle(km, test_kid=str(uuid.uuid4()))88 def test_shutdown_all(self):89 km = self._get_tcp_km()90 kid = km.start_kernel(stdout=PIPE, stderr=PIPE)91 self.assertIn(kid, km)92 km.shutdown_all()93 self.assertNotIn(kid, km)94 # shutdown again is okay, because we have no kernels95 km.shutdown_all()96 def test_tcp_cinfo(self):97 km = self._get_tcp_km()98 self._run_cinfo(km, "tcp", localhost())99 @skip_win32100 def test_ipc_lifecycle(self):101 km = self._get_ipc_km()102 self._run_lifecycle(km)103 @skip_win32104 def test_ipc_cinfo(self):105 km = self._get_ipc_km()106 self._run_cinfo(km, "ipc", "test")107 def test_start_sequence_tcp_kernels(self):108 """Ensure that a sequence of kernel startups doesn't break anything."""109 self._run_lifecycle(self._get_tcp_km())110 self._run_lifecycle(self._get_tcp_km())111 self._run_lifecycle(self._get_tcp_km())112 @skip_win32113 def 
test_start_sequence_ipc_kernels(self):114 """Ensure that a sequence of kernel startups doesn't break anything."""115 self._run_lifecycle(self._get_ipc_km())116 self._run_lifecycle(self._get_ipc_km())117 self._run_lifecycle(self._get_ipc_km())118 def tcp_lifecycle_with_loop(self):119 # Ensure each thread has an event loop120 asyncio.set_event_loop(asyncio.new_event_loop())121 self.test_tcp_lifecycle()122 def test_start_parallel_thread_kernels(self):123 self.test_tcp_lifecycle()124 with concurrent.futures.ThreadPoolExecutor(max_workers=2) as thread_executor:125 future1 = thread_executor.submit(self.tcp_lifecycle_with_loop)126 future2 = thread_executor.submit(self.tcp_lifecycle_with_loop)127 future1.result()128 future2.result()129 @pytest.mark.skipif(130 (sys.platform == "darwin") and (sys.version_info >= (3, 6)) and (sys.version_info < (3, 8)),131 reason='"Bad file descriptor" error',132 )133 def test_start_parallel_process_kernels(self):134 self.test_tcp_lifecycle()135 with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_executor:136 future1 = thread_executor.submit(self.tcp_lifecycle_with_loop)137 with concurrent.futures.ProcessPoolExecutor(max_workers=1) as process_executor:138 # Windows tests needs this target to be picklable:139 future2 = process_executor.submit(self.test_tcp_lifecycle)140 future2.result()141 future1.result()142 def test_subclass_callables(self):143 km = self._get_tcp_km_sub()144 km.reset_counts()145 kid = km.start_kernel(stdout=PIPE, stderr=PIPE)146 assert km.call_count("start_kernel") == 1147 assert isinstance(km.get_kernel(kid), SyncKMSubclass)148 assert km.get_kernel(kid).call_count("start_kernel") == 1149 assert km.get_kernel(kid).call_count("_launch_kernel") == 1150 assert km.is_alive(kid)151 assert kid in km152 assert kid in km.list_kernel_ids()153 assert len(km) == 1, f"{len(km)} != {1}"154 km.get_kernel(kid).reset_counts()155 km.reset_counts()156 km.restart_kernel(kid, now=True)157 assert km.call_count("restart_kernel") 
== 1158 assert km.call_count("get_kernel") == 1159 assert km.get_kernel(kid).call_count("restart_kernel") == 1160 assert km.get_kernel(kid).call_count("shutdown_kernel") == 1161 assert km.get_kernel(kid).call_count("interrupt_kernel") == 1162 assert km.get_kernel(kid).call_count("_kill_kernel") == 1163 assert km.get_kernel(kid).call_count("cleanup_resources") == 1164 assert km.get_kernel(kid).call_count("start_kernel") == 1165 assert km.get_kernel(kid).call_count("_launch_kernel") == 1166 assert km.is_alive(kid)167 assert kid in km.list_kernel_ids()168 km.get_kernel(kid).reset_counts()169 km.reset_counts()170 km.interrupt_kernel(kid)171 assert km.call_count("interrupt_kernel") == 1172 assert km.call_count("get_kernel") == 1173 assert km.get_kernel(kid).call_count("interrupt_kernel") == 1174 km.get_kernel(kid).reset_counts()175 km.reset_counts()176 k = km.get_kernel(kid)177 assert isinstance(k, SyncKMSubclass)178 assert km.call_count("get_kernel") == 1179 km.get_kernel(kid).reset_counts()180 km.reset_counts()181 km.shutdown_all(now=True)182 assert km.call_count("shutdown_kernel") == 1183 assert km.call_count("remove_kernel") == 1184 assert km.call_count("request_shutdown") == 0185 assert km.call_count("finish_shutdown") == 0186 assert km.call_count("cleanup_resources") == 0187 assert kid not in km, f"{kid} not in {km}"188class TestAsyncKernelManager(AsyncTestCase):189 # static so picklable for multiprocessing on Windows190 @staticmethod191 def _get_tcp_km():192 c = Config()193 km = AsyncMultiKernelManager(config=c)194 return km195 @staticmethod196 def _get_tcp_km_sub():197 c = Config()198 km = AsyncMKMSubclass(config=c)199 return km200 # static so picklable for multiprocessing on Windows201 @staticmethod202 def _get_ipc_km():203 c = Config()204 c.KernelManager.transport = "ipc"205 c.KernelManager.ip = "test"206 km = AsyncMultiKernelManager(config=c)207 return km208 # static so picklable for multiprocessing on Windows209 @staticmethod210 async def _run_lifecycle(km, 
test_kid=None):211 if test_kid:212 kid = await km.start_kernel(stdout=PIPE, stderr=PIPE, kernel_id=test_kid)213 assert kid == test_kid214 else:215 kid = await km.start_kernel(stdout=PIPE, stderr=PIPE)216 assert await km.is_alive(kid)217 assert kid in km218 assert kid in km.list_kernel_ids()219 assert len(km) == 1, f"{len(km)} != {1}"220 await km.restart_kernel(kid, now=True)221 assert await km.is_alive(kid)222 assert kid in km.list_kernel_ids()223 await km.interrupt_kernel(kid)224 k = km.get_kernel(kid)225 assert isinstance(k, AsyncKernelManager)226 await km.shutdown_kernel(kid, now=True)227 assert kid not in km, f"{kid} not in {km}"228 async def _run_cinfo(self, km, transport, ip):229 kid = await km.start_kernel(stdout=PIPE, stderr=PIPE)230 km.get_kernel(kid)231 cinfo = km.get_connection_info(kid)232 self.assertEqual(transport, cinfo["transport"])233 self.assertEqual(ip, cinfo["ip"])234 self.assertTrue("stdin_port" in cinfo)235 self.assertTrue("iopub_port" in cinfo)236 stream = km.connect_iopub(kid)237 stream.close()238 self.assertTrue("shell_port" in cinfo)239 stream = km.connect_shell(kid)240 stream.close()241 self.assertTrue("hb_port" in cinfo)242 stream = km.connect_hb(kid)243 stream.close()244 await km.shutdown_kernel(kid, now=True)245 self.assertNotIn(kid, km)246 @gen_test247 async def test_tcp_lifecycle(self):248 await self.raw_tcp_lifecycle()249 @gen_test250 async def test_tcp_lifecycle_with_kernel_id(self):251 await self.raw_tcp_lifecycle(test_kid=str(uuid.uuid4()))252 @gen_test253 async def test_shutdown_all(self):254 km = self._get_tcp_km()255 kid = await km.start_kernel(stdout=PIPE, stderr=PIPE)256 self.assertIn(kid, km)257 await km.shutdown_all()258 self.assertNotIn(kid, km)259 # shutdown again is okay, because we have no kernels260 await km.shutdown_all()261 @gen_test(timeout=20)262 async def test_use_after_shutdown_all(self):263 km = self._get_tcp_km()264 kid = await km.start_kernel(stdout=PIPE, stderr=PIPE)265 self.assertIn(kid, km)266 await 
km.shutdown_all()267 self.assertNotIn(kid, km)268 # Start another kernel269 kid = await km.start_kernel(stdout=PIPE, stderr=PIPE)270 self.assertIn(kid, km)271 await km.shutdown_all()272 self.assertNotIn(kid, km)273 # shutdown again is okay, because we have no kernels274 await km.shutdown_all()275 @gen_test(timeout=20)276 async def test_shutdown_all_while_starting(self):277 km = self._get_tcp_km()278 kid_future = asyncio.ensure_future(km.start_kernel(stdout=PIPE, stderr=PIPE))279 # This is relying on the ordering of the asyncio queue, not sure if guaranteed or not:280 kid, _ = await asyncio.gather(kid_future, km.shutdown_all())281 self.assertNotIn(kid, km)282 # Start another kernel283 kid = await km.start_kernel(stdout=PIPE, stderr=PIPE)284 self.assertIn(kid, km)285 self.assertEqual(len(km), 1)286 await km.shutdown_all()287 self.assertNotIn(kid, km)288 # shutdown again is okay, because we have no kernels289 await km.shutdown_all()290 @gen_test291 async def test_tcp_cinfo(self):292 km = self._get_tcp_km()293 await self._run_cinfo(km, "tcp", localhost())294 @skip_win32295 @gen_test296 async def test_ipc_lifecycle(self):297 km = self._get_ipc_km()298 await self._run_lifecycle(km)299 @skip_win32300 @gen_test301 async def test_ipc_cinfo(self):302 km = self._get_ipc_km()303 await self._run_cinfo(km, "ipc", "test")304 @gen_test305 async def test_start_sequence_tcp_kernels(self):306 """Ensure that a sequence of kernel startups doesn't break anything."""307 await self._run_lifecycle(self._get_tcp_km())308 await self._run_lifecycle(self._get_tcp_km())309 await self._run_lifecycle(self._get_tcp_km())310 @skip_win32311 @gen_test312 async def test_start_sequence_ipc_kernels(self):313 """Ensure that a sequence of kernel startups doesn't break anything."""314 await self._run_lifecycle(self._get_ipc_km())315 await self._run_lifecycle(self._get_ipc_km())316 await self._run_lifecycle(self._get_ipc_km())317 def tcp_lifecycle_with_loop(self):318 # Ensure each thread has an event 
loop319 asyncio.set_event_loop(asyncio.new_event_loop())320 asyncio.get_event_loop().run_until_complete(self.raw_tcp_lifecycle())321 # static so picklable for multiprocessing on Windows322 @classmethod323 async def raw_tcp_lifecycle(cls, test_kid=None):324 # Since @gen_test creates an event loop, we need a raw form of325 # test_tcp_lifecycle that assumes the loop already exists.326 km = cls._get_tcp_km()327 await cls._run_lifecycle(km, test_kid=test_kid)328 # static so picklable for multiprocessing on Windows329 @classmethod330 def raw_tcp_lifecycle_sync(cls, test_kid=None):331 loop = asyncio.get_event_loop()332 if loop.is_running():333 # Forked MP, make new loop334 loop = asyncio.new_event_loop()335 asyncio.set_event_loop(loop)336 loop.run_until_complete(cls.raw_tcp_lifecycle(test_kid=test_kid))337 @gen_test338 async def test_start_parallel_thread_kernels(self):339 await self.raw_tcp_lifecycle()340 with concurrent.futures.ThreadPoolExecutor(max_workers=2) as thread_executor:341 future1 = thread_executor.submit(self.tcp_lifecycle_with_loop)342 future2 = thread_executor.submit(self.tcp_lifecycle_with_loop)343 future1.result()344 future2.result()345 @gen_test346 async def test_start_parallel_process_kernels(self):347 await self.raw_tcp_lifecycle()348 with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread_executor:349 future1 = thread_executor.submit(self.tcp_lifecycle_with_loop)350 with concurrent.futures.ProcessPoolExecutor(max_workers=1) as process_executor:351 # Windows tests needs this target to be picklable:352 future2 = process_executor.submit(self.raw_tcp_lifecycle_sync)353 future2.result()354 future1.result()355 @gen_test356 async def test_subclass_callables(self):357 mkm = self._get_tcp_km_sub()358 mkm.reset_counts()359 kid = await mkm.start_kernel(stdout=PIPE, stderr=PIPE)360 assert mkm.call_count("start_kernel") == 1361 assert isinstance(mkm.get_kernel(kid), AsyncKMSubclass)362 assert mkm.get_kernel(kid).call_count("start_kernel") == 1363 
assert mkm.get_kernel(kid).call_count("_launch_kernel") == 1364 assert await mkm.is_alive(kid)365 assert kid in mkm366 assert kid in mkm.list_kernel_ids()367 assert len(mkm) == 1, f"{len(mkm)} != {1}"368 mkm.get_kernel(kid).reset_counts()369 mkm.reset_counts()370 await mkm.restart_kernel(kid, now=True)371 assert mkm.call_count("restart_kernel") == 1372 assert mkm.call_count("get_kernel") == 1373 assert mkm.get_kernel(kid).call_count("restart_kernel") == 1374 assert mkm.get_kernel(kid).call_count("shutdown_kernel") == 1375 assert mkm.get_kernel(kid).call_count("interrupt_kernel") == 1376 assert mkm.get_kernel(kid).call_count("_kill_kernel") == 1377 assert mkm.get_kernel(kid).call_count("cleanup_resources") == 1378 assert mkm.get_kernel(kid).call_count("start_kernel") == 1379 assert mkm.get_kernel(kid).call_count("_launch_kernel") == 1380 assert await mkm.is_alive(kid)381 assert kid in mkm.list_kernel_ids()382 mkm.get_kernel(kid).reset_counts()383 mkm.reset_counts()384 await mkm.interrupt_kernel(kid)385 assert mkm.call_count("interrupt_kernel") == 1386 assert mkm.call_count("get_kernel") == 1387 assert mkm.get_kernel(kid).call_count("interrupt_kernel") == 1388 mkm.get_kernel(kid).reset_counts()389 mkm.reset_counts()390 k = mkm.get_kernel(kid)391 assert isinstance(k, AsyncKMSubclass)392 assert mkm.call_count("get_kernel") == 1393 mkm.get_kernel(kid).reset_counts()394 mkm.reset_counts()395 await mkm.shutdown_all(now=True)396 assert mkm.call_count("shutdown_kernel") == 1397 assert mkm.call_count("remove_kernel") == 1398 assert mkm.call_count("request_shutdown") == 0399 assert mkm.call_count("finish_shutdown") == 0400 assert mkm.call_count("cleanup_resources") == 0...
test_kernels_v1.py
Source:test_kernels_v1.py
...25 # self.kernel = class_kernel(M=self.M).k # Linear26 # PPRBF Kernel27 # self.kernel = class_kernel(X=self.X, Ml=self.Ml).k2829 # self.kx, self.gkx1 = get_kernel(X=self.X, Ml=self.Ml, get_kx=True, get_gkx1=True)30 #31 self.kx, self.gkx1, self.gkx2, self.tr_hesskx11, self.hesskx11, self.hesskx12 = \32 get_kernel(X=self.X,33 M=self.M,34 get_kx=True,35 get_gkx1=True,36 get_gkx2=True,37 get_tr_hesskx11=True,38 get_hesskx11=True,39 get_hesskx12=True)4041 def test_kx(self):42 for m, n in itertools.product(range(self.m), range(self.m)):43 assert np.allclose(self.kx[m, n], self.kernel(self.X[m], self.X[n]))4445 def test_gkx1(self):46 gkx1_num = nd.Jacobian(self.kernel)
...
Kernels.py
Source:Kernels.py
...10 11 @property12 def n(self):13 return self._n14 def get_kernel(self, Xval):15 pass16class Polynomial(Kernels):17 def __init__(self, X, Y, bw=0, deg=0):18 super().__init__(X,Y,bw,deg)19 self.Ktr = self.get_kernel(self._Xtr)20 21 def get_kernel(self, Xval):22 return np.power( 1 + np.dot(Xval, self._Xtr.T), self._deg)23 24class Linear(Kernels):25 def __init__(self, X, Y, bw=0, deg=0):26 super().__init__(X,Y,bw,deg)27 self.Ktr = self.get_kernel(self._Xtr)28 29 def get_kernel(self, Xval):30 return np.dot(Xval, self._Xtr.T)31 32class RBF(Kernels):33 def __init__(self, X, Y, bw=0, deg=2): #use different default degree value for RBF kernel, it means degree of norm, usually set to 2.34 super().__init__(X,Y,bw,deg)35 self.Ktr = self.get_kernel(self._Xtr)36 37 def get_kernel(self, Xval):38 K = []39 for x in Xval:40 row = np.exp( -np.power(np.linalg.norm(x - self._Xtr, 2, axis=-1), self._deg) / self._bw )41 row = np.atleast_2d(row)42 K.append(row)43 Kpred = np.concatenate(K)44 return Kpred45 46 47 48 49 50 51 ...
Learn to run automation tests from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!