Best Python code snippet using localstack_python
testing.py
Source:testing.py
1"""Utilities for testing. Includes a base test class2for testing parsers.3.. warning::4 Methods and functions in this module may change without5 warning and without a major version change.6"""7import pytest8import webtest9from webargs.core import json10class CommonTestCase:11 """Base test class that defines test methods for common functionality across all12 parsers. Subclasses must define `create_app`, which returns a WSGI-like app.13 """14 def create_app(self):15 """Return a WSGI app"""16 raise NotImplementedError("Must define create_app()")17 def create_testapp(self, app):18 return webtest.TestApp(app)19 def before_create_app(self):20 pass21 def after_create_app(self):22 pass23 @pytest.fixture(scope="class")24 def testapp(self):25 self.before_create_app()26 yield self.create_testapp(self.create_app())27 self.after_create_app()28 def test_parse_querystring_args(self, testapp):29 assert testapp.get("/echo?name=Fred").json == {"name": "Fred"}30 def test_parse_form(self, testapp):31 assert testapp.post("/echo_form", {"name": "Joe"}).json == {"name": "Joe"}32 def test_parse_json(self, testapp):33 assert testapp.post_json("/echo_json", {"name": "Fred"}).json == {34 "name": "Fred"35 }36 def test_parse_json_missing(self, testapp):37 assert testapp.post("/echo_json", "").json == {"name": "World"}38 def test_parse_json_or_form(self, testapp):39 assert testapp.post_json("/echo_json_or_form", {"name": "Fred"}).json == {40 "name": "Fred"41 }42 assert testapp.post("/echo_json_or_form", {"name": "Joe"}).json == {43 "name": "Joe"44 }45 assert testapp.post("/echo_json_or_form", "").json == {"name": "World"}46 def test_parse_querystring_default(self, testapp):47 assert testapp.get("/echo").json == {"name": "World"}48 def test_parse_json_default(self, testapp):49 assert testapp.post_json("/echo_json", {}).json == {"name": "World"}50 def test_parse_json_with_charset(self, testapp):51 res = testapp.post(52 "/echo_json",53 json.dumps({"name": "Steve"}),54 
content_type="application/json;charset=UTF-8",55 )56 assert res.json == {"name": "Steve"}57 def test_parse_json_with_vendor_media_type(self, testapp):58 res = testapp.post(59 "/echo_json",60 json.dumps({"name": "Steve"}),61 content_type="application/vnd.api+json;charset=UTF-8",62 )63 assert res.json == {"name": "Steve"}64 def test_parse_ignore_extra_data(self, testapp):65 assert testapp.post_json(66 "/echo_ignoring_extra_data", {"extra": "data"}67 ).json == {"name": "World"}68 def test_parse_json_empty(self, testapp):69 assert testapp.post_json("/echo_json", {}).json == {"name": "World"}70 def test_parse_json_error_unexpected_int(self, testapp):71 res = testapp.post_json("/echo_json", 1, expect_errors=True)72 assert res.status_code == 42273 def test_parse_json_error_unexpected_list(self, testapp):74 res = testapp.post_json("/echo_json", [{"extra": "data"}], expect_errors=True)75 assert res.status_code == 42276 def test_parse_json_many_schema_invalid_input(self, testapp):77 res = testapp.post_json(78 "/echo_many_schema", [{"name": "a"}], expect_errors=True79 )80 assert res.status_code == 42281 def test_parse_json_many_schema(self, testapp):82 res = testapp.post_json("/echo_many_schema", [{"name": "Steve"}]).json83 assert res == [{"name": "Steve"}]84 def test_parse_json_many_schema_error_malformed_data(self, testapp):85 res = testapp.post_json(86 "/echo_many_schema", {"extra": "data"}, expect_errors=True87 )88 assert res.status_code == 42289 def test_parsing_form_default(self, testapp):90 assert testapp.post("/echo_form", {}).json == {"name": "World"}91 def test_parse_querystring_multiple(self, testapp):92 expected = {"name": ["steve", "Loria"]}93 assert testapp.get("/echo_multi?name=steve&name=Loria").json == expected94 # test that passing a single value parses correctly95 # on parsers like falconparser, where there is no native MultiDict type,96 # this verifies the usage of MultiDictProxy to ensure that single values97 # are "listified"98 def 
test_parse_querystring_multiple_single_value(self, testapp):99 expected = {"name": ["steve"]}100 assert testapp.get("/echo_multi?name=steve").json == expected101 def test_parse_form_multiple(self, testapp):102 expected = {"name": ["steve", "Loria"]}103 assert (104 testapp.post("/echo_multi_form", {"name": ["steve", "Loria"]}).json105 == expected106 )107 def test_parse_json_list(self, testapp):108 expected = {"name": ["Steve"]}109 assert (110 testapp.post_json("/echo_multi_json", {"name": ["Steve"]}).json == expected111 )112 def test_parse_json_list_error_malformed_data(self, testapp):113 res = testapp.post_json(114 "/echo_multi_json", {"name": "Steve"}, expect_errors=True115 )116 assert res.status_code == 422117 def test_parse_json_with_nonascii_chars(self, testapp):118 text = "øËÆ£ººâÆËâ"119 assert testapp.post_json("/echo_json", {"name": text}).json == {"name": text}120 # https://github.com/marshmallow-code/webargs/issues/427121 def test_parse_json_with_nonutf8_chars(self, testapp):122 res = testapp.post(123 "/echo_json",124 b"\xfe",125 headers={"Accept": "application/json", "Content-Type": "application/json"},126 expect_errors=True,127 )128 assert res.status_code == 400129 assert res.json == {"json": ["Invalid JSON body."]}130 def test_validation_error_returns_422_response(self, testapp):131 res = testapp.post_json("/echo_json", {"name": "b"}, expect_errors=True)132 assert res.status_code == 422133 def test_user_validation_error_returns_422_response_by_default(self, testapp):134 res = testapp.post_json("/error", {"text": "foo"}, expect_errors=True)135 assert res.status_code == 422136 def test_use_args_decorator(self, testapp):137 assert testapp.get("/echo_use_args?name=Fred").json == {"name": "Fred"}138 def test_use_args_with_path_param(self, testapp):139 url = "/echo_use_args_with_path_param/foo"140 res = testapp.get(url + "?value=42")141 assert res.json == {"value": 42}142 def test_use_args_with_validation(self, testapp):143 result = 
testapp.post("/echo_use_args_validated", {"value": 43})144 assert result.status_code == 200145 result = testapp.post(146 "/echo_use_args_validated", {"value": 41}, expect_errors=True147 )148 assert result.status_code == 422149 def test_use_kwargs_decorator(self, testapp):150 assert testapp.get("/echo_use_kwargs?name=Fred").json == {"name": "Fred"}151 def test_use_kwargs_with_path_param(self, testapp):152 url = "/echo_use_kwargs_with_path_param/foo"153 res = testapp.get(url + "?value=42")154 assert res.json == {"value": 42}155 def test_parsing_headers(self, testapp):156 res = testapp.get("/echo_headers", headers={"name": "Fred"})157 assert res.json == {"name": "Fred"}158 def test_parsing_cookies(self, testapp):159 testapp.set_cookie("name", "Steve")160 res = testapp.get("/echo_cookie")161 assert res.json == {"name": "Steve"}162 def test_parse_nested_json(self, testapp):163 res = testapp.post_json(164 "/echo_nested", {"name": {"first": "Steve", "last": "Loria"}}165 )166 assert res.json == {"name": {"first": "Steve", "last": "Loria"}}167 def test_parse_nested_many_json(self, testapp):168 in_data = {"users": [{"id": 1, "name": "foo"}, {"id": 2, "name": "bar"}]}169 res = testapp.post_json("/echo_nested_many", in_data)170 assert res.json == in_data171 # Regression test for https://github.com/marshmallow-code/webargs/issues/120172 def test_parse_nested_many_missing(self, testapp):173 in_data = {}174 res = testapp.post_json("/echo_nested_many", in_data)175 assert res.json == {}176 def test_parse_files(self, testapp):177 res = testapp.post(178 "/echo_file", {"myfile": webtest.Upload("README.rst", b"data")}179 )180 assert res.json == {"myfile": "data"}181 # https://github.com/sloria/webargs/pull/297182 def test_empty_json(self, testapp):183 res = testapp.post("/echo_json")184 assert res.status_code == 200185 assert res.json == {"name": "World"}186 # https://github.com/sloria/webargs/pull/297187 def test_empty_json_with_headers(self, testapp):188 res = testapp.post(189 
"/echo_json",190 "",191 headers={"Accept": "application/json", "Content-Type": "application/json"},192 )193 assert res.status_code == 200194 assert res.json == {"name": "World"}195 # https://github.com/sloria/webargs/issues/329196 def test_invalid_json(self, testapp):197 res = testapp.post(198 "/echo_json",199 '{"foo": "bar", }',200 headers={"Accept": "application/json", "Content-Type": "application/json"},201 expect_errors=True,202 )203 assert res.status_code == 400204 assert res.json == {"json": ["Invalid JSON body."]}205 @pytest.mark.parametrize(206 ("path", "payload", "content_type"),207 [208 (209 "/echo_json",210 json.dumps({"name": "foo"}),211 "application/x-www-form-urlencoded",212 ),213 ("/echo_form", {"name": "foo"}, "application/json"),214 ],215 )216 def test_content_type_mismatch(self, testapp, path, payload, content_type):217 res = testapp.post(path, payload, headers={"Content-Type": content_type})...
lib.py
Source:lib.py
...32 channels = filter_channels(entities)33 assert len(channels) + len(chats) == len(34 entities35 ), f"{len(channels)} + {len(chats)} != {len(entities)}"36 echo_json(chats_to_list(chats))37 echo_json(channels_to_list(channels))38 if dry_run:39 return40 peer_chats = [types.InputPeerChat(c.id) for c in chats]41 peer_channels = [types.InputPeerChannel(c.id, c.access_hash) for c in channels]42 dialog_filter.include_peers = (43 dialog_filter.include_peers + peer_chats + peer_channels44 )45 request = messages.UpdateDialogFilterRequest(dialog_filter.id, dialog_filter)46 result = await client(request)47 click.echo(48 f'Added {len(entities)} entries to folder "{folder_title}": {result}', err=True49 )50def filter_chats(entities):51 return [e for e in entities if isinstance(e, types.Chat)]52def filter_channels(entities):53 return [e for e in entities if isinstance(e, types.Channel)]54def filter_users(entities):55 return [e for e in entities if isinstance(e, types.User)]56def chats_to_list(chats: List[types.Chat]) -> List[Dict[str, Any]]:57 return [{"type": "chat", "id": e.id, "title": e.title} for e in chats]58def channels_to_list(channels: List[types.Channel]) -> List[Dict[str, Any]]:59 return [{"type": "channel", "id": e.id, "title": e.title} for e in channels]60def users_to_list(users: List[types.User]) -> List[Dict[str, Any]]:61 return [{"type": "user", "id": e.id, "username": e.username} for e in users]62def get_entity_ids(peers: List[types.TypeInputPeer]) -> List[int]:63 entity_ids = [get_entity_id(peer) for peer in peers]64 result = [entity_id for entity_id in entity_ids if entity_id is not None]65 assert len(peers) == len(result), f"{len(peers)} != {len(result)}"66 return result67def get_entity_id(e: types.TypeInputPeer) -> Optional[int]:68 if isinstance(e, types.InputPeerChat):69 return e.chat_id70 if isinstance(e, (types.InputPeerUser, types.InputPeerUserFromMessage)):71 return e.user_id72 if isinstance(e, (types.InputPeerChannel, 
types.InputPeerChannelFromMessage)):73 return e.channel_id74 return None75async def async_command_common_chat_list(user_ids: List[int]):76 client = await new_client()77 async def get_common_chats(user_id: int) -> Set[int]:78 result = await client(79 messages.GetCommonChatsRequest(user_id, max_id=2147483647, limit=100)80 )81 chats: List[types.Chat] = result.chats82 return set(chat.id for chat in chats)83 chat_batches: List[Set[int]] = await asyncio.gather(84 *[get_common_chats(user_id) for user_id in user_ids]85 )86 chat_ids = set()87 for batch in chat_batches:88 chat_ids.update(batch)89 entities = await asyncio.gather(*[client.get_entity(peer) for peer in chat_ids])90 chats = filter_chats(entities)91 channels = filter_channels(entities)92 users = filter_users(entities)93 echo_json(chats_to_list(chats))94 echo_json(channels_to_list(channels))95 echo_json(users_to_list(users))96async def get_dialog_filter(client, title: str) -> Optional[types.DialogFilter]:97 dialog_filters: List[types.DialogFilter] = await client(98 messages.GetDialogFiltersRequest()99 )100 found = [f for f in dialog_filters if f.title == title]101 if not found:102 return103 return found[0]104async def async_command_user_list(folder_title: str):105 client = await new_client()106 dialog_filter = await get_dialog_filter(client, folder_title)107 if dialog_filter is None:108 click.echo(f"folder not found: {folder_title}", err=True)109 return110 entities = await asyncio.gather(111 *[112 client.get_entity(peer)113 for peer in dialog_filter.include_peers + dialog_filter.pinned_peers114 ]115 )116 chats = filter_chats(entities)117 channels = filter_channels(entities)118 users = filter_users(entities)119 echo_json(chats_to_list(chats))120 echo_json(channels_to_list(channels))121 echo_json(users_to_list(users))122async def new_client():123 workdir = os.path.join(os.environ["HOME"], ".tgfolder")124 cfg = load_config()125 session = await new_session(os.path.join(workdir, "session"))126 client = 
TelegramClient(session, api_id=cfg.api_id, api_hash=cfg.api_hash)127 client = await client.start()128 return client129async def async_command_list():130 client = await new_client()131 dialog_filters: List[types.DialogFilter] = await client(132 messages.GetDialogFiltersRequest()133 )134 titles = [item.title for item in dialog_filters]135 echo_json(titles)136def echo_json(arr: List[Any]):137 if len(arr) == 0:138 return139 click.echo("\n".join(json.dumps(s, ensure_ascii=False) for s in arr))140async def new_session(path: str) -> SQLiteSession:141 p = pathlib.Path(path)142 if not p.parent.exists():143 p.parent.mkdir(0o755, parents=True)144 return SQLiteSession(path)145@dataclass146class Config:147 api_id: int148 api_hash: str149def load_config() -> Config:150 # obfuscate a little to avoid brute force scan...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!