Best Python code snippet using tavern
MTPClient.py
Source: MTPClient.py 
...242                        )243                    self.stack.append(s)244                break245246    def _await_response(247        self,248        expected: str,249        exception: str = "Server responded inadequately",250        optional: bool = False,251    ) -> Union[str, List[str]]:252        """253        Looks up the stack for expected response the protocol, returns data254        if they are expected (\"<>\" notation in expected str)255        """256        if self.stack == []:257            self._recv_into_stack()258        response = self.stack.pop(0)259        data = []260261        """262        if the response should contain any kind of non-static information,263        treat the expected string as a regex and save it to264        """265        if "<>" in expected:266            # check if the response is valid in the first place267            pattern = re.compile("^" + expected.replace("<>", ".+") + "$")268            try:269                assert re.match(pattern, response), exception270            except AssertionError as e:271                if not optional:272                    raise AssertionError(exception) from e273                else:274                    self.stack.insert(0, response)275                    return276277            # find the indexes of placeholders in the expected response278            indexes = [279                substr.start() for substr in re.finditer("<>", expected)280            ]281282            """283            go to placeholder's starting index and check what character should284            be right after the placeholder, extract that index and use it285            as the ending index of the data in response286287            (this may be overkill, but would solve the problem,288            if the protocol expanded and would pass, for example,289            multiple tokens via one server response - not each one290            in a separate netstring)291292            fails if the placeholder is followed by a character that293            also occurs inside the data - however, this would be294            an inconsistency on the side of the protocol, not295            the algorithm296            """297            for i in indexes:298                if len(expected) <= i + 2:299                    data.append(response[i:])300                else:301                    stop_char = expected[i + 2]302                    stop_index = response.index(stop_char, i)303                    data.append(response[i:stop_index])304            return data[0] if len(indexes) == 1 else data305        else:306            try:307                assert response == expected, exception308            except AssertionError as e:309                if not optional:310                    raise AssertionError(exception) from e311                else:312                    self.stack.insert(0, response)313314315class MTPClient:316    """317    Client for the usage of MTP (MTP V:1.0)318    ---------------------------319    GUI    (standalone usage)      -> main()\n320    NO GUI (third-party usage)     -> send_memes(*args)321    """322323    def _build_gui(self) -> None:324        """Builds the application GUI"""325        self.gui = MemeGui(self)326        self.gui._build()327328    def send_meme(329        self,330        ip: str,331        port: int,332        name: str,333        password: str,334        is_nsfw: bool,335        description: str,336        meme: str,337    ) -> None:338        """Sends memes to the target location via MTP"""339        self.ip = ip340        self.port = port341        self.name = name342        self.password = password343        self.is_nsfw = is_nsfw344        self.description = description345        self.meme = meme346347        # handle any exception during runtime + case of tcp connection failure348        try:349            # connect via tcp to the server350            main_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)351            main_socket.connect((self.ip, self.port))352353            # initialize main socket handler - custom interface for socket communication354            main_channel = SocketHandler(main_socket)355356            # 3 main stages of the protocol - init, transport, end357            token_a, data_port = self._initialize_connection(main_channel)358            token_b, data_len = self._transport_data(token_a, data_port)359            self._end_connection(main_channel, token_b, data_len)360            messagebox.showinfo(361                "Success",362                "You've successfully sent a meme to the server at {}:{}!".format(363                    self.ip, self.port364                ),365            )366            return367        except socket.error as e:368            print(str(traceback.format_exc()))369            e_msg = "Could not set up a connection / Connection disrupted"370            print(e_msg)371            self.gui.error(e_msg)372        except Exception as e:373            print(str(traceback.format_exc()))374            main_channel._send(f"E {str(e)}")375            self.gui.error(str(e))376        finally:377            main_socket.close()378        self.gui.error(379            "Your meme wasn't successfully delivered, check whether the host is up and try again later"380        )381382    def _initialize_connection(383        self, channel: SocketHandler384    ) -> Tuple[str, int]:385        """Represents the 1st stage of MTP"""386        # verify if both nodes use the same version of MTP387        channel._send("C MTP V:1.0")388        channel._await_response(389            "S MTP V:1.0",390            "Server did not respond correctly to the version challenge (init stage)",391        )392393        # pass a nickname to send memes under394        channel._send(f"C {self.name}")395396        # recieve token and data channel port397        token = channel._await_response(398            "S <>", "Token was not recieved successfully (init stage)"399        )400        data_port = channel._await_response(401            "S <>", "Data port was not recieved successfully (init stage)"402        )403404        return (token, int(data_port))405406    def _transport_data(self, token: str, data_port: int) -> str:407        """Represents the 2nd stage of MTP"""408        # connect to the data channel409        data_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)410        data_socket.connect((self.ip, data_port))411        data_channel = SocketHandler(data_socket)412413        # greet the server with our nick :)414        data_channel._send(f"C {self.name}")415416        # recieve confirmation token and data type, verify token match417        recv_token = data_channel._await_response(418            "S <>",419            "Couldn't start the data transport due to server's unexpected behaviour  (transport stage)",420        )421        assert (422            recv_token == token423        ), "Pre-data transport token mismatch (transport stage)"424425        # TRANSPORT LOOP426        # capture total length of data sent to server427        data_len_sum = 0428        while True:429            current_data_len = 0430431            # identify target data and check if it can be accessed432            requested_data_type = data_channel._await_response(433                "S REQ:<>",434                "Expected data type was not recieved successfully (transport stage)",435            )436            assert requested_data_type in [437                "meme",438                "description",439                "isNSFW",440                "password",441            ], f"Unknown data type requested: {requested_data_type} - use meme, description, isNSFW or password  (transport stage)"442443            # send requested data444            if requested_data_type == "meme":445                data_channel._send(f"C {self.meme}")446                current_data_len = len(self.meme)447            elif requested_data_type == "description":448                data_channel._send(f"C {self.description}")449                current_data_len = len(self.description)450            elif requested_data_type == "isNSFW":451                val = "true" if self.is_nsfw else "false"452                data_channel._send(f"C {val}")453                current_data_len = len(val)454            else:455                data_channel._send(f"C {self.password}")456                current_data_len = len(self.password)457458            # suppose that data corruption doesn't occur and so if the lengths match - data was transported successfully459            recv_data_len = data_channel._await_response(460                "S ACK:<>",461                "Data length of the sent data was not recieved successfully (transport stage)",462            )463            assert (464                int(recv_data_len) == current_data_len465            ), "Recieved data length doesn't match the sent data length (transport stage)"466            data_len_sum += current_data_len467468            # attempt to recieve a token and end communication on this channel469            token = data_channel._await_response(470                "S END:<>",471                exception="Token was not recieved successfully (transport stage)",472                optional=True,473            )474            if token:475                data_socket.close()476                return (token, data_len_sum)477478    def _end_connection(479        self, main_channel: SocketHandler, token: str, data_len: int480    ) -> None:481        """Represents the 3rd stage of MTP"""482        # check if total length of data recieved by server matches the sent483        recv_data_len = main_channel._await_response(484            "S <>",485            "Total length of data was not recieved successfully (end stage)",486        )487        assert (488            int(recv_data_len) == data_len489        ), "Total length of data recieved by server doesn't match the total length of data sent by client (end stage)"490491        # identify with the token recieved at the end of the data transport492        main_channel._send(f"C {token}")493494        # check for ack of the end of the communication495        main_channel._await_response(496            "S ACK", "Server did not acknowledge the end of communication"497        )498499    def main(self) -> None:500        """Entry point of the MTP client"""501        self._build_gui()502503504if __name__ == "__main__":505    client = MTPClient()
...test_client.py
Source: test_client.py 
...25        (b'raw request', b'raw request', b'', b''),26        ('request data', b'request data', b'Test. \xd0\xa2\xd0\xb5\xd1\x81\xd1\x82', 'Test. ТеÑÑ'),27        (b'raw request', b'raw request', b'Raw message \xff', b'Raw message \xff'),28    ])29    async def test_await_response(self, connection, data, sent_payload, received_payload, expected_result):30        """Check that data returned by await_response is parsed and returned correctly."""31        rpc = cabbage.AsyncAmqpRpc(connection=connection)32        await rpc.connect()33        with patch('cabbage.amqp.AsyncAmqpRpc._await_response', return_value=received_payload, autospec=True), \34                patch('cabbage.amqp.uuid.uuid4', return_value=RESPONSE_CORR_ID):35            result = await rpc.send_rpc(destination=TEST_DESTINATION, data=data, await_response=True)36        assert result == expected_result37        rpc.channel.basic_publish.assert_called_once_with(38            exchange_name='', routing_key=TEST_DESTINATION, payload=sent_payload,39            properties={'reply_to': RANDOM_QUEUE, 'correlation_id': RESPONSE_CORR_ID})40    @pytest.mark.parametrize('correlation_id, expected', [41        ('my-correlation-id', 'my-correlation-id'),42        (None, RESPONSE_CORR_ID),  # generated by the library43    ])44    async def test_correlation_id(self, connection, correlation_id, expected):45        """Check that correlation id either matches the custom id or is generated from uuid."""46        rpc = cabbage.AsyncAmqpRpc(connection=connection)47        await rpc.connect()48        with patch('cabbage.amqp.AsyncAmqpRpc._await_response', return_value=b'response', autospec=True), \49                patch('cabbage.amqp.uuid.uuid4', return_value=RESPONSE_CORR_ID):50            await rpc.send_rpc(destination=TEST_DESTINATION, data='payload', await_response=True,51                               correlation_id=correlation_id)52        rpc.channel.basic_publish.assert_called_once_with(53            exchange_name='', routing_key=TEST_DESTINATION, payload=b'payload',54            properties={'reply_to': RANDOM_QUEUE, 'correlation_id': expected})55class TestAwaitResponse:56    """AsyncAmqpRpc._await_response"""57    async def test_ok(self, rpc):58        # schedule awaiting response in another Task59        task = asyncio.ensure_future(rpc._await_response(correlation_id=RESPONSE_CORR_ID, timeout=10.0))60        # but it's not executing yet61        assert set(rpc._responses.keys()) == set()62        # let it run for a bit63        await asyncio.sleep(0)64        # check that it created a Future65        assert set(rpc._responses.keys()) == {RESPONSE_CORR_ID}66        # set Future result67        rpc._responses[RESPONSE_CORR_ID].set_result('task result')68        # let the Task run to completion69        await task70        assert task.done()71        assert task.result() == 'task result'72        # check that it cleaned up rpc._responses73        assert set(rpc._responses.keys()) == set()74    async def test_timeout(self, rpc):75        with pytest.raises(cabbage.ServiceUnavailableError):76            await rpc._await_response(correlation_id=RESPONSE_CORR_ID, timeout=0)77        assert set(rpc._responses.keys()) == set()78class TestOnResponse:79    """AsyncAmqpRpc._on_response: aioamqp handler for messages in callback queue."""80    @pytest.mark.parametrize('body', [b'', b'Test message body. \xd0\xa2\xd0\xb5\xd1\x81\xd1\x82'])81    async def test_ok(self, rpc, body):82        rpc._responses[RESPONSE_CORR_ID] = asyncio.Future()83        await rpc._on_response(channel=rpc.channel, body=body, envelope=MockEnvelope(), properties=MockProperties())84        assert rpc._responses[RESPONSE_CORR_ID].done()85        assert rpc._responses[RESPONSE_CORR_ID].result() == body86        rpc.channel.basic_client_ack.assert_called_once_with(delivery_tag=DELIVERY_TAG)87        rpc.channel.basic_client_nack.assert_not_called()88    async def test_unexpected_tag(self, rpc):89        await rpc._on_response(channel=rpc.channel, body=b'resp', envelope=MockEnvelope(), properties=MockProperties())90        rpc.channel.basic_client_ack.assert_called_once_with(delivery_tag=DELIVERY_TAG)tests.py
Source: tests.py 
...73            Base.metadata.drop_all(self.engine)74    # Utilities ------------------------------------------------------------------------75    def _send_message(self, message):76        self.app.send_message(self.bot, message)77    def _await_response(self):78        while self.response == Empty or self.response == self.last_response:79            pass80        self.last_response = self.response81    def _assertResponseContains(self, *items):82        return all(item in self.response.text for item in items)83    def _assertResponseEquals(self, item):84        return self.response.text == item85    def _assertKeyboardContains(self, *messages):86        return all(87            [message] in self.response.reply_markup["keyboard"] for message in messages88        )89    # Tests ----------------------------------------------------------------------------90    def test_welcome(self):91        self._send_message("/start")92        self._await_response()93        self.assertEqual(self.response.text, CONFIG["MESSAGES"]["WELCOME"])94    def test_address(self, address="New York"):95        self.address = address96        self._send_message(address)97        self._await_response()98        self._assertResponseContains("\u00B0C", self.address)99        self._assertKeyboardContains(100            CONFIG["KEYBOARD"]["SET_DEFAULT"], CONFIG["KEYBOARD"]["SET_DEFAULT"]101        )102    def test_address_wrong(self):103        with self.subTest("narrow"):104            self._send_message("Times Square")105            self._await_response()106            self._assertResponseContains("too general or too narrow")107        with self.subTest("wide"):108            self._send_message("USA")109            self._await_response()110            self._assertResponseContains("too general or too narrow")111    def test_repeat(self):112        self.test_address()113        self._send_message(CONFIG["KEYBOARD"]["REPEAT"])114        self._await_response()115        self._assertResponseContains("\u00B0C", self.address)116    def test_set_default(self, address="Moscow"):117        self.default_address = address118        self.test_address(self.default_address)119        self._send_message(CONFIG["KEYBOARD"]["SET_DEFAULT"])120        self._await_response()121        self._assertResponseEquals(CONFIG["MESSAGES"]["SET_DEFAULT_CONF"])122    def test_get_default(self):123        self.test_set_default()124        self.test_address()125        self._send_message(CONFIG["KEYBOARD"]["GET_DEFAULT"])126        self._await_response()127        self._assertResponseContains("\u00B0C", self.default_address)128if __name__ == "__main__":...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
