Best Python code snippet using sure_python
timeout_test.py
Source:timeout_test.py
1# Copyright 2020 The gRPC Authors2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14"""Tests behavior of the timeout mechanism on client side."""15import asyncio16import datetime17import logging18import platform19import random20import unittest21import grpc22from grpc.experimental import aio23from tests_aio.unit import _common24from tests_aio.unit._test_base import AioTestBase25_SLEEP_TIME_UNIT_S = datetime.timedelta(seconds=1).total_seconds()26_TEST_SLEEPY_UNARY_UNARY = '/test/Test/SleepyUnaryUnary'27_TEST_SLEEPY_UNARY_STREAM = '/test/Test/SleepyUnaryStream'28_TEST_SLEEPY_STREAM_UNARY = '/test/Test/SleepyStreamUnary'29_TEST_SLEEPY_STREAM_STREAM = '/test/Test/SleepyStreamStream'30_REQUEST = b'\x00\x00\x00'31_RESPONSE = b'\x01\x01\x01'32async def _test_sleepy_unary_unary(unused_request, unused_context):33 await asyncio.sleep(_SLEEP_TIME_UNIT_S)34 return _RESPONSE35async def _test_sleepy_unary_stream(unused_request, unused_context):36 yield _RESPONSE37 await asyncio.sleep(_SLEEP_TIME_UNIT_S)38 yield _RESPONSE39async def _test_sleepy_stream_unary(unused_request_iterator, context):40 assert _REQUEST == await context.read()41 await asyncio.sleep(_SLEEP_TIME_UNIT_S)42 assert _REQUEST == await context.read()43 return _RESPONSE44async def _test_sleepy_stream_stream(unused_request_iterator, context):45 assert _REQUEST == await context.read()46 await asyncio.sleep(_SLEEP_TIME_UNIT_S)47 await context.write(_RESPONSE)48_ROUTING_TABLE = {49 _TEST_SLEEPY_UNARY_UNARY:50 grpc.unary_unary_rpc_method_handler(_test_sleepy_unary_unary),51 _TEST_SLEEPY_UNARY_STREAM:52 grpc.unary_stream_rpc_method_handler(_test_sleepy_unary_stream),53 _TEST_SLEEPY_STREAM_UNARY:54 grpc.stream_unary_rpc_method_handler(_test_sleepy_stream_unary),55 _TEST_SLEEPY_STREAM_STREAM:56 grpc.stream_stream_rpc_method_handler(_test_sleepy_stream_stream)57}58class _GenericHandler(grpc.GenericRpcHandler):59 def service(self, handler_call_details):60 return _ROUTING_TABLE.get(handler_call_details.method)61async def _start_test_server():62 server = aio.server()63 port = server.add_insecure_port('[::]:0')64 server.add_generic_rpc_handlers((_GenericHandler(),))65 await server.start()66 return f'localhost:{port}', server67class TestTimeout(AioTestBase):68 async def setUp(self):69 address, self._server = await _start_test_server()70 self._client = aio.insecure_channel(address)71 self.assertEqual(grpc.ChannelConnectivity.IDLE,72 self._client.get_state(True))73 await _common.block_until_certain_state(self._client,74 grpc.ChannelConnectivity.READY)75 async def tearDown(self):76 await self._client.close()77 await self._server.stop(None)78 async def test_unary_unary_success_with_timeout(self):79 multicallable = self._client.unary_unary(_TEST_SLEEPY_UNARY_UNARY)80 call = multicallable(_REQUEST, timeout=2 * _SLEEP_TIME_UNIT_S)81 self.assertEqual(_RESPONSE, await call)82 self.assertEqual(grpc.StatusCode.OK, await call.code())83 async def test_unary_unary_deadline_exceeded(self):84 multicallable = self._client.unary_unary(_TEST_SLEEPY_UNARY_UNARY)85 call = multicallable(_REQUEST, timeout=0.5 * _SLEEP_TIME_UNIT_S)86 with self.assertRaises(aio.AioRpcError) as exception_context:87 await call88 rpc_error = exception_context.exception89 self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED, rpc_error.code())90 async def test_unary_stream_success_with_timeout(self):91 multicallable = self._client.unary_stream(_TEST_SLEEPY_UNARY_STREAM)92 call = multicallable(_REQUEST, timeout=2 * _SLEEP_TIME_UNIT_S)93 self.assertEqual(_RESPONSE, await call.read())94 self.assertEqual(_RESPONSE, await call.read())95 self.assertEqual(grpc.StatusCode.OK, await call.code())96 async def test_unary_stream_deadline_exceeded(self):97 multicallable = self._client.unary_stream(_TEST_SLEEPY_UNARY_STREAM)98 call = multicallable(_REQUEST, timeout=0.5 * _SLEEP_TIME_UNIT_S)99 self.assertEqual(_RESPONSE, await call.read())100 with self.assertRaises(aio.AioRpcError) as exception_context:101 await call.read()102 rpc_error = exception_context.exception103 self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED, rpc_error.code())104 async def test_stream_unary_success_with_timeout(self):105 multicallable = self._client.stream_unary(_TEST_SLEEPY_STREAM_UNARY)106 call = multicallable(timeout=2 * _SLEEP_TIME_UNIT_S)107 await call.write(_REQUEST)108 await call.write(_REQUEST)109 self.assertEqual(grpc.StatusCode.OK, await call.code())110 async def test_stream_unary_deadline_exceeded(self):111 multicallable = self._client.stream_unary(_TEST_SLEEPY_STREAM_UNARY)112 call = multicallable(timeout=0.5 * _SLEEP_TIME_UNIT_S)113 with self.assertRaises(aio.AioRpcError) as exception_context:114 await call.write(_REQUEST)115 await call.write(_REQUEST)116 await call117 rpc_error = exception_context.exception118 self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED, rpc_error.code())119 async def test_stream_stream_success_with_timeout(self):120 multicallable = self._client.stream_stream(_TEST_SLEEPY_STREAM_STREAM)121 call = multicallable(timeout=2 * _SLEEP_TIME_UNIT_S)122 await call.write(_REQUEST)123 self.assertEqual(_RESPONSE, await call.read())124 self.assertEqual(grpc.StatusCode.OK, await call.code())125 async def test_stream_stream_deadline_exceeded(self):126 multicallable = self._client.stream_stream(_TEST_SLEEPY_STREAM_STREAM)127 call = multicallable(timeout=0.5 * _SLEEP_TIME_UNIT_S)128 with self.assertRaises(aio.AioRpcError) as exception_context:129 await call.write(_REQUEST)130 await call.read()131 rpc_error = exception_context.exception132 self.assertEqual(grpc.StatusCode.DEADLINE_EXCEEDED, rpc_error.code())133if __name__ == '__main__':134 logging.basicConfig(level=logging.DEBUG)...
kamaitachinoyoru.py
Source:kamaitachinoyoru.py
1import time2import random3items = []4def sleepy_print(str):5 time.sleep(2)6 print(str)7def valid_answer():8 while True:9 ans = ""10 ans = input("(Please enter 1 or 2) Your input:")11 if ans == "1" or ans == "2":12 break13 else:14 sleepy_print("I only understand 1 or 2")15 return ans16def switch_hub(select, one, two):17 if select() == "1":18 return one(items)19 else:20 return two(items)21def end(items):22 return23def start_craw(items):24 sleepy_print("Play the craw game?")25 sleepy_print("1. Sure!\n"26 "2. Nope!")27 switch_hub(valid_answer, craw_game, hotel_entrance)28def craw_game(items):29 prize = ""30 prize = random.choice(["Ugly medal", "None"])31 sleepy_print("You inserted a coin to the craw machine")32 sleepy_print("Boop... You just got ")33 sleepy_print(f"{prize}...")34 if prize == "Ugly medal":35 items.append("Ugly medal")36 sleepy_print("You: I guess I just won...?")37 sleepy_print("(Wow... This is ugly...)")38 sleepy_print("You: let's go to the entrance now")39 hotel_entrance(items)40 else:41 sleepy_print("You lost..")42 start_craw(items)43def slope(items):44 items.clear()45 sleepy_print("You came to ski with Mikiko. The snow is shinning.")46 sleepy_print("Mikiko looks tired.")47 sleepy_print("You said to her...")48 sleepy_print(49 "1. We had fun today, let's go to the hotel before it gets dark.\n"50 "2. Let's get back to the lift again for another round!")51 switch_hub(valid_answer, hotel_front, hotel_entrance)52def hotel_front(items):53 sleepy_print("We arrived at the hotel in the middle of the mountain.")54 sleepy_print("There was an old craw game machine making"55 " nostalgic melody outside of the hotel.")56 sleepy_print("Hotel looked empty and I had a bad feeling.")57 sleepy_print("Mikiko: I don't see anyone.")58 sleepy_print("Mikiko: We should check inside of the hotel.")59 sleepy_print("You told her...")60 sleepy_print("1. Wait..Let me play with this craw game first.\n"61 "2. Sure, I will go and check. You should stay here!")62 switch_hub(valid_answer, craw_game, hotel_entrance)63def hotel_entrance(items):64 sleepy_print("...At the hotel entrance.....")65 sleepy_print("It was already dark")66 sleepy_print("(Something is strange....)")67 sleepy_print("(Everyone is ..dead....)")68 sleepy_print("I noticed someone is standing"69 " behind me with a sharp knife and...")70 sleepy_print("coming to stub me! ")71 if "Ugly medal" in items:72 return good_end()73 else:74 return bad_end()75def bad_end():76 sleepy_print("The next moment, I saw Mikiko was"77 " covering her face with bloody hands..")78 sleepy_print("You: Mi..ki..ko..?")79 sleepy_print(".....")80 sleepy_print(".....")81 sleepy_print("The end")82 sleepy_print("Starting a new story?")83 sleepy_print("1. Yes\n"84 "2. No")85 switch_hub(valid_answer, slope, end)86def good_end():87 sleepy_print("Luckily the ugly medal in your chest"88 " pocket stopped the knife.")89 sleepy_print("The next moment, I saw Mikiko kicked him"90 " at his head and he fall down.")91 sleepy_print("He did not move anymore.")92 sleepy_print("Mikiko: We are safe now.")93 sleepy_print(".....")94 sleepy_print(".....")95 sleepy_print("Congratulations! You survived this game.")96 sleepy_print("Starting a new story?")97 sleepy_print("1. Yes\n"98 "2. No")99 switch_hub(valid_answer, slope, end)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!