Best Python code snippet using fMBT_python
test_receiver.py
Source:test_receiver.py
...25 def finish_connect(self):26 protocol = ReceiverProtocol(self)27 protocol.makeConnection(self.transport)28 self.connected.callback(protocol)29 def write_response(self, msg):30 self.protocol.lineReceived(msg)31 def sent(self):32 data = self.transport.value()33 self.transport.clear()34 return data35def test_receiverbase_connect_then_write():36 class TestParent(MockedConnectionReceiver):37 test_connects_directly = False38 parent = TestParent("fakehost")39 parent.finish_connect()40 parent.write("Test")41 assert parent.sent() == b"Test\r"42def test_receiverbase_write_before_connect():43 class TestParent(MockedConnectionReceiver):44 test_connects_directly = False45 parent = TestParent("fakehost")46 parent.write("Test")47 assert parent.sent() == b""48 parent.finish_connect()49 assert parent.sent() == b"Test\r"50def test_receiverbase_multiple_writes_wait_for_each_response():51 class TestParent(MockedConnectionReceiver):52 test_connects_directly = False53 control = Control("PW")54 parent = TestParent("fakehost")55 parent.finish_connect()56 parent.write("Test")57 parent.write("Test2")58 parent.write("Test3")59 assert parent.sent() == b"Test\r"60 parent.write_response(b"Response")61 assert parent.sent() == b"Test2\r"62 parent.write_response(b"Response2")63 assert parent.sent() == b"Test3\r"64@pytest_twisted.inlineCallbacks65def test_receiverbase_multiple_writes_with_no_response():66 class TestParent(MockedConnectionReceiver):67 test_connects_directly = False68 control = Control("PW")69 parent = TestParent("fakehost", 0.1)70 parent.finish_connect()71 parent.write("Test")72 parent.write("Test2")73 parent.write("Test3")74 assert parent.sent() == b"Test\r"75 def check():76 assert parent.sent() == b"Test2\rTest3\r"77 yield reactor.callLater(1.0, check)78def test_receiverbase_multiple_writes_with_no_timeout():79 class TestParent(MockedConnectionReceiver):80 test_connects_directly = False81 control = Control("PW")82 parent = TestParent("fakehost", 0)83 parent.finish_connect()84 parent.write("Test")85 parent.write("Test2")86 parent.write("Test3")87 assert parent.sent() == b"Test\rTest2\rTest3\r"88@pytest_twisted.inlineCallbacks89def test_control__name_only():90 class TestParent(MockedConnectionReceiver):91 control = Control("PW")92 parent = TestParent("fakehost")93 d = parent.control94 assert parent.sent() == b"PW?\r"95 parent.write_response(b"PWOFF")96 d.addTimeout(1, reactor)97 value = yield d98 assert value == "OFF"99 parent.control = "ON"100 assert parent.sent() == b"PWON\r"101@pytest_twisted.inlineCallbacks102def test_control__override_status_command():103 class TestParent(MockedConnectionReceiver):104 control = Control("PW", status_command="PW ?")105 parent = TestParent("fakehost")106 d = parent.control107 assert parent.sent() == b"PW ?\r"108 parent.write_response(b"PWOFF")109 d.addTimeout(1, reactor)110 value = yield d111 assert value == "OFF"112 parent.control = "ON"113 assert parent.sent() == b"PWON\r"114@pytest_twisted.inlineCallbacks115def test_control__override_set_command():116 class TestParent(MockedConnectionReceiver):117 control = Control("PW", set_command="PWA")118 parent = TestParent("fakehost")119 d = parent.control120 assert parent.sent() == b"PW?\r"121 parent.write_response(b"PWOFF")122 d.addTimeout(1, reactor)123 value = yield d124 assert value == "OFF"125 parent.control = "ON"126 assert parent.sent() == b"PWAON\r"127@pytest_twisted.inlineCallbacks128def test_control__override_response_prefix():129 class TestParent(MockedConnectionReceiver):130 control = Control("PW", response_prefix="PWA")131 parent = TestParent("fakehost")132 d = parent.control133 assert parent.sent() == b"PW?\r"134 parent.write_response(b"PWAOFF")135 d.addTimeout(1, reactor)136 value = yield d137 assert value == "OFF"138 parent.control = "ON"139 assert parent.sent() == b"PWON\r"140@pytest_twisted.inlineCallbacks141def test_control__incorrect_response():142 class TestParent(MockedConnectionReceiver):143 control = Control("PW")144 parent = TestParent("fakehost")145 d = parent.control146 assert parent.sent() == b"PW?\r"147 parent.write_response(b"RWAUTO")148 d.addTimeout(1, reactor)149 with pytest.raises(TimeoutError):150 yield d151@pytest_twisted.inlineCallbacks152def test_control__ignore_other_responses():153 class TestParent(MockedConnectionReceiver):154 control = Control("PW")155 parent = TestParent("fakehost")156 d = parent.control157 assert parent.sent() == b"PW?\r"158 parent.write_response(b"CV20")159 parent.write_response(b"MV30")160 parent.write_response(b"MUON")161 parent.write_response(b"PWOFF")162 d.addTimeout(1, reactor)163 value = yield d164 assert value == "OFF"165@pytest_twisted.inlineCallbacks166def test_numericcontrol():167 class TestParent(MockedConnectionReceiver):168 control = NumericControl("MV")169 parent = TestParent("fakehost")170 d = parent.control171 assert parent.sent() == b"MV?\r"172 parent.write_response(b"MV50")173 d.addTimeout(1, reactor)174 value = yield d175 assert value == 50176 parent.write_response(b"MV70")177 d = parent.control178 assert not parent.sent()179 d.addTimeout(1, reactor)180 value = yield d181 assert value == 70182 parent.control = 4183 assert parent.sent() == b"MV04\r"184 parent.write_response(b"MV04")185 parent.control = 38186 assert parent.sent() == b"MV38\r"187@pytest_twisted.inlineCallbacks188def test_numericcontrol__invalid_response():189 class TestParent(MockedConnectionReceiver):190 control = NumericControl("MV")191 parent = TestParent("fakehost")192 d = parent.control193 assert parent.sent() == b"MV?\r"194 parent.write_response(b"MVO")195 parent.write_response(b"MVTRUE")196 parent.write_response(b"MVFALSE")197 parent.write_response(b"MV")198 parent.write_response(b"MVB30")199 d.addTimeout(1, reactor)200 with pytest.raises(TimeoutError):201 yield d202def test_numericcontrol__invalid_input():203 class TestParent(MockedConnectionReceiver):204 control = NumericControl("MV")205 parent = TestParent("fakehost")206 with pytest.raises(ValueError):207 parent.control = -1208 with pytest.raises(ValueError):209 parent.control = 100210@pytest_twisted.inlineCallbacks211def test_numericcontrol__more_digits():212 class TestParent(MockedConnectionReceiver):213 control = NumericControl("MV", digits=4)214 parent = TestParent("fakehost")215 parent.write_response(b"MV0050")216 value = yield parent.control.addTimeout(1, reactor)217 assert value == 50218 parent.write_response(b"MV0700")219 value = yield parent.control.addTimeout(1, reactor)220 assert value == 700221 parent.control = 4222 assert parent.sent() == b"MV0004\r"223 parent.write_response(b"MV0004")224 parent.control = 38225 assert parent.sent() == b"MV0038\r"226 parent.write_response(b"MV0038")227 parent.control = 210228 assert parent.sent() == b"MV0210\r"229 parent.write_response(b"MV0210")230 parent.control = 3233231 assert parent.sent() == b"MV3233\r"232 parent.write_response(b"MV3233")233def test_volumecontrol():234 class TestParent(MockedConnectionReceiver):235 control = VolumeControl("MV")236 parent = TestParent("fakehost")237 parent.control = "+"238 assert parent.sent() == b"MVUP\r"239 parent.write_response(b"MV31")240 parent.control = "-"241 assert parent.sent() == b"MVDOWN\r"242 parent.write_response(b"MV30")243@pytest_twisted.inlineCallbacks244def test_enumcontrol():245 class TestEnum(Enum):246 Auto = "AUTO"247 HDMI = "HDMI"248 Digital = "DIGITAL"249 class TestParent(MockedConnectionReceiver):250 control = EnumControl("SD", enum_type=TestEnum)251 parent = TestParent("fakehost")252 parent.write_response(b"SDAUTO")253 value = yield parent.control.addTimeout(1, reactor)254 assert value == TestEnum.Auto255 parent.write_response(b"SDHDMI")256 value = yield parent.control.addTimeout(1, reactor)257 assert value == TestEnum.HDMI258 parent.write_response(b"SDDIGITAL")259 value = yield parent.control.addTimeout(1, reactor)260 assert value == TestEnum.Digital261 parent.write_response(b"SDNO")262 with pytest.raises(TimeoutError):263 yield parent.control.addTimeout(1, reactor)264 assert parent.sent() == b"SD?\r"265 parent.write_response(b"SDNO")266 parent.control = TestEnum.Auto...
views.py
Source:views.py
...8 body = get_body(r)9 try:10 d = json.loads(body)11 except ValueError:12 write_response(w, '{"error": "incorrect json"}')13 return None14 return d15def not_found(r, w, params):16 write_response(w, '{"error": "404"}')17def register(r, w, params):18 d = get_json_or_error(r, w)19 if d is None:20 return21 if "username" not in d or "password" not in d:22 write_response(w, '{"error": "no username or password in body"}')23 return24 username = d["username"]25 password = d["password"]26 u = find_user(username)27 if u is not None:28 write_response(w, '{"error": "user already exists"}')29 return30 if not validate(username):31 write_response(w, '{"error": "invalid username or password"}')32 return33 create_user(username, password)34 write_response(w, '{"result": "ok"}')35def login(r, w, params):36 d = get_json_or_error(r, w)37 if d is None:38 return39 if "username" not in d or "password" not in d:40 write_response(w, '{"error": "no username or password in body"}')41 return42 username = d["username"]43 password = d["password"]44 u = find_user(username)45 if u is None:46 write_response(w, '{"error": "no such user"}')47 return48 if u.password != password:49 write_response(w, '{"error": "incorrect password"}')50 return51 mng = get_session_manager()52 session_name = mng.init_session()53 mng.update_session(session_name, username)54 set_cookies(w, { 'session': session_name })55 write_response(w, '{"result": "ok"}')56def list_kozinaks(r, w, params):57 os.chdir("/db/kozi")58 kozi = filter(os.path.isfile, os.listdir("/db/kozi"))59 kozi = [os.path.join("/db/kozi", f) for f in kozi]60 kozi.sort(key=lambda x: os.path.getmtime(x))61 kozi = [f[9:] for f in kozi][::-1]62 write_response(w, json.dumps({63 "result": kozi64 }))65def create_kozi(r, w, params):66 cookies = parse_cookies(r)67 if "session" not in cookies:68 write_response(w, '{"error": "auth is required"}')69 return70 mng = get_session_manager()71 username = mng.get_session(cookies["session"])72 if username is None:73 write_response(w, '{"error": "auth is required"}')74 return75 u = find_user(username)76 if u is None:77 write_response(w, '{"error": "auth is required"}')78 return79 d = get_json_or_error(r, w)80 if d is None:81 return82 if "name" not in d or "fortune" not in d or "checksum" not in d:83 write_response(w, '{"error": "no name or fortune or checksum in body"}')84 return85 name = d["name"]86 if not validate(name):87 write_response(w, '{"error": "invalid name"}')88 return89 k = find_kozinak("%s_%s" % (username, name))90 if k is not None:91 write_response(w, '{"error": "kozinak already exists"}')92 return93 fortune = d["fortune"]94 checksum = d["checksum"]95 pipe = d["pipe"] if "pipe" in d else None96 if create_kozinak_chk(name, fortune, checksum, username, pipe):97 write_response(w, '{"result": "ok"}')98 else:99 write_response(w, '{"error": "incorrect checksum"}')100def get_kozi(r, w, params):101 if len(params) != 1:102 write_response(w, '{"error": "no kozi in params"}')103 return104 kozi_name = params[0]105 k = find_kozinak(kozi_name)106 if k is None:107 write_response(w, '{"error": "no such kozinak"}')108 return109 if k.owner != "open":110 cookies = parse_cookies(r)111 if "session" not in cookies:112 write_response(w, '{"error": "auth is required"}')113 return114 mng = get_session_manager()115 username = mng.get_session(cookies["session"])116 if username is None:117 write_response(w, '{"error": "auth is required"}')118 return119 u = find_user(username)120 if u is None:121 write_response(w, '{"error": "auth is required"}')122 return123 if not chk_owner(username, k.owner):124 write_response(w, '{"error": "no access"}')125 return126 write_response(w, json.dumps({127 'result': json.loads(k.to_json())128 }))129def open_kozi(r, w, params):130 cookies = parse_cookies(r)131 if "session" not in cookies:132 write_response(w, '{"error": "auth is required"}')133 return134 mng = get_session_manager()135 username = mng.get_session(cookies["session"])136 if username is None:137 write_response(w, '{"error": "auth is required"}')138 return139 u = find_user(username)140 if u is None:141 write_response(w, '{"error": "auth is required"}')142 return143 d = get_json_or_error(r, w)144 if d is None:145 return146 if "kozi" not in d:147 write_response(w, '{"error": "no kozi in body"}')148 return149 kozi_name = d["kozi"]150 k = find_kozinak(kozi_name)151 if k is None:152 write_response(w, '{"error": "no such kozinak"}')153 return154 if not chk_owner(username, k.owner):155 write_response(w, '{"error": "no access"}')156 return157 k.delete()158 k.set_owner("open")159 k.save()160 write_response(w, '{"result": "ok"}')161def copy_kozi(r, w, params):162 cookies = parse_cookies(r)163 if "session" not in cookies:164 write_response(w, '{"error": "auth is required"}')165 return166 mng = get_session_manager()167 username = mng.get_session(cookies["session"])168 if username is None:169 write_response(w, '{"error": "auth is required"}')170 return171 u = find_user(username)172 if u is None:173 write_response(w, '{"error": "auth is required"}')174 return175 d = get_json_or_error(r, w)176 if d is None:177 return178 if "url" not in d or "name" not in d:179 write_response(w, '{"error": "no name or url in body"}')180 return181 url = d["url"]182 name = d["name"]183 if not validate(name):184 write_response(w, '{"error": "invalid name"}')185 return186 if url.lower().startswith("gopher") or url.lower().startswith("file"):187 write_response(w, '{"error": "invalid url"}')188 return189 req = urlopen(url)190 data = req.read()191 try:192 j = json.loads(data)193 except:194 write_response(w, '{"error": "incorrect json %s"}' % data)195 return196 if "result" not in j:197 write_response(w, '{"error": "incorrect json %s"}' % data)198 return199 j = j["result"]200 if type(j) != type({}):201 write_response(w, '{"error": "incorrect json %s"}' % data)202 return203 if "name" not in j or "fortune" not in j or "owner" not in j or "pipe" not in j:204 write_response(w, '{"error": "no name or fortune or owner or pipe in body"}')205 return206 fortune = j["fortune"]207 pipe = j["pipe"]208 k = find_kozinak("%s_%s" % (username, name))209 if k is not None:210 write_response(w, '{"error": "kozinak already exists"}')211 return212 create_kozinak(name, fortune, username, pipe)213 write_response(w, '{"result": "ok"}')214def exec_kozi(r, w, params):215 if len(params) != 1:216 write_response(w, '{"error": "no kozi in params"}')217 return218 kozi_name = params[0]219 k = find_kozinak(kozi_name)220 if k is None:221 write_response(w, '{"error": "no such kozinak"}')222 return223 if k.owner != "open":224 cookies = parse_cookies(r)225 if "session" not in cookies:226 write_response(w, '{"error": "auth is required"}')227 return228 mng = get_session_manager()229 username = mng.get_session(cookies["session"])230 if username is None:231 write_response(w, '{"error": "auth is required"}')232 return233 u = find_user(username)234 if u is None:235 write_response(w, '{"error": "auth is required"}')236 return237 if not chk_owner(username, k.owner):238 write_response(w, '{"error": "no access"}')239 return240 result = ""241 while True:242 result += k.fortune243 if k.pipe is not None:244 newk = find_kozinak(k.pipe)245 if newk is not None:246 k = newk247 else:248 break249 else:250 break251 write_response(w, json.dumps({252 'result': result...
create_snapshots.py
Source:create_snapshots.py
1import os2import json3from usgs import api4testdir = os.path.dirname(os.path.abspath(__file__))5def write_response(response, filename):6 fpath = os.path.join(testdir, 'data', filename)7 with open(fpath, 'w') as f:8 json.dump(response, f)9def create_snapshots():10 """11 Run requests against USGS API for use in tests.12 """13 api_key = api.login(os.environ['USGS_USERNAME'], os.environ['USGS_PASSWORD'])14 # Dataset Fields15 response = api.dataset_fields("LANDSAT_8_C1", "EE", api_key=api_key)16 write_response(response, 'dataset-fields.json')17 # Datasets18 response = api.datasets(None, "EE")19 write_response(response, 'datasets.json')20 # Download21 response = api.download("LANDSAT_8_C1", "EE", ["LC80810712017104LGN00"], product='STANDARD')22 write_response(response, 'download.json')23 # Download Options24 response = api.download_options("LANDSAT_8_C1", "EE", ["LC80810712017104LGN00"])25 write_response(response, 'download-options.json')26 # Metadata27 response = api.metadata("LANDSAT_8_C1", "EE", ["LC80810712017104LGN00"])28 write_response(response, 'metadata.json')29 # Search30 response = api.search("LANDSAT_8_C1", "EE", start_date='20170401', end_date='20170402', max_results=10)31 write_response(response, 'search.json')32 api.logout(api_key)33if __name__ == '__main__':...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!