Best Python code snippet using pytest-asyncio_python
test_protocol.py
Source:test_protocol.py
1"""2 Copyright 2018 Inmanta3 Licensed under the Apache License, Version 2.0 (the "License");4 you may not use this file except in compliance with the License.5 You may obtain a copy of the License at6 http://www.apache.org/licenses/LICENSE-2.07 Unless required by applicable law or agreed to in writing, software8 distributed under the License is distributed on an "AS IS" BASIS,9 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.10 See the License for the specific language governing permissions and11 limitations under the License.12 Contact: code@inmanta.com13"""14import asyncio15import base6416import datetime17import json18import os19import random20import threading21import time22import urllib.parse23import uuid24from enum import Enum25from itertools import chain26from typing import Any, Dict, Iterator, List, Optional, Union27import pydantic28import pytest29import tornado30from pydantic.types import StrictBool31from tornado import gen, web32from tornado.httpclient import AsyncHTTPClient, HTTPRequest33from tornado.httputil import url_concat34from tornado.platform.asyncio import AnyThreadEventLoopPolicy35from inmanta import config, const, protocol36from inmanta.const import ClientType37from inmanta.data.model import BaseModel38from inmanta.protocol import VersionMatch, exceptions, json_encode39from inmanta.protocol.common import (40 HTML_CONTENT,41 HTML_CONTENT_WITH_UTF8_CHARSET,42 OCTET_STREAM_CONTENT,43 ZIP_CONTENT,44 ArgOption,45 InvalidMethodDefinition,46 InvalidPathException,47 MethodProperties,48 Result,49 ReturnValue,50)51from inmanta.protocol.methods import ENV_OPTS52from inmanta.protocol.rest import CallArguments53from inmanta.protocol.return_value_meta import ReturnValueWithMeta54from inmanta.server import config as opt55from inmanta.server.config import server_bind_port56from inmanta.server.protocol import Server, ServerSlice57from inmanta.types import Apireturn58from inmanta.util import hash_file59from utils import configure60def make_random_file(size=0):61 """62 Generate a random file.63 :param size: If size is > 0 content is generated that is equal or more than size.64 """65 randomvalue = str(random.randint(0, 10000))66 if size > 0:67 while len(randomvalue) < size:68 randomvalue += randomvalue69 content = ("Hello world %s\n" % (randomvalue)).encode()70 hash = hash_file(content)71 body = base64.b64encode(content).decode("ascii")72 return (hash, content, body)73async def test_client_files(client):74 (hash, content, body) = make_random_file()75 # Check if the file exists76 result = await client.stat_file(id=hash)77 assert result.code == 40478 # Create the file79 result = await client.upload_file(id=hash, content=body)80 assert result.code == 20081 # Get the file82 result = await client.get_file(id=hash)83 assert result.code == 20084 assert "content" in result.result85 assert result.result["content"] == body86async def test_client_files_lost(client):87 (hash, content, body) = make_random_file()88 # Get the file89 result = await client.get_file(id=hash)90 assert result.code == 40491async def test_sync_client_files(client):92 # work around for https://github.com/pytest-dev/pytest-asyncio/issues/16893 asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())94 done = []95 limit = 10096 sleep = 0.0197 def do_test():98 sync_client = protocol.SyncClient("client")99 (hash, content, body) = make_random_file()100 # Check if the file exists101 result = sync_client.stat_file(id=hash)102 assert result.code == 404103 # Create the file104 result = sync_client.upload_file(id=hash, content=body)105 assert result.code == 200106 # Get the file107 result = sync_client.get_file(id=hash)108 assert result.code == 200109 assert "content" in result.result110 assert result.result["content"] == body111 done.append(True)112 thread = threading.Thread(target=do_test)113 thread.start()114 while len(done) == 0 and limit > 0:115 await gen.sleep(sleep)116 limit -= 1117 thread.join()118 assert len(done) > 0119async def test_client_files_stat(client):120 file_names = []121 i = 0122 while i < 10:123 (hash, content, body) = make_random_file()124 if hash not in file_names:125 file_names.append(hash)126 result = await client.upload_file(id=hash, content=body)127 assert result.code == 200128 i += 1129 result = await client.stat_files(files=file_names)130 assert len(result.result["files"]) == 0131 other_files = ["testtest"]132 result = await client.stat_files(files=file_names + other_files)133 assert len(result.result["files"]) == len(other_files)134async def test_diff(client):135 ca = "Hello world\n".encode()136 ha = hash_file(ca)137 result = await client.upload_file(id=ha, content=base64.b64encode(ca).decode("ascii"))138 assert result.code == 200139 cb = "Bye bye world\n".encode()140 hb = hash_file(cb)141 result = await client.upload_file(id=hb, content=base64.b64encode(cb).decode("ascii"))142 assert result.code == 200143 diff = await client.diff(ha, hb)144 assert diff.code == 200145 assert len(diff.result["diff"]) == 5146 diff = await client.diff(0, hb)147 assert diff.code == 200148 assert len(diff.result["diff"]) == 4149 diff = await client.diff(ha, 0)150 assert diff.code == 200151 assert len(diff.result["diff"]) == 4152async def test_client_files_bad(server, client):153 (hash, content, body) = make_random_file()154 # Create the file155 result = await client.upload_file(id=hash + "a", content=body)156 assert result.code == 400157async def test_client_files_corrupt(client):158 (hash, content, body) = make_random_file()159 # Create the file160 result = await client.upload_file(id=hash, content=body)161 assert result.code == 200162 state_dir = opt.state_dir.get()163 file_dir = os.path.join(state_dir, "server", "files")164 file_name = os.path.join(file_dir, hash)165 with open(file_name, "wb+") as fd:166 fd.write("Haha!".encode())167 opt.server_delete_currupt_files.set("false")168 result = await client.get_file(id=hash)169 assert result.code == 500170 result = await client.upload_file(id=hash, content=body)171 assert result.code == 500172 opt.server_delete_currupt_files.set("true")173 result = await client.get_file(id=hash)174 assert result.code == 500175 result = await client.upload_file(id=hash, content=body)176 assert result.code == 200177async def test_gzip_encoding(server):178 """179 Test if the server accepts gzipped encoding and returns gzipped encoding.180 """181 (hash, content, body) = make_random_file(size=1024)182 port = opt.get_bind_port()183 url = "http://localhost:%s/api/v1/file/%s" % (port, hash)184 zipped, body = protocol.gzipped_json({"content": body})185 assert zipped186 request = HTTPRequest(187 url=url,188 method="PUT",189 headers={"Accept-Encoding": "gzip", "Content-Encoding": "gzip"},190 body=body,191 decompress_response=True,192 )193 client = AsyncHTTPClient()194 response = await client.fetch(request)195 assert response.code == 200196 request = HTTPRequest(url=url, method="GET", headers={"Accept-Encoding": "gzip"}, decompress_response=True)197 client = AsyncHTTPClient()198 response = await client.fetch(request)199 assert response.code == 200200 assert response.headers["X-Consumed-Content-Encoding"] == "gzip"201class MainHandler(web.RequestHandler):202 def get(self):203 time.sleep(1.1)204@pytest.fixture(scope="function")205async def app(unused_tcp_port):206 http_app = web.Application([(r"/api/v1/file/abc", MainHandler)])207 server = tornado.httpserver.HTTPServer(http_app)208 server.bind(unused_tcp_port)209 server.start()210 yield server211 server.stop()212 await server.close_all_connections()213async def test_timeout_error(app):214 """215 Test test verifies that the protocol client can handle requests that timeout. This means it receives a http error216 status that is not generated by the server but by the client.217 """218 from inmanta.config import Config219 Config.load_config()220 port = str(list(app._sockets.values())[0].getsockname()[1])221 Config.set("client_rest_transport", "port", port)222 Config.set("client_rest_transport", "request_timeout", "1")223 from inmanta import protocol224 client = protocol.Client("client")225 x = await client.get_file(id="abc")226 assert x.code == 599227 assert "message" in x.result228async def test_method_properties():229 """230 Test method properties decorator and helper functions231 """232 @protocol.method(path="/test", operation="PUT", client_types=["api"], api_prefix="x", api_version=2)233 def test_method(name):234 """235 Create a new project236 """237 props = protocol.common.MethodProperties.methods["test_method"][0]238 assert "Authorization" in props.get_call_headers()239 assert props.get_listen_url() == "/x/v2/test"240 assert props.get_call_url({}) == "/x/v2/test"241async def test_invalid_client_type():242 """243 Test invalid client ype244 """245 with pytest.raises(InvalidMethodDefinition) as e:246 @protocol.method(path="/test", operation="PUT", client_types=["invalid"])247 def test_method(name):248 """249 Create a new project250 """251 assert "Invalid client type invalid specified for function" in str(e)252async def test_call_arguments_defaults():253 """254 Test processing RPC messages255 """256 @protocol.method(path="/test", operation="PUT", client_types=["api"])257 def test_method(name: str, value: int = 10):258 """259 Create a new project260 """261 call = CallArguments(protocol.common.MethodProperties.methods["test_method"][0], {"name": "test"}, {})262 await call.process()263 assert call.call_args["name"] == "test"264 assert call.call_args["value"] == 10265def test_create_client():266 with pytest.raises(AssertionError):267 protocol.SyncClient("agent", "120")268 with pytest.raises(AssertionError):269 protocol.Client("agent", "120")270async def test_pydantic():271 """272 Test validating pydantic objects273 """274 class Project(BaseModel):275 id: uuid.UUID276 name: str277 @protocol.method(path="/test", operation="PUT", client_types=["api"])278 def test_method(project: Project):279 """280 Create a new project281 """282 id = uuid.uuid4()283 call = CallArguments(284 protocol.common.MethodProperties.methods["test_method"][0], {"project": {"name": "test", "id": str(id)}}, {}285 )286 await call.process()287 project = call.call_args["project"]288 assert project.name == "test"289 assert project.id == id290 with pytest.raises(exceptions.BadRequest):291 call = CallArguments(292 protocol.common.MethodProperties.methods["test_method"][0], {"project": {"name": "test", "id": "abcd"}}, {}293 )294 await call.process()295def test_pydantic_json():296 """297 Test running pydanyic objects through the json encoder298 """299 class Options(str, Enum):300 yes = "yes"301 no = "no"302 class Project(BaseModel):303 id: uuid.UUID304 name: str305 opts: Options306 project = Project(id=uuid.uuid4(), name="test", opts="no")307 assert project.opts == Options.no308 json_string = json_encode(project)309 data = json.loads(json_string)310 assert "id" in data311 assert "name" in data312 assert data["id"] == str(project.id)313 assert data["name"] == "test"314 # Now create the project again315 new = Project(**data)316 assert project == new317 assert project is not new318async def test_pydantic_alias(unused_tcp_port, postgres_db, database_name, async_finalizer):319 """320 Round trip test on aliased object321 """322 configure(unused_tcp_port, database_name, postgres_db.port)323 class Project(BaseModel):324 source: str325 validate_: bool326 class Config:327 fields = {"validate_": {"alias": "validate"}}328 class ProjectServer(ServerSlice):329 @protocol.typedmethod(path="/test", operation="POST", client_types=["api"])330 def test_method(project: Project) -> ReturnValue[Project]: # NOQA331 """332 Create a new project333 """334 @protocol.typedmethod(path="/test2", operation="POST", client_types=["api"])335 def test_method2(project: List[Project]) -> ReturnValue[List[Project]]: # NOQA336 """337 Create a new project338 """339 @protocol.handle(test_method)340 async def test_methodi(self, project: Project) -> ReturnValue[Project]:341 new_project = project.copy()342 return ReturnValue(response=new_project)343 @protocol.handle(test_method2)344 async def test_method2i(self, project: List[Project]) -> ReturnValue[List[Project]]:345 return ReturnValue(response=project)346 rs = Server()347 server = ProjectServer(name="projectserver")348 rs.add_slice(server)349 await rs.start()350 async_finalizer.add(server.stop)351 async_finalizer.add(rs.stop)352 client = protocol.Client("client")353 projectt = Project(id=uuid.uuid4(), source="test", validate=True)354 assert projectt.validate_ is True355 projectf = Project(id=uuid.uuid4(), source="test", validate=False)356 assert projectf.validate_ is False357 async def roundtrip(obj: Project) -> None:358 data = await client.test_method(obj)359 assert obj.validate_ == data.result["data"]["validate"]360 data = await client.test_method2([obj])361 assert obj.validate_ == data.result["data"][0]["validate"]362 await roundtrip(projectf)363 await roundtrip(projectt)364async def test_return_non_warnings(unused_tcp_port, postgres_db, database_name, async_finalizer):365 """366 Test return none but pushing warnings367 """368 configure(unused_tcp_port, database_name, postgres_db.port)369 class ProjectServer(ServerSlice):370 @protocol.typedmethod(path="/test", operation="POST", client_types=["api"])371 def test_method(name: str) -> ReturnValue[None]: # NOQA372 """373 Create a new project374 """375 @protocol.handle(test_method)376 async def test_method_handler(self, name) -> ReturnValue[None]:377 rv = ReturnValue()378 rv.add_warnings(["error1", "error2"])379 return rv380 rs = Server()381 server = ProjectServer(name="projectserver")382 rs.add_slice(server)383 await rs.start()384 async_finalizer.add(server.stop)385 async_finalizer.add(rs.stop)386 client = protocol.Client("client")387 response = await client.test_method("x")388 assert response.code == 200389 assert "data" in response.result390 assert response.result["data"] is None391 assert "metadata" in response.result392 assert "warnings" in response.result["metadata"]393 assert "error1" in response.result["metadata"]["warnings"]394async def test_invalid_handler():395 """396 Handlers should be async397 """398 with pytest.raises(ValueError):399 class ProjectServer(ServerSlice):400 @protocol.method(path="/test", operation="POST", client_types=["api"])401 def test_method(self):402 """403 Create a new project404 """405 @protocol.handle(test_method)406 def test_method(self):407 return408async def test_return_value(unused_tcp_port, postgres_db, database_name, async_finalizer):409 """410 Test the use and validation of methods that use common.ReturnValue411 """412 configure(unused_tcp_port, database_name, postgres_db.port)413 class Project(BaseModel):414 id: uuid.UUID415 name: str416 class ProjectServer(ServerSlice):417 @protocol.method(path="/test", operation="POST", client_types=["api"])418 def test_method(project: Project) -> ReturnValue[Project]: # NOQA419 """420 Create a new project421 """422 @protocol.handle(test_method)423 async def test_method(self, project: Project) -> ReturnValue[Project]:424 new_project = project.copy()425 return ReturnValue(response=new_project)426 rs = Server()427 server = ProjectServer(name="projectserver")428 rs.add_slice(server)429 await rs.start()430 async_finalizer.add(server.stop)431 async_finalizer.add(rs.stop)432 client = protocol.Client("client")433 result = await client.test_method({"name": "test", "id": str(uuid.uuid4())})434 assert result.code == 200435 assert "id" in result.result436 assert "name" in result.result437async def test_return_model(unused_tcp_port, postgres_db, database_name, async_finalizer):438 """439 Test the use and validation of methods that use common.ReturnValue440 """441 configure(unused_tcp_port, database_name, postgres_db.port)442 class Project(BaseModel):443 id: uuid.UUID444 name: str445 class ProjectServer(ServerSlice):446 @protocol.method(path="/test", operation="POST", client_types=["api"])447 def test_method(project: Project) -> Project: # NOQA448 """449 Create a new project450 """451 @protocol.method(path="/test2", operation="POST", client_types=["api"])452 def test_method2(project: Project) -> None: # NOQA453 pass454 @protocol.method(path="/test3", operation="POST", client_types=["api"])455 def test_method3(project: Project) -> None: # NOQA456 pass457 @protocol.handle(test_method)458 async def test_method(self, project: Project) -> Project:459 new_project = project.copy()460 return new_project461 @protocol.handle(test_method2)462 async def test_method2(self, project: Project) -> None:463 pass464 @protocol.handle(test_method3)465 async def test_method3(self, project: Project) -> None:466 return 1467 rs = Server()468 server = ProjectServer(name="projectserver")469 rs.add_slice(server)470 await rs.start()471 async_finalizer.add(server.stop)472 async_finalizer.add(rs.stop)473 client = protocol.Client("client")474 result = await client.test_method({"name": "test", "id": str(uuid.uuid4())})475 assert result.code == 200476 assert "id" in result.result477 assert "name" in result.result478 result = await client.test_method2({"name": "test", "id": str(uuid.uuid4())})479 assert result.code == 200480 result = await client.test_method3({"name": "test", "id": str(uuid.uuid4())})481 assert result.code == 500482async def test_data_envelope(unused_tcp_port, postgres_db, database_name, async_finalizer):483 """484 Test the use and validation of methods that use common.ReturnValue485 """486 configure(unused_tcp_port, database_name, postgres_db.port)487 class Project(BaseModel):488 id: uuid.UUID489 name: str490 class ProjectServer(ServerSlice):491 @protocol.typedmethod(path="/test", operation="POST", client_types=["api"])492 def test_method(project: Project) -> ReturnValue[Project]: # NOQA493 pass494 @protocol.handle(test_method)495 async def test_method(self, project: Project) -> ReturnValue[Project]:496 new_project = project.copy()497 return ReturnValue(response=new_project)498 @protocol.typedmethod(path="/test2", operation="POST", client_types=["api"], envelope_key="method")499 def test_method2(project: Project) -> ReturnValue[Project]: # NOQA500 pass501 @protocol.handle(test_method2)502 async def test_method2(self, project: Project) -> ReturnValue[Project]:503 new_project = project.copy()504 return ReturnValue(response=new_project)505 @protocol.method(path="/test3", operation="POST", client_types=["api"], envelope=True)506 def test_method3(project: Project): # NOQA507 pass508 @protocol.handle(test_method3)509 async def test_method3(self, project: dict) -> Apireturn:510 return 200, {"id": 1, "name": 2}511 @protocol.method(path="/test4", operation="POST", client_types=["api"], envelope=True, envelope_key="project")512 def test_method4(project: Project): # NOQA513 pass514 @protocol.handle(test_method4)515 async def test_method4(self, project: dict) -> Apireturn:516 return 200, {"id": 1, "name": 2}517 rs = Server()518 server = ProjectServer(name="projectserver")519 rs.add_slice(server)520 await rs.start()521 async_finalizer.add(server.stop)522 async_finalizer.add(rs.stop)523 client = protocol.Client("client")524 # 1525 result = await client.test_method({"name": "test", "id": str(uuid.uuid4())})526 assert result.code == 200527 assert "data" in result.result528 assert "id" in result.result["data"]529 assert "name" in result.result["data"]530 # 2531 result = await client.test_method2({"name": "test", "id": str(uuid.uuid4())})532 assert result.code == 200533 assert "method" in result.result534 assert "id" in result.result["method"]535 assert "name" in result.result["method"]536 # 3537 result = await client.test_method3({"name": "test", "id": str(uuid.uuid4())})538 assert result.code == 200539 assert "data" in result.result540 assert "id" in result.result["data"]541 assert "name" in result.result["data"]542 # 4543 result = await client.test_method4({"name": "test", "id": str(uuid.uuid4())})544 assert result.code == 200545 assert "project" in result.result546 assert "id" in result.result["project"]547 assert "name" in result.result["project"]548async def test_invalid_paths():549 """550 Test path validation551 """552 with pytest.raises(InvalidPathException) as e:553 @protocol.method(path="test", operation="PUT", client_types=["api"], api_prefix="x", api_version=2)554 def test_method(name):555 pass556 assert "test should start with a /" == str(e.value)557 with pytest.raises(InvalidPathException) as e:558 @protocol.method(path="/test/<othername>", operation="PUT", client_types=["api"], api_prefix="x", api_version=2)559 def test_method2(name):560 pass561 assert str(e.value).startswith("Variable othername in path /test/<othername> is not defined in function")562async def test_nested_paths(unused_tcp_port, postgres_db, database_name, async_finalizer):563 """Test overlapping path definition"""564 configure(unused_tcp_port, database_name, postgres_db.port)565 class Project(BaseModel):566 name: str567 class ProjectServer(ServerSlice):568 @protocol.typedmethod(path="/test/<data>", operation="GET", client_types=["api"])569 def test_method(data: str) -> Project: # NOQA570 pass571 @protocol.typedmethod(path="/test/<data>/config", operation="GET", client_types=["api"])572 def test_method2(data: str) -> Project: # NOQA573 pass574 @protocol.handle(test_method)575 async def test_method(self, data: str) -> Project:576 # verify that URL encoded data is properly decoded577 assert "%20" not in data578 return Project(name="test_method")579 @protocol.handle(test_method2)580 async def test_method2(self, data: str) -> Project:581 return Project(name="test_method2")582 rs = Server()583 server = ProjectServer(name="projectserver")584 rs.add_slice(server)585 await rs.start()586 async_finalizer.add(server.stop)587 async_finalizer.add(rs.stop)588 client = protocol.Client("client")589 result = await client.test_method({"data": "test "})590 assert result.code == 200591 assert "test_method" == result.result["data"]["name"]592 client = protocol.Client("client")593 result = await client.test_method2({"data": "test"})594 assert result.code == 200595 assert "test_method2" == result.result["data"]["name"]596async def test_list_basemodel_argument(unused_tcp_port, postgres_db, database_name, async_finalizer):597 """Test list of basemodel arguments and primitive types"""598 configure(unused_tcp_port, database_name, postgres_db.port)599 class Project(BaseModel):600 name: str601 class ProjectServer(ServerSlice):602 @protocol.typedmethod(path="/test", operation="POST", client_types=["api"])603 def test_method(data: List[Project], data2: List[int]) -> Project: # NOQA604 pass605 @protocol.handle(test_method)606 async def test_method(self, data: List[Project], data2: List[int]) -> Project:607 assert len(data) == 1608 assert data[0].name == "test"609 assert len(data2) == 3610 return Project(name="test_method")611 rs = Server()612 server = ProjectServer(name="projectserver")613 rs.add_slice(server)614 await rs.start()615 async_finalizer.add(server.stop)616 async_finalizer.add(rs.stop)617 client = protocol.Client("client")618 result = await client.test_method(data=[{"name": "test"}], data2=[1, 2, 3])619 assert result.code == 200620 assert "test_method" == result.result["data"]["name"]621async def test_dict_basemodel_argument(unused_tcp_port, postgres_db, database_name, async_finalizer):622 """Test dict of basemodel arguments and primitive types"""623 configure(unused_tcp_port, database_name, postgres_db.port)624 class Project(BaseModel):625 name: str626 class ProjectServer(ServerSlice):627 @protocol.typedmethod(path="/test", operation="POST", client_types=["api"])628 def test_method(data: Dict[str, Project], data2: Dict[str, int]) -> Project: # NOQA629 pass630 @protocol.handle(test_method)631 async def test_method(self, data: Dict[str, Project], data2: Dict[str, int]) -> Project:632 assert len(data) == 1633 assert data["projectA"].name == "test"634 assert len(data2) == 3635 return Project(name="test_method")636 rs = Server()637 server = ProjectServer(name="projectserver")638 rs.add_slice(server)639 await rs.start()640 async_finalizer.add(server.stop)641 async_finalizer.add(rs.stop)642 client = protocol.Client("client")643 result = await client.test_method(data={"projectA": {"name": "test"}}, data2={"1": 1, "2": 2, "3": 3})644 assert result.code == 200645 assert "test_method" == result.result["data"]["name"]646async def test_dict_with_optional_values(unused_tcp_port, postgres_db, database_name, async_finalizer):647 """Test dict which may have None as a value"""648 configure(unused_tcp_port, database_name, postgres_db.port)649 types = Union[pydantic.StrictInt, pydantic.StrictStr]650 class Result(BaseModel):651 val: Optional[types]652 class ProjectServer(ServerSlice):653 @protocol.typedmethod(path="/test", operation="POST", client_types=["api"])654 def test_method(data: Dict[str, Optional[types]]) -> Result: # NOQA655 pass656 @protocol.handle(test_method)657 async def test_method(self, data: Dict[str, Optional[types]]) -> Result:658 assert len(data) == 1659 assert "test" in data660 return Result(val=data["test"])661 @protocol.typedmethod(path="/test", operation="GET", client_types=["api"])662 def test_method2(data: Optional[str] = None) -> None: # NOQA663 pass664 @protocol.handle(test_method2)665 async def test_method2(self, data: Optional[str] = None) -> None:666 assert data is None667 rs = Server()668 server = ProjectServer(name="projectserver")669 rs.add_slice(server)670 await rs.start()671 async_finalizer.add(server.stop)672 async_finalizer.add(rs.stop)673 client = protocol.Client("client")674 result = await client.test_method(data={"test": None})675 assert result.code == 200676 assert result.result["data"]["val"] is None677 result = await client.test_method(data={"test": 5})678 assert result.code == 200679 assert result.result["data"]["val"] == 5680 result = await client.test_method(data={"test": "test123"})681 assert result.code == 200682 assert result.result["data"]["val"] == "test123"683 result = await client.test_method2()684 assert result.code == 200685 result = await client.test_method2(data=None)686 assert result.code == 200687async def test_dict_and_list_return(unused_tcp_port, postgres_db, database_name, async_finalizer):688 """Test list of basemodel arguments"""689 configure(unused_tcp_port, database_name, postgres_db.port)690 class Project(BaseModel):691 name: str692 class ProjectServer(ServerSlice):693 @protocol.typedmethod(path="/test", operation="POST", client_types=["api"])694 def test_method(data: Project) -> List[Project]: # NOQA695 pass696 @protocol.handle(test_method)697 async def test_method(self, data: Project) -> List[Project]: # NOQA698 return [Project(name="test_method")]699 @protocol.typedmethod(path="/test2", operation="POST", client_types=["api"])700 def test_method2(data: Project) -> List[str]: # NOQA701 pass702 @protocol.handle(test_method2)703 async def test_method2(self, data: Project) -> List[str]: # NOQA704 return ["test_method"]705 rs = Server()706 server = ProjectServer(name="projectserver")707 rs.add_slice(server)708 await rs.start()709 async_finalizer.add(server.stop)710 async_finalizer.add(rs.stop)711 client = protocol.Client("client")712 result = await client.test_method(data={"name": "test"})713 assert result.code == 200714 assert len(result.result["data"]) == 1715 assert "test_method" == result.result["data"][0]["name"]716 result = await client.test_method2(data={"name": "test"})717 assert result.code == 200718 assert len(result.result["data"]) == 1719 assert "test_method" == result.result["data"][0]720async def test_method_definition():721 """722 Test typed methods with wrong annotations723 """724 with pytest.raises(InvalidMethodDefinition) as e:725 @protocol.typedmethod(path="/test", operation="PUT", client_types=["api"])726 def test_method1(name) -> None:727 """728 Create a new project729 """730 assert "has no type annotation." in str(e.value)731 with pytest.raises(InvalidMethodDefinition) as e:732 @protocol.typedmethod(path="/test", operation="PUT", client_types=["api"])733 def test_method2(name: Iterator[str]) -> None:734 """735 Create a new project736 """737 assert "Type typing.Iterator[str] of argument name can only be generic List or Dict" in str(e.value)738 with pytest.raises(InvalidMethodDefinition) as e:739 @protocol.typedmethod(path="/test", operation="PUT", client_types=["api"])740 def test_method3(name: List[object]) -> None:741 """742 Create a new project743 """744 assert (745 "Type object of argument name must be a either BaseModel, Enum, UUID, str, float, int, StrictNonIntBool, datetime, "746 "bytes or a List of these types or a Dict with str keys and values of these types."747 ) in str(e.value)748 with pytest.raises(InvalidMethodDefinition) as e:749 @protocol.typedmethod(path="/test", operation="PUT", client_types=["api"])750 def test_method4(name: Dict[int, str]) -> None:751 """752 Create a new project753 """754 assert "Type typing.Dict[int, str] of argument name must be a Dict with str keys and not int" in str(e.value)755 with pytest.raises(InvalidMethodDefinition) as e:756 @protocol.typedmethod(path="/test", operation="PUT", client_types=["api"])757 def test_method5(name: Dict[str, object]) -> None:758 """759 Create a new project760 """761 assert (762 "Type object of argument name must be a either BaseModel, Enum, UUID, str, float, int, StrictNonIntBool, datetime, "763 "bytes or a List of these types or a Dict with str keys and values of these types."764 ) in str(e.value)765 @protocol.typedmethod(path="/service_types/<service_type>", operation="DELETE", client_types=["api"])766 def lcm_service_type_delete(tid: uuid.UUID, service_type: str) -> None:767 """Delete an existing service type."""768def test_optional():769 @protocol.typedmethod(path="/service_types/<service_type>", operation="DELETE", client_types=["api"])770 def lcm_service_type_delete(tid: uuid.UUID, service_type: str, version: Optional[str] = None) -> None:771 """Delete an existing service type."""772async def test_union_types(unused_tcp_port, postgres_db, database_name, async_finalizer):773 """Test use of union types"""774 configure(unused_tcp_port, database_name, postgres_db.port)775 SimpleTypes = Union[float, int, StrictBool, str] # NOQA776 AttributeTypes = Union[SimpleTypes, List[SimpleTypes], Dict[str, SimpleTypes]] # NOQA777 class ProjectServer(ServerSlice):778 @protocol.typedmethod(path="/test", operation="GET", client_types=["api"])779 def test_method(data: SimpleTypes, version: Optional[int] = None) -> List[SimpleTypes]: # NOQA780 pass781 @protocol.handle(test_method)782 async def test_method(self, data: SimpleTypes, version: Optional[int] = None) -> List[SimpleTypes]: # NOQA783 if isinstance(data, list):784 return data785 return [data]786 @protocol.typedmethod(path="/testp", operation="POST", client_types=["api"])787 def test_methodp(data: AttributeTypes, version: Optional[int] = None) -> List[SimpleTypes]: # NOQA788 pass789 @protocol.handle(test_methodp)790 async def test_methodp(self, data: AttributeTypes, version: Optional[int] = None) -> List[SimpleTypes]: # NOQA791 if isinstance(data, list):792 return data793 return [data]794 rs = Server()795 server = ProjectServer(name="projectserver")796 rs.add_slice(server)797 await rs.start()798 async_finalizer.add(server.stop)799 async_finalizer.add(rs.stop)800 client = protocol.Client("client")801 result = await client.test_methodp(data=[5], version=7)802 assert result.code == 200803 assert len(result.result["data"]) == 1804 assert 5 == result.result["data"][0]805 result = await client.test_method(data=5, version=3)806 assert result.code == 200807 assert len(result.result["data"]) == 1808 assert 5 == result.result["data"][0]809 result = await client.test_method(data=5)810 assert result.code == 200811 assert len(result.result["data"]) == 1812 assert 5 == result.result["data"][0]813 result = await client.test_method(data=5, version=7)814 assert result.code == 200815 assert len(result.result["data"]) == 1816 assert 5 == result.result["data"][0]817async def test_basemodel_validation(unused_tcp_port, postgres_db, database_name, async_finalizer):818 """Test validation of basemodel arguments and return, and how they are reported"""819 configure(unused_tcp_port, database_name, postgres_db.port)820 class Project(BaseModel):821 name: str822 value: str823 class ProjectServer(ServerSlice):824 @protocol.typedmethod(path="/test", operation="POST", client_types=["api"])825 def test_method(data: Project) -> Project: # NOQA826 pass827 @protocol.handle(test_method)828 async def test_method(self, data: Project) -> Project: # NOQA829 return Project()830 rs = Server()831 server = ProjectServer(name="projectserver")832 rs.add_slice(server)833 await rs.start()834 async_finalizer.add(server.stop)835 async_finalizer.add(rs.stop)836 client = protocol.Client("client")837 # Check validation of arguments838 result = await client.test_method(data={})839 assert result.code == 400840 assert "error_details" in result.result841 details = result.result["error_details"]["validation_errors"]842 assert len(details) == 2843 name = [d for d in details if d["loc"] == ["data", "name"]][0]844 value = [d for d in details if d["loc"] == ["data", "value"]][0]845 assert name["msg"] == "field required"846 assert value["msg"] == "field required"847 # Check the validation of the return value848 result = await client.test_method(data={"name": "X", "value": "Y"})849 assert result.code == 500850 assert "data validation error" in result.result["message"]851async def test_ACOA_header(server):852 """853 Test if the server accepts gzipped encoding and returns gzipped encoding.854 """855 port = opt.get_bind_port()856 url = f"http://localhost:{port}/api/v1/environment"857 request = HTTPRequest(url=url, method="GET")858 client = AsyncHTTPClient()859 response = await client.fetch(request)860 assert response.code == 200861 assert response.headers.get("Access-Control-Allow-Origin") is None862 config.Config.set("server", "access-control-allow-origin", "*")863 response = await client.fetch(request)864 assert response.code == 200865 assert response.headers.get("Access-Control-Allow-Origin") == "*"866async def test_multi_version_method(unused_tcp_port, postgres_db, database_name, async_finalizer):867 """Test multi version methods"""868 configure(unused_tcp_port, database_name, postgres_db.port)869 class Project(BaseModel):870 name: str871 value: str872 class ProjectServer(ServerSlice):873 @protocol.typedmethod(path="/test2", operation="POST", client_types=["api"], api_version=3)874 @protocol.typedmethod(path="/test", operation="POST", client_types=["api"], api_version=2, envelope_key="data")875 @protocol.typedmethod(path="/test", operation="POST", client_types=["api"], api_version=1, envelope_key="project")876 def test_method(project: Project) -> Project: # NOQA877 pass878 @protocol.handle(test_method)879 async def test_method(self, project: Project) -> Project: # NOQA880 return project881 rs = Server()882 server = ProjectServer(name="projectserver")883 rs.add_slice(server)884 await rs.start()885 async_finalizer.add(server.stop)886 async_finalizer.add(rs.stop)887 # rest call888 port = opt.get_bind_port()889 request = HTTPRequest(890 url=f"http://localhost:{port}/api/v1/test", method="POST", body=json_encode({"project": {"name": "a", "value": "b"}})891 )892 client = AsyncHTTPClient()893 response = await client.fetch(request)894 assert response.code == 200895 body = json.loads(response.body)896 assert "project" in body897 request = HTTPRequest(898 url=f"http://localhost:{port}/api/v2/test", method="POST", body=json_encode({"project": {"name": "a", "value": "b"}})899 )900 client = AsyncHTTPClient()901 response = await client.fetch(request)902 assert response.code == 200903 body = json.loads(response.body)904 assert "data" in body905 request = HTTPRequest(906 url=f"http://localhost:{port}/api/v3/test2", method="POST", body=json_encode({"project": {"name": "a", "value": "b"}})907 )908 client = AsyncHTTPClient()909 response = await client.fetch(request)910 assert response.code == 200911 body = json.loads(response.body)912 assert "data" in body913 # client based calls914 client = protocol.Client("client")915 response = await client.test_method(project=Project(name="a", value="b"))916 assert response.code == 200917 assert "project" in response.result918 client = protocol.Client("client", version_match=VersionMatch.highest)919 response = await client.test_method(project=Project(name="a", value="b"))920 assert response.code == 200921 assert "data" in response.result922 client = protocol.Client("client", version_match=VersionMatch.exact, exact_version=1)923 response = await client.test_method(project=Project(name="a", value="b"))924 assert response.code == 200925 assert "project" in response.result926 client = protocol.Client("client", version_match=VersionMatch.exact, exact_version=2)927 response = await client.test_method(project=Project(name="a", value="b"))928 assert response.code == 200929 assert "data" in response.result930async def test_multi_version_handler(unused_tcp_port, postgres_db, database_name, async_finalizer):931 """Test multi version methods"""932 configure(unused_tcp_port, database_name, postgres_db.port)933 class Project(BaseModel):934 name: str935 value: str936 class ProjectServer(ServerSlice):937 @protocol.typedmethod(path="/test", operation="POST", client_types=["api"], api_version=2, envelope_key="data")938 @protocol.typedmethod(path="/test", operation="POST", client_types=["api"], api_version=1, envelope_key="project")939 def test_method(project: Project) -> Project: # NOQA940 pass941 @protocol.handle(test_method, api_version=1)942 async def test_methodX(self, project: Project) -> Project: # NOQA943 return Project(name="v1", value="1")944 @protocol.handle(test_method, api_version=2)945 async def test_methodY(self, project: Project) -> Project: # NOQA946 return Project(name="v2", value="2")947 rs = Server()948 server = ProjectServer(name="projectserver")949 rs.add_slice(server)950 await rs.start()951 async_finalizer.add(server.stop)952 async_finalizer.add(rs.stop)953 # client based calls954 client = protocol.Client("client")955 response = await client.test_method(project=Project(name="a", value="b"))956 assert response.code == 200957 assert "project" in response.result958 assert response.result["project"]["name"] == "v1"959 client = protocol.Client("client", version_match=VersionMatch.highest)960 response = await client.test_method(project=Project(name="a", value="b"))961 assert response.code == 200962 assert "data" in response.result963 assert response.result["data"]["name"] == "v2"964async def test_simple_return_type(unused_tcp_port, postgres_db, database_name, async_finalizer):965 """Test methods with simple return types"""966 configure(unused_tcp_port, database_name, postgres_db.port)967 class ProjectServer(ServerSlice):968 @protocol.typedmethod(path="/test", operation="POST", client_types=["api"])969 def test_method(project: str) -> str: # NOQA970 pass971 @protocol.handle(test_method)972 async def test_methodY(self, project: str) -> str: # NOQA973 return project974 rs = Server()975 server = ProjectServer(name="projectserver")976 rs.add_slice(server)977 await rs.start()978 async_finalizer.add(server.stop)979 async_finalizer.add(rs.stop)980 # client based calls981 client = protocol.Client("client")982 response = await client.test_method(project="x")983 assert response.code == 200984 assert response.result["data"] == "x"985async def test_html_content_type(unused_tcp_port, postgres_db, database_name, async_finalizer):986 """Test whether API endpoints with a text/html content-type work."""987 configure(unused_tcp_port, database_name, postgres_db.port)988 html_content = "<html><body>test</body></html>"989 @protocol.typedmethod(path="/test", operation="GET", client_types=["api"])990 def test_method() -> ReturnValue[str]: # NOQA991 pass992 class TestServer(ServerSlice):993 @protocol.handle(test_method)994 async def test_methodY(self) -> ReturnValue[str]: # NOQA995 return ReturnValue(response=html_content, content_type=HTML_CONTENT)996 rs = Server()997 server = TestServer(name="testserver")998 rs.add_slice(server)999 await rs.start()1000 async_finalizer.add(server.stop)1001 async_finalizer.add(rs.stop)1002 # client based calls1003 client = protocol.Client("client")1004 response = await client.test_method()1005 assert response.code == 2001006 assert response.result == html_content1007async def test_html_content_type_with_utf8_encoding(unused_tcp_port, postgres_db, database_name, async_finalizer):1008 """Test whether API endpoints with a "text/html; charset=UTF-8" content-type work."""1009 configure(unused_tcp_port, database_name, postgres_db.port)1010 html_content = "<html><body>test</body></html>"1011 @protocol.typedmethod(path="/test", operation="GET", client_types=["api"])1012 def test_method() -> ReturnValue[str]: # NOQA1013 pass1014 class TestServer(ServerSlice):1015 @protocol.handle(test_method)1016 async def test_methodY(self) -> ReturnValue[str]: # NOQA1017 return ReturnValue(response=html_content, content_type=HTML_CONTENT_WITH_UTF8_CHARSET)1018 rs = Server()1019 server = TestServer(name="testserver")1020 rs.add_slice(server)1021 await rs.start()1022 async_finalizer.add(server.stop)1023 async_finalizer.add(rs.stop)1024 # client based calls1025 client = protocol.Client("client")1026 response = await client.test_method()1027 assert response.code == 2001028 assert response.result == html_content1029async def test_octet_stream_content_type(unused_tcp_port, postgres_db, database_name, async_finalizer):1030 """Test whether API endpoints with an application/octet-stream content-type work."""1031 configure(unused_tcp_port, database_name, postgres_db.port)1032 byte_stream = b"test123"1033 @protocol.typedmethod(path="/test", operation="GET", client_types=["api"])1034 def test_method() -> ReturnValue[bytes]: # NOQA1035 pass1036 class TestServer(ServerSlice):1037 @protocol.handle(test_method)1038 async def test_methodY(self) -> ReturnValue[bytes]: # NOQA1039 return ReturnValue(response=byte_stream, content_type=OCTET_STREAM_CONTENT)1040 rs = Server()1041 server = TestServer(name="testserver")1042 rs.add_slice(server)1043 await rs.start()1044 async_finalizer.add(server.stop)1045 async_finalizer.add(rs.stop)1046 # client based calls1047 client = protocol.Client("client")1048 response = await client.test_method()1049 assert response.code == 2001050 assert response.result == byte_stream1051async def test_zip_content_type(unused_tcp_port, postgres_db, database_name, async_finalizer):1052 """Test whether API endpoints with an application/zip content-type work."""1053 configure(unused_tcp_port, database_name, postgres_db.port)1054 zip_content = b"test123"1055 @protocol.typedmethod(path="/test", operation="GET", client_types=["api"])1056 def test_method() -> ReturnValue[bytes]: # NOQA1057 pass1058 class TestServer(ServerSlice):1059 @protocol.handle(test_method)1060 async def test_methodY(self) -> ReturnValue[bytes]: # NOQA1061 return ReturnValue(response=zip_content, content_type=ZIP_CONTENT)1062 rs = Server()1063 server = TestServer(name="testserver")1064 rs.add_slice(server)1065 await rs.start()1066 async_finalizer.add(server.stop)1067 async_finalizer.add(rs.stop)1068 # client based calls1069 client = protocol.Client("client")1070 response = await client.test_method()1071 assert response.code == 2001072 assert response.result == zip_content1073@pytest.fixture1074async def options_server():1075 @protocol.typedmethod(path="/test", operation="OPTIONS", client_types=["api"])1076 def test_method() -> ReturnValue[str]: # NOQA1077 pass1078 class TestServer(ServerSlice):1079 @protocol.handle(test_method)1080 async def test_methodY(self) -> ReturnValue[str]: # NOQA1081 return ReturnValue(response="content")1082 return TestServer(name="testserver")1083@pytest.fixture1084def options_request(unused_tcp_port):1085 return HTTPRequest(1086 url=f"http://localhost:{unused_tcp_port}/api/v1/test",1087 method="OPTIONS",1088 connect_timeout=1.0,1089 request_timeout=1.0,1090 decompress_response=True,1091 )1092@pytest.mark.parametrize("auth_enabled, auth_header_allowed", [(True, True), (False, False)])1093async def test_auth_enabled_options_method(1094 auth_enabled,1095 auth_header_allowed,1096 unused_tcp_port,1097 postgres_db,1098 database_name,1099 async_finalizer,1100 options_server,1101 options_request,1102):1103 configure(unused_tcp_port, database_name, postgres_db.port)1104 config.Config.set("server", "auth", str(auth_enabled))1105 rs = Server()1106 rs.add_slice(options_server)1107 await rs.start()1108 async_finalizer.add(options_server.stop)1109 async_finalizer.add(rs.stop)1110 client = AsyncHTTPClient()1111 response = await client.fetch(options_request)1112 assert response.code == 2001113 assert ("Authorization" in response.headers.get("Access-Control-Allow-Headers")) == auth_header_allowed1114async def test_required_header_not_present(server):1115 client = AsyncHTTPClient()1116 response = await client.fetch(f"http://localhost:{server_bind_port.get()}/api/v2/environment_settings", raise_error=False)1117 assert response.code == 4001118async def test_malformed_json(server):1119 """1120 Tests sending malformed json to the server1121 """1122 port = opt.get_bind_port()1123 url = f"http://localhost:{port}/api/v2/environment"1124 request = HTTPRequest(url=url, method="PUT", body='{"name": env}')1125 client = AsyncHTTPClient()1126 response = await client.fetch(request, raise_error=False)1127 assert response.code == 4001128 assert (1129 json.loads(response.body)["message"]1130 == "The request body couldn't be decoded as a JSON: Expecting value: line 1 column 10 (char 9)"1131 )1132async def test_tuple_index_out_of_range(unused_tcp_port, postgres_db, database_name, async_finalizer):1133 configure(unused_tcp_port, database_name, postgres_db.port)1134 class Project(BaseModel):1135 name: str1136 value: str1137 class ProjectServer(ServerSlice):1138 @protocol.typedmethod(1139 api_prefix="test", path="/project/<project>", operation="GET", arg_options=ENV_OPTS, client_types=["api"]1140 )1141 def test_method(1142 tid: uuid.UUID, project: str, include_deleted: bool = False1143 ) -> List[Union[uuid.UUID, Project, bool]]: # NOQA1144 pass1145 @protocol.handle(test_method)1146 async def test_method(1147 tid: uuid.UUID, project: Project, include_deleted: bool = False1148 ) -> List[Union[uuid.UUID, Project, bool]]: # NOQA1149 return [tid, project, include_deleted]1150 rs = Server()1151 server = ProjectServer(name="projectserver")1152 rs.add_slice(server)1153 await rs.start()1154 async_finalizer.add(server.stop)1155 async_finalizer.add(rs.stop)1156 port = opt.get_bind_port()1157 url = f"http://localhost:{port}/test/v1/project/afcb51dc-1043-42b6-bb99-b4fc88603126"1158 request = HTTPRequest(url=url, method="GET")1159 client = AsyncHTTPClient()1160 response = await client.fetch(request, raise_error=False)1161 assert response.code == 4001162 assert json.loads(response.body)["message"] == "Invalid request: Field 'tid' is required."1163async def test_multiple_path_params(unused_tcp_port, postgres_db, database_name, async_finalizer):1164 configure(unused_tcp_port, database_name, postgres_db.port)1165 class ProjectServer(ServerSlice):1166 @protocol.typedmethod(path="/test/<id>/<name>", operation="GET", client_types=["api"])1167 def test_method(id: str, name: str, age: int) -> str: # NOQA1168 pass1169 @protocol.handle(test_method)1170 async def test_methodY(self, id: str, name: str, age: int) -> str: # NOQA1171 return name1172 rs = Server()1173 server = ProjectServer(name="projectserver")1174 rs.add_slice(server)1175 await rs.start()1176 async_finalizer.add(server.stop)1177 async_finalizer.add(rs.stop)1178 request = MethodProperties.methods["test_method"][0].build_call(args=[], kwargs={"id": "1", "name": "monty", "age": 42})1179 assert request.url == "/api/v1/test/1/monty?age=42"1180async def test_2151_method_header_parameter_in_body(async_finalizer) -> None:1181 async def _id(x: object, dct: Dict[str, str]) -> object:1182 return x1183 @protocol.method(1184 path="/testmethod",1185 operation="POST",1186 arg_options={"header_param": ArgOption(header="X-Inmanta-Header-Param", getter=_id)},1187 client_types=[const.ClientType.api],1188 )1189 def test_method(header_param: str, body_param: str) -> None:1190 """1191 A method used for testing.1192 """1193 class TestSlice(ServerSlice):1194 @protocol.handle(test_method)1195 async def test_method_implementation(self, header_param: str, body_param: str) -> None:1196 pass1197 server: Server = Server()1198 server_slice: ServerSlice = TestSlice("my_test_slice")1199 server.add_slice(server_slice)1200 await server.start()1201 async_finalizer.add(server_slice.stop)1202 async_finalizer.add(server.stop)1203 client = tornado.httpclient.AsyncHTTPClient()1204 # valid request should succeed1205 request = tornado.httpclient.HTTPRequest(1206 url=f"http://localhost:{opt.get_bind_port()}/api/v1/testmethod",1207 method="POST",1208 body=json_encode({"body_param": "body_param_value"}),1209 headers={"X-Inmanta-Header-Param": "header_param_value"},1210 )1211 response: tornado.httpclient.HTTPResponse = await client.fetch(request)1212 assert response.code == 2001213 # invalid request should fail1214 request = tornado.httpclient.HTTPRequest(1215 url=f"http://localhost:{opt.get_bind_port()}/api/v1/testmethod",1216 method="POST",1217 body=json_encode({"header_param": "header_param_value", "body_param": "body_param_value"}),1218 )1219 with pytest.raises(tornado.httpclient.HTTPClientError):1220 await client.fetch(request)1221@pytest.mark.parametrize("return_value,valid", [(1, True), (None, True), ("Hello World!", False)])1222async def test_2277_typedmethod_return_optional(async_finalizer, return_value: object, valid: bool) -> None:1223 @protocol.typedmethod(1224 path="/typedtestmethod",1225 operation="GET",1226 client_types=[const.ClientType.api],1227 api_version=1,1228 )1229 def test_method_typed() -> Optional[int]:1230 """1231 A typedmethod used for testing.1232 """1233 class TestSlice(ServerSlice):1234 @protocol.handle(test_method_typed)1235 async def test_method_typed_implementation(self) -> Optional[int]:1236 return return_value # type: ignore1237 server: Server = Server()1238 server_slice: ServerSlice = TestSlice("my_test_slice")1239 server.add_slice(server_slice)1240 await server.start()1241 async_finalizer.add(server_slice.stop)1242 async_finalizer.add(server.stop)1243 client: protocol.Client = protocol.Client("client")1244 response: Result = await client.test_method_typed()1245 if valid:1246 assert response.code == 2001247 assert response.result == {"data": return_value}1248 else:1249 assert response.code == 4001250def test_method_strict_exception() -> None:1251 with pytest.raises(InvalidMethodDefinition, match="Invalid type for argument arg: Any type is not allowed in strict mode"):1252 @protocol.typedmethod(path="/testmethod", operation="POST", client_types=[const.ClientType.api])1253 def test_method(arg: Any) -> None:1254 pass1255async def test_method_nonstrict_allowed(async_finalizer) -> None:1256 @protocol.typedmethod(path="/zipsingle", operation="POST", client_types=[const.ClientType.api], strict_typing=False)1257 def merge_dicts(one: Dict[str, Any], other: Dict[str, int], any_arg: Any) -> Dict[str, Any]:1258 """1259 Merge two dicts.1260 """1261 class TestSlice(ServerSlice):1262 @protocol.handle(merge_dicts)1263 async def merge_dicts_impl(self, one: Dict[str, Any], other: Dict[str, int], any_arg: Any) -> Dict[str, Any]:1264 return {**one, **other}1265 server: Server = Server()1266 server_slice: ServerSlice = TestSlice("my_test_slice")1267 server.add_slice(server_slice)1268 await server.start()1269 async_finalizer.add(server_slice.stop)1270 async_finalizer.add(server.stop)1271 client: protocol.Client = protocol.Client("client")1272 one: Dict[str, Any] = {"my": {"nested": {"keys": 42}}}1273 other: Dict[str, int] = {"single_level": 42}1274 response: Result = await client.merge_dicts(one, other, None)1275 assert response.code == 2001276 assert response.result == {"data": {**one, **other}}1277@pytest.mark.parametrize(1278 "param_type,param_value,expected_url",1279 [1280 (1281 Dict[str, str],1282 {"a": "b", "c": "d", ",&?=%": ",&?=%."},1283 "/api/v1/test/1/monty?filter.a=b&filter.c=d&filter.%2C%26%3F%3D%25=%2C%26%3F%3D%25.",1284 ),1285 (1286 Dict[str, List[str]],1287 {"a": ["b"], "c": ["d", "e"], "g": ["h"]},1288 "/api/v1/test/1/monty?filter.a=b&filter.c=d&filter.c=e&filter.g=h",1289 ),1290 (1291 Dict[str, List[str]],1292 {"a": ["b"], "c": ["d", "e"], ",&?=%": [",&?=%", "f"], ".g.h": ["i"]},1293 "/api/v1/test/1/monty?filter.a=b&filter.c=d&filter.c=e"1294 "&filter.%2C%26%3F%3D%25=%2C%26%3F%3D%25&filter.%2C%26%3F%3D%25=f&filter..g.h=i",1295 ),1296 (1297 List[str],1298 [1299 "a ",1300 "b,",1301 "c",1302 ],1303 "/api/v1/test/1/monty?filter=a+&filter=b%2C&filter=c",1304 ),1305 (1306 List[str],1307 ["a", "b", ",&?=%", "c", "."],1308 "/api/v1/test/1/monty?filter=a&filter=b&filter=%2C%26%3F%3D%25&filter=c&filter=.",1309 ),1310 (List[str], ["a ", "b", "c", ","], "/api/v1/test/1/monty?filter=a+&filter=b&filter=c&filter=%2C"),1311 ],1312)1313async def test_dict_list_get_roundtrip(1314 unused_tcp_port, postgres_db, database_name, async_finalizer, param_type, param_value, expected_url1315):1316 configure(unused_tcp_port, database_name, postgres_db.port)1317 class ProjectServer(ServerSlice):1318 @protocol.typedmethod(path="/test/<id>/<name>", operation="GET", client_types=["api"], strict_typing=False)1319 def test_method(id: str, name: str, filter: param_type) -> Any: # NOQA1320 pass1321 @protocol.handle(test_method)1322 async def test_method(self, id: str, name: str, filter: param_type) -> Any: # NOQA1323 return filter1324 rs = Server()1325 server = ProjectServer(name="projectserver")1326 rs.add_slice(server)1327 await rs.start()1328 async_finalizer.add(server.stop)1329 async_finalizer.add(rs.stop)1330 request = MethodProperties.methods["test_method"][0].build_call(1331 args=[], kwargs={"id": "1", "name": "monty", "filter": param_value}1332 )1333 assert request.url == expected_url1334 client: protocol.Client = protocol.Client("client")1335 response: Result = await client.test_method(1, "monty", filter=param_value)1336 assert response.code == 2001337 assert response.result["data"] == param_value1338async def test_dict_get_optional(unused_tcp_port, postgres_db, database_name, async_finalizer):1339 configure(unused_tcp_port, database_name, postgres_db.port)1340 class ProjectServer(ServerSlice):1341 @protocol.typedmethod(path="/test/<id>/<name>", operation="GET", client_types=["api"])1342 def test_method(id: str, name: str, filter: Optional[Dict[str, str]] = None) -> str: # NOQA1343 pass1344 @protocol.handle(test_method)1345 async def test_method(self, id: str, name: str, filter: Optional[Dict[str, str]] = None) -> str: # NOQA1346 return ",".join(filter.keys()) if filter is not None else ""1347 rs = Server()1348 server = ProjectServer(name="projectserver")1349 rs.add_slice(server)1350 await rs.start()1351 async_finalizer.add(server.stop)1352 async_finalizer.add(rs.stop)1353 request = MethodProperties.methods["test_method"][0].build_call(1354 args=[], kwargs={"id": "1", "name": "monty", "filter": {"a": "b"}}1355 )1356 assert request.url == "/api/v1/test/1/monty?filter.a=b"1357 client: protocol.Client = protocol.Client("client")1358 response: Result = await client.test_method(1, "monty", filter={"a": "b", "c": "d"})1359 assert response.code == 2001360 assert response.result["data"] == "a,c"1361 response: Result = await client.test_method(1, "monty")1362 assert response.code == 2001363 assert response.result["data"] == ""1364async def test_dict_list_nested_get_optional(unused_tcp_port, postgres_db, database_name, async_finalizer):1365 configure(unused_tcp_port, database_name, postgres_db.port)1366 class ProjectServer(ServerSlice):1367 @protocol.typedmethod(path="/test/<id>/<name>", operation="GET", client_types=["api"])1368 def test_method(id: str, name: str, filter: Optional[Dict[str, List[str]]] = None) -> str: # NOQA1369 pass1370 @protocol.handle(test_method)1371 async def test_method(self, id: str, name: str, filter: Optional[Dict[str, List[str]]] = None) -> str: # NOQA1372 return ",".join(filter.keys()) if filter is not None else ""1373 rs = Server()1374 server = ProjectServer(name="projectserver")1375 rs.add_slice(server)1376 await rs.start()1377 async_finalizer.add(server.stop)1378 async_finalizer.add(rs.stop)1379 request = MethodProperties.methods["test_method"][0].build_call(1380 args=[], kwargs={"id": "1", "name": "monty", "filter": {"a": ["b"]}}1381 )1382 assert request.url == "/api/v1/test/1/monty?filter.a=b"1383 client: protocol.Client = protocol.Client("client")1384 response: Result = await client.test_method(1, "monty", filter={"a": "b", "c": ["d", "e"]})1385 assert response.code == 2001386 assert response.result["data"] == "a,c"1387 response: Result = await client.test_method(1, "monty")1388 assert response.code == 2001389 assert response.result["data"] == ""1390@pytest.mark.parametrize(1391 "param_type,expected_error_message",1392 [1393 (1394 Dict[str, Dict[str, str]],1395 "nested dictionaries and union types for dictionary values are not supported for GET requests",1396 ),1397 (1398 Dict[str, Union[str, List[str]]],1399 "nested dictionaries and union types for dictionary values are not supported for GET requests",1400 ),1401 (List[Dict[str, str]], "lists of dictionaries and lists of lists are not supported for GET requests"),1402 (List[List[str]], "lists of dictionaries and lists of lists are not supported for GET requests"),1403 ],1404)1405async def test_dict_list_get_invalid(1406 unused_tcp_port, postgres_db, database_name, async_finalizer, param_type, expected_error_message1407):1408 configure(unused_tcp_port, database_name, postgres_db.port)1409 with pytest.raises(InvalidMethodDefinition) as e:1410 class ProjectServer(ServerSlice):1411 @protocol.typedmethod(path="/test/<id>/<name>", operation="GET", client_types=["api"])1412 def test_method(id: str, name: str, filter: param_type) -> str: # NOQA1413 pass1414 @protocol.handle(test_method)1415 async def test_method(self, id: str, name: str, filter: param_type) -> str: # NOQA1416 return ""1417 assert expected_error_message in str(e)1418async def test_list_get_optional(unused_tcp_port, postgres_db, database_name, async_finalizer):1419 configure(unused_tcp_port, database_name, postgres_db.port)1420 class ProjectServer(ServerSlice):1421 @protocol.typedmethod(path="/test/<id>/<name>", operation="GET", client_types=["api"])1422 def test_method(id: str, name: str, sort: Optional[List[int]] = None) -> str: # NOQA1423 pass1424 @protocol.typedmethod(path="/test_uuid/<id>", operation="GET", client_types=["api"])1425 def test_method_uuid(id: str, sort: Optional[List[uuid.UUID]] = None) -> str: # NOQA1426 pass1427 @protocol.handle(test_method)1428 async def test_method(self, id: str, name: str, sort: Optional[List[int]] = None) -> str: # NOQA1429 return str(sort) if sort else ""1430 @protocol.handle(test_method_uuid)1431 async def test_method_uuid(self, id: str, sort: Optional[List[uuid.UUID]] = None) -> str: # NOQA1432 return str(sort) if sort else ""1433 rs = Server()1434 server = ProjectServer(name="projectserver")1435 rs.add_slice(server)1436 await rs.start()1437 async_finalizer.add(server.stop)1438 async_finalizer.add(rs.stop)1439 request = MethodProperties.methods["test_method"][0].build_call(1440 args=[], kwargs={"id": "1", "name": "monty", "sort": [1, 2]}1441 )1442 assert request.url == "/api/v1/test/1/monty?sort=1&sort=2"1443 client: protocol.Client = protocol.Client("client")1444 response: Result = await client.test_method(1, "monty", sort=[1, 2])1445 assert response.code == 2001446 assert response.result["data"] == "[1, 2]"1447 response: Result = await client.test_method(1, "monty")1448 assert response.code == 2001449 assert response.result["data"] == ""1450 uuids = [uuid.uuid4(), uuid.uuid4()]1451 request = MethodProperties.methods["test_method_uuid"][0].build_call(args=[], kwargs={"id": "1", "sort": uuids})1452 assert request.url == f"/api/v1/test_uuid/1?sort={uuids[0]}&sort={uuids[1]}"1453async def test_dicts_multiple_get(unused_tcp_port, postgres_db, database_name, async_finalizer):1454 configure(unused_tcp_port, database_name, postgres_db.port)1455 class ProjectServer(ServerSlice):1456 @protocol.typedmethod(path="/test/<id>/<name>", operation="GET", client_types=["api"])1457 def test_method(id: str, name: str, filter: Dict[str, List[str]], another_filter: Dict[str, str]) -> str: # NOQA1458 pass1459 @protocol.handle(test_method)1460 async def test_method(1461 self, id: str, name: str, filter: Dict[str, List[str]], another_filter: Dict[str, str]1462 ) -> str: # NOQA1463 return ",".join(chain(filter.keys(), another_filter.keys()))1464 rs = Server()1465 server = ProjectServer(name="projectserver")1466 rs.add_slice(server)1467 await rs.start()1468 async_finalizer.add(server.stop)1469 async_finalizer.add(rs.stop)1470 request = MethodProperties.methods["test_method"][0].build_call(1471 args=[], kwargs={"id": "1", "name": "monty", "filter": {"a": ["b", "c"]}, "another_filter": {"d": "e"}}1472 )1473 assert request.url == "/api/v1/test/1/monty?filter.a=b&filter.a=c&another_filter.d=e"1474 client: protocol.Client = protocol.Client("client")1475 response: Result = await client.test_method(1, "monty", filter={"a": ["b"], "c": ["d", "e"]}, another_filter={"x": "y"})1476 assert response.code == 2001477 assert response.result["data"] == "a,c,x"1478async def test_dict_list_get_by_url(unused_tcp_port, postgres_db, database_name, async_finalizer):1479 configure(unused_tcp_port, database_name, postgres_db.port)1480 class ProjectServer(ServerSlice):1481 @protocol.typedmethod(path="/test/<id>/<name>", operation="GET", client_types=["api"])1482 def test_method(id: str, name: str, filter: Dict[str, str]) -> str: # NOQA1483 pass1484 @protocol.typedmethod(path="/test_list/<id>", operation="GET", client_types=["api"])1485 def test_method_list(id: str, filter: List[int]) -> str: # NOQA1486 pass1487 @protocol.typedmethod(path="/test_dict_of_lists/<id>", operation="GET", client_types=["api"])1488 def test_method_dict_of_lists(id: str, filter: Dict[str, List[str]]) -> str: # NOQA1489 pass1490 @protocol.handle(test_method)1491 async def test_method(self, id: str, name: str, filter: Dict[str, str]) -> str: # NOQA1492 return ",".join(filter.keys())1493 @protocol.handle(test_method_list)1494 async def test_method_list(self, id: str, filter: List[int]) -> str: # NOQA1495 return str(filter)1496 @protocol.handle(test_method_dict_of_lists)1497 async def test_method_dict_of_lists(self, id: str, filter: Dict[str, List[str]]) -> str: # NOQA1498 return ",".join(filter.keys())1499 rs = Server()1500 server = ProjectServer(name="projectserver")1501 rs.add_slice(server)1502 await rs.start()1503 async_finalizer.add(server.stop)1504 async_finalizer.add(rs.stop)1505 client = AsyncHTTPClient()1506 response = await client.fetch(f"http://localhost:{server_bind_port.get()}/api/v1/test/1/monty?filter.=b", raise_error=False)1507 assert response.code == 4001508 response = await client.fetch(1509 f"http://localhost:{server_bind_port.get()}/api/v1/test/1/monty?filter.a=b&filter.=c", raise_error=False1510 )1511 assert response.code == 4001512 response = await client.fetch(1513 f"http://localhost:{server_bind_port.get()}/api/v1/test/1/monty?filter.a=b&filter.a=c", raise_error=False1514 )1515 assert response.code == 4001516 response = await client.fetch(1517 f"http://localhost:{server_bind_port.get()}/api/v1/test_list/1?filter.a=b&filter.=c", raise_error=False1518 )1519 assert response.code == 4001520 # Integer should also work1521 response = await client.fetch(1522 f"http://localhost:{server_bind_port.get()}/api/v1/test_list/1?filter=42&filter=45", raise_error=False1523 )1524 assert response.code == 2001525 # list nested in dict1526 response = await client.fetch(1527 f"http://localhost:{server_bind_port.get()}/api/v1/test_dict_of_lists/1?filter.a=42&filter.a=55&filter.b=e",1528 raise_error=False,1529 )1530 assert response.code == 2001531 filter_with_comma = {"filter.a": "42,55,%2C70", "filter.b": "e"}1532 url = url_concat(f"http://localhost:{server_bind_port.get()}/api/v1/test_dict_of_lists/1", filter_with_comma)1533 response = await client.fetch(1534 url,1535 raise_error=False,1536 )1537 assert response.code == 2001538 response = await client.fetch(1539 f"http://localhost:{server_bind_port.get()}/api/v1/test_dict_of_lists/1?filter.a=42&filter.a=55&filter.&filter.c=a",1540 raise_error=False,1541 )1542 assert response.code == 4001543 filter_with_comma = {"filter.a": "b", "filter.c": "e", "filter.,&?=%": ",&?=%"}1544 url = url_concat(f"http://localhost:{server_bind_port.get()}/api/v1/test/1/monty", filter_with_comma)1545 response = await client.fetch(1546 url,1547 raise_error=False,1548 )1549 assert response.code == 2001550async def test_api_datetime_utc(unused_tcp_port, postgres_db, database_name, async_finalizer):1551 """1552 Test API input and output conversion for timestamps. Objects should be either timezone-aware or implicit UTC.1553 """1554 configure(unused_tcp_port, database_name, postgres_db.port)1555 timezone: datetime.timezone = datetime.timezone(datetime.timedelta(hours=2))1556 now: datetime.datetime = datetime.datetime.now().astimezone(timezone)1557 naive_utc: datetime.datetime = now.astimezone(datetime.timezone.utc).replace(tzinfo=None)1558 class ProjectServer(ServerSlice):1559 @protocol.typedmethod(path="/test", operation="GET", client_types=["api"])1560 def test_method(timestamp: datetime.datetime) -> List[datetime.datetime]:1561 pass1562 @protocol.handle(test_method)1563 async def test_method(self, timestamp: datetime.datetime) -> List[datetime.datetime]:1564 assert timestamp.tzinfo is not None1565 assert timestamp == now1566 return [1567 now,1568 now.astimezone(datetime.timezone.utc),1569 now.astimezone(datetime.timezone.utc).replace(tzinfo=None),1570 ]1571 rs = Server()1572 server = ProjectServer(name="projectserver")1573 rs.add_slice(server)1574 await rs.start()1575 async_finalizer.add(server.stop)1576 async_finalizer.add(rs.stop)1577 client: protocol.Client = protocol.Client("client")1578 response: Result = await client.test_method(timestamp=now)1579 assert response.code == 2001580 assert all(pydantic.parse_obj_as(datetime.datetime, timestamp) == naive_utc for timestamp in response.result["data"])1581 response: Result = await client.test_method(timestamp=now.astimezone(datetime.timezone.utc))1582 assert response.code == 2001583 response: Result = await client.test_method(timestamp=now.astimezone(datetime.timezone.utc).replace(tzinfo=None))1584 assert response.code == 2001585 response: Result = await client.test_method(timestamp=now.replace(tzinfo=None))1586 assert response.code == 5001587 # Test REST API without going through Python client1588 port = opt.get_bind_port()1589 client = AsyncHTTPClient()1590 async def request(timestamp: datetime.datetime) -> tornado.httpclient.HTTPResponse:1591 request = HTTPRequest(1592 url=(1593 f"http://localhost:{port}/api/v1/test?timestamp="1594 f"{urllib.parse.quote(timestamp.isoformat(timespec='microseconds'))}"1595 ),1596 method="GET",1597 )1598 return await client.fetch(request)1599 response = await request(now)1600 assert response.code == 2001601 response = await request(now.astimezone(datetime.timezone.utc).replace(tzinfo=None))1602 assert response.code == 2001603 with pytest.raises(tornado.httpclient.HTTPClientError):1604 response = await request(now.replace(tzinfo=None))1605async def test_dict_of_list(unused_tcp_port, postgres_db, database_name, async_finalizer):1606 """1607 Test API input and output conversion for timestamps. Objects should be either timezone-aware or implicit UTC.1608 """1609 configure(unused_tcp_port, database_name, postgres_db.port)1610 class APydanticType(BaseModel):1611 attr: int1612 class ProjectServer(ServerSlice):1613 @protocol.typedmethod(path="/test", operation="GET", client_types=[const.ClientType.api])1614 def test_method(id: str) -> Dict[str, List[APydanticType]]:1615 pass1616 @protocol.handle(test_method)1617 async def test_method(self, id: str) -> Dict[str, List[APydanticType]]:1618 return {id: [APydanticType(attr=1), APydanticType(attr=5)]}1619 rs = Server()1620 server = ProjectServer(name="projectserver")1621 rs.add_slice(server)1622 await rs.start()1623 async_finalizer.add(server.stop)1624 async_finalizer.add(rs.stop)1625 client: protocol.Client = protocol.Client("client")1626 result = await client.test_method(id="test")1627 assert result.code == 200, result.result["message"]1628 assert result.result["data"] == {"test": [{"attr": 1}, {"attr": 5}]}1629async def test_return_value_with_meta(unused_tcp_port, postgres_db, database_name, async_finalizer):1630 configure(unused_tcp_port, database_name, postgres_db.port)1631 class ProjectServer(ServerSlice):1632 @protocol.typedmethod(path="/test", operation="GET", client_types=["api"])1633 def test_method(with_warning: bool) -> ReturnValueWithMeta[str]: # NOQA1634 pass1635 @protocol.handle(test_method)1636 async def test_method(self, with_warning: bool) -> ReturnValueWithMeta: # NOQA1637 metadata = {"additionalInfo": f"Today's bitcoin exchange rate is: {(random.random() * 100000):.2f}$"}1638 result = ReturnValueWithMeta(response="abcd", metadata=metadata)1639 if with_warning:1640 result.add_warnings(["Warning message"])1641 return result1642 rs = Server()1643 server = ProjectServer(name="projectserver")1644 rs.add_slice(server)1645 await rs.start()1646 async_finalizer.add(server.stop)1647 async_finalizer.add(rs.stop)1648 client: protocol.Client = protocol.Client("client")1649 response = await client.test_method(False)1650 assert response.code == 2001651 assert response.result["data"] == "abcd"1652 assert response.result["metadata"].get("additionalInfo") is not None1653 assert response.result["metadata"].get("warnings") is None1654 response = await client.test_method(True)1655 assert response.code == 2001656 assert response.result["data"] == "abcd"1657 assert response.result["metadata"].get("additionalInfo") is not None1658 assert response.result["metadata"].get("warnings") is not None1659async def test_kwargs(unused_tcp_port, postgres_db, database_name, async_finalizer):1660 """1661 Test the use and validation of methods that use common.ReturnValue1662 """1663 configure(unused_tcp_port, database_name, postgres_db.port)1664 class ProjectServer(ServerSlice):1665 @protocol.typedmethod(path="/test", operation="POST", client_types=[ClientType.api], varkw=True)1666 def test_method(id: str, **kwargs: object) -> Dict[str, str]: # NOQA1667 """1668 Create a new project1669 """1670 @protocol.handle(test_method)1671 async def test_method(self, id: str, **kwargs: object) -> Dict[str, str]:1672 return {"name": str(kwargs["name"]), "value": str(kwargs["value"])}1673 rs = Server()1674 server = ProjectServer(name="projectserver")1675 rs.add_slice(server)1676 await rs.start()1677 async_finalizer.add(server.stop)1678 async_finalizer.add(rs.stop)1679 client = protocol.Client("client")1680 result = await client.test_method(id="test", name="test", value=True)1681 assert result.code == 2001682 assert result.result["data"]["name"] == "test"...
plugin.py
Source:plugin.py
...85 res = await gen_obj.__anext__()86 return res87 def finalizer():88 """Yield again, to finalize."""89 async def async_finalizer():90 try:91 await gen_obj.__anext__()92 except StopAsyncIteration:93 pass94 else:95 msg = "Async generator fixture didn't stop."96 msg += "Yield only once."97 raise ValueError(msg)98 loop.run_until_complete(async_finalizer())99 request.addfinalizer(finalizer)100 return loop.run_until_complete(setup())101 fixturedef.func = wrapper102 elif inspect.iscoroutinefunction(fixturedef.func):103 coro = fixturedef.func104 fixture_stripper = FixtureStripper(fixturedef)105 fixture_stripper.add(FixtureStripper.EVENT_LOOP)106 def wrapper(*args, **kwargs):107 loop = fixture_stripper.get_and_strip_from(FixtureStripper.EVENT_LOOP, kwargs)108 async def setup():109 res = await coro(*args, **kwargs)110 return res111 return loop.run_until_complete(setup())112 fixturedef.func = wrapper...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!