Best Python code snippet using refurb_python
test_tasks.py
Source:test_tasks.py
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import celery
import pretend
import pytest

from warehouse.malware import tasks
from warehouse.malware.models import MalwareCheck, MalwareCheckState, MalwareVerdict
from warehouse.malware.services import PrinterMalwareCheckService

from ...common import checks as test_checks
from ...common.db.malware import MalwareCheckFactory, MalwareVerdictFactory
from ...common.db.packaging import FileFactory, ProjectFactory, ReleaseFactory


def _recording_log(*levels):
    """Return a stub logger whose named level methods record their calls."""
    return pretend.stub(
        **{
            level: pretend.call_recorder(lambda *args, **kwargs: None)
            for level in levels
        }
    )


class TestRunCheck:
    """Tests for ``tasks.run_check``."""

    def test_success(self, db_request, monkeypatch):
        """An enabled hooked check runs and stores exactly one verdict."""
        db_request.route_url = pretend.call_recorder(lambda *a, **kw: "fake_route")
        monkeypatch.setattr(tasks, "checks", test_checks)
        file0 = FileFactory.create()
        MalwareCheckFactory.create(
            name="ExampleHookedCheck", state=MalwareCheckState.Enabled
        )
        task = pretend.stub()

        tasks.run_check(task, db_request, "ExampleHookedCheck", obj_id=file0.id)

        assert db_request.route_url.calls == [
            pretend.call("packaging.file", path=file0.path)
        ]
        assert db_request.db.query(MalwareVerdict).one()

    @pytest.mark.parametrize("manually_triggered", [True, False])
    def test_evaluation_run(self, db_session, monkeypatch, manually_triggered):
        """A check in the evaluation state only runs when manually triggered."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        MalwareCheckFactory.create(
            name="ExampleScheduledCheck", state=MalwareCheckState.Evaluation
        )
        ProjectFactory.create()
        task = pretend.stub()
        request = pretend.stub(db=db_session, log=_recording_log("info"))

        tasks.run_check(
            task,
            request,
            "ExampleScheduledCheck",
            manually_triggered=manually_triggered,
        )

        if manually_triggered:
            assert db_session.query(MalwareVerdict).one()
        else:
            assert request.log.info.calls == [
                pretend.call(
                    "ExampleScheduledCheck is in the `evaluation` state and must be "
                    "manually triggered to run."
                )
            ]
            assert db_session.query(MalwareVerdict).all() == []

    def test_disabled_check(self, db_session, monkeypatch):
        """A disabled check logs that it is inactive."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        MalwareCheckFactory.create(
            name="ExampleHookedCheck", state=MalwareCheckState.Disabled
        )
        task = pretend.stub()
        request = pretend.stub(db=db_session, log=_recording_log("info"))
        file = FileFactory.create()

        tasks.run_check(task, request, "ExampleHookedCheck", obj_id=file.id)

        assert request.log.info.calls == [
            pretend.call("Check ExampleHookedCheck isn't active. Aborting.")
        ]

    def test_missing_check(self, db_request, monkeypatch):
        """An unknown check name raises ``AttributeError``."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        task = pretend.stub()

        with pytest.raises(AttributeError):
            tasks.run_check(task, db_request, "DoesNotExistCheck")

    def test_missing_obj_id(self, db_session, monkeypatch):
        """Running a hooked check without ``obj_id`` logs a fatal error."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        MalwareCheckFactory.create(
            name="ExampleHookedCheck", state=MalwareCheckState.Enabled
        )
        task = pretend.stub()
        request = pretend.stub(db=db_session, log=_recording_log("error"))

        tasks.run_check(task, request, "ExampleHookedCheck")

        assert request.log.error.calls == [
            pretend.call(
                "Fatal exception: ExampleHookedCheck: Missing required kwarg `obj_id`"
            )
        ]

    def test_retry(self, db_session, monkeypatch):
        """A failing scan is logged and handed to ``task.retry``."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        exc = Exception("Scan failed")

        def scan(self, **kwargs):
            raise exc

        monkeypatch.setattr(tasks.checks.ExampleHookedCheck, "scan", scan)
        MalwareCheckFactory.create(
            name="ExampleHookedCheck", state=MalwareCheckState.Enabled
        )
        task = pretend.stub(
            retry=pretend.call_recorder(pretend.raiser(celery.exceptions.Retry))
        )
        request = pretend.stub(
            db=db_session,
            log=_recording_log("error"),
            route_url=pretend.call_recorder(lambda *a, **kw: pretend.stub()),
        )
        file = FileFactory.create()

        with pytest.raises(celery.exceptions.Retry):
            tasks.run_check(task, request, "ExampleHookedCheck", obj_id=file.id)

        assert request.log.error.calls == [
            pretend.call("Error executing check ExampleHookedCheck: Scan failed")
        ]
        assert task.retry.calls == [pretend.call(exc=exc)]


class TestRunScheduledCheck:
    """Tests for ``tasks.run_scheduled_check``."""

    def test_invalid_check_name(self, db_request, monkeypatch):
        """An unknown check name raises ``AttributeError``."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        task = pretend.stub()

        with pytest.raises(AttributeError):
            tasks.run_scheduled_check(task, db_request, "DoesNotExist")

    def test_run_check(self, db_session, capfd, monkeypatch):
        """The scheduled check is dispatched through the malware check service."""
        MalwareCheckFactory.create(
            name="ExampleScheduledCheck", state=MalwareCheckState.Enabled
        )
        request = pretend.stub(
            db=db_session,
            find_service_factory=pretend.call_recorder(
                lambda interface: PrinterMalwareCheckService.create_service
            ),
        )
        task = pretend.stub()

        tasks.run_scheduled_check(task, request, "ExampleScheduledCheck")

        assert request.find_service_factory.calls == [
            pretend.call(tasks.IMalwareCheckService)
        ]
        out, err = capfd.readouterr()
        assert out == "ExampleScheduledCheck {'manually_triggered': False}\n"


class TestBackfill:
    """Tests for ``tasks.backfill``."""

    def test_invalid_check_name(self, db_request, monkeypatch):
        """An unknown check name raises ``AttributeError``."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        task = pretend.stub()

        with pytest.raises(AttributeError):
            tasks.backfill(task, db_request, "DoesNotExist", 1)

    @pytest.mark.parametrize(
        ("num_objects", "num_runs"), [(11, 1), (11, 11), (101, 90)]
    )
    def test_run(self, db_session, capfd, num_objects, num_runs, monkeypatch):
        """Backfill prints one line for exactly ``num_runs`` of the created files."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        ids = [FileFactory.create().id for _ in range(num_objects)]
        MalwareCheckFactory.create(
            name="ExampleHookedCheck", state=MalwareCheckState.Enabled
        )
        request = pretend.stub(
            db=db_session,
            log=_recording_log("info"),
            find_service_factory=pretend.call_recorder(
                lambda interface: PrinterMalwareCheckService.create_service
            ),
        )
        task = pretend.stub()

        tasks.backfill(task, request, "ExampleHookedCheck", num_runs)

        assert request.log.info.calls == [
            pretend.call("Running backfill on %d Files." % num_runs)
        ]
        assert request.find_service_factory.calls == [
            pretend.call(tasks.IMalwareCheckService)
        ]
        out, err = capfd.readouterr()
        # Count how many of the created files show up in the printed output.
        num_output_lines = sum(
            1
            for file_id in ids
            if "ExampleHookedCheck:%s %s\n" % (file_id, {"manually_triggered": True})
            in out
        )
        assert num_output_lines == num_runs


class TestSyncChecks:
    """Tests for ``tasks.sync_checks``."""

    def test_no_updates(self, db_session, monkeypatch):
        """Checks whose code version matches the database are reported unmodified."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        monkeypatch.setattr(tasks.checks.ExampleScheduledCheck, "version", 2)
        MalwareCheckFactory.create(
            name="ExampleHookedCheck", state=MalwareCheckState.Disabled
        )
        MalwareCheckFactory.create(
            name="ExampleScheduledCheck", state=MalwareCheckState.Disabled
        )
        MalwareCheckFactory.create(
            name="ExampleScheduledCheck", state=MalwareCheckState.Enabled, version=2
        )
        task = pretend.stub()
        request = pretend.stub(db=db_session, log=_recording_log("info"))

        tasks.sync_checks(task, request)

        assert request.log.info.calls == [
            pretend.call("2 malware checks found in codebase."),
            pretend.call("ExampleHookedCheck is unmodified."),
            pretend.call("ExampleScheduledCheck is unmodified."),
        ]

    @pytest.mark.parametrize(
        "final_state", [MalwareCheckState.Enabled, MalwareCheckState.Disabled]
    )
    def test_upgrade_check(self, monkeypatch, db_session, final_state):
        """Bumping a check's code version adds a second database row for it."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        monkeypatch.setattr(tasks.checks.ExampleHookedCheck, "version", 2)
        MalwareCheckFactory.create(name="ExampleHookedCheck", state=final_state)
        MalwareCheckFactory.create(
            name="ExampleScheduledCheck", state=MalwareCheckState.Disabled
        )
        task = pretend.stub()
        request = pretend.stub(db=db_session, log=_recording_log("info"))

        tasks.sync_checks(task, request)

        assert request.log.info.calls == [
            pretend.call("2 malware checks found in codebase."),
            pretend.call("Updating existing ExampleHookedCheck."),
            pretend.call("ExampleScheduledCheck is unmodified."),
        ]
        db_checks = (
            db_session.query(MalwareCheck)
            .filter(MalwareCheck.name == "ExampleHookedCheck")
            .all()
        )
        assert len(db_checks) == 2
        if final_state == MalwareCheckState.Disabled:
            assert (
                db_checks[0].state == db_checks[1].state == MalwareCheckState.Disabled
            )
        else:
            for c in db_checks:
                if c.state == final_state:
                    assert c.version == 2
                else:
                    assert c.version == 1

    def test_one_new_check(self, db_session, monkeypatch):
        """A check present only in code is added to the database as disabled."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        MalwareCheckFactory.create(
            name="ExampleHookedCheck", state=MalwareCheckState.Disabled
        )
        MalwareCheckFactory.create(
            name="ExampleScheduledCheck", state=MalwareCheckState.Disabled
        )
        task = pretend.stub()

        class FakeMalwareCheck:
            version = 1
            short_description = "This is a short description."
            long_description = "This is a longer description."
            check_type = "scheduled"
            schedule = {"minute": "0", "hour": "*/8"}

        # Register through monkeypatch so the fake check is removed again even
        # when an assertion below fails; a manual `del` at the end would be
        # skipped and leak the attribute into later tests.
        monkeypatch.setattr(
            tasks.checks, "FakeMalwareCheck", FakeMalwareCheck, raising=False
        )
        request = pretend.stub(db=db_session, log=_recording_log("info"))

        tasks.sync_checks(task, request)

        assert request.log.info.calls == [
            pretend.call("3 malware checks found in codebase."),
            pretend.call("ExampleHookedCheck is unmodified."),
            pretend.call("ExampleScheduledCheck is unmodified."),
            pretend.call("Adding new FakeMalwareCheck to the database."),
        ]
        assert db_session.query(MalwareCheck).count() == 3
        new_check = (
            db_session.query(MalwareCheck)
            .filter(MalwareCheck.name == "FakeMalwareCheck")
            .one()
        )
        assert new_check.state == MalwareCheckState.Disabled

    def test_too_many_db_checks(self, db_session, monkeypatch):
        """More active checks in the database than in code is logged and raises."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        MalwareCheckFactory.create(
            name="ExampleHookedCheck", state=MalwareCheckState.Enabled
        )
        MalwareCheckFactory.create(
            name="ExampleScheduledCheck", state=MalwareCheckState.Enabled
        )
        MalwareCheckFactory.create(
            name="AnotherCheck", state=MalwareCheckState.Evaluation, version=2
        )
        task = pretend.stub()
        request = pretend.stub(db=db_session, log=_recording_log("info", "error"))

        with pytest.raises(Exception):
            tasks.sync_checks(task, request)

        assert request.log.info.calls == [
            pretend.call("2 malware checks found in codebase.")
        ]
        assert request.log.error.calls == [
            pretend.call(
                "Found 3 active checks in the db, but only 2 checks in code. Please "
                "manually move superfluous checks to the wiped_out state in the "
                "check admin: AnotherCheck"
            )
        ]

    def test_only_wiped_out(self, db_session, monkeypatch):
        """Wiped-out checks that still exist in code are reported as errors."""
        monkeypatch.setattr(tasks, "checks", test_checks)
        MalwareCheckFactory.create(
            name="ExampleHookedCheck", state=MalwareCheckState.WipedOut
        )
        MalwareCheckFactory.create(
            name="ExampleScheduledCheck", state=MalwareCheckState.WipedOut
        )
        task = pretend.stub()
        request = pretend.stub(db=db_session, log=_recording_log("info", "error"))

        tasks.sync_checks(task, request)

        assert request.log.info.calls == [
            pretend.call("2 malware checks found in codebase.")
        ]
        assert request.log.error.calls == [
            pretend.call(
                "ExampleHookedCheck is wiped_out and cannot be synced. Please remove "
                "check from codebase."
            ),
            pretend.call(
                "ExampleScheduledCheck is wiped_out and cannot be synced. Please "
                "remove check from codebase."
            ),
        ]


class TestRemoveVerdicts:
    """Tests for ``tasks.remove_verdicts``."""

    def test_no_verdicts(self, db_session):
        """Removing verdicts for a check that has none reports and returns zero."""
        check = MalwareCheckFactory.create()
        request = pretend.stub(db=db_session, log=_recording_log("info"))
        task = pretend.stub()

        removed = tasks.remove_verdicts(task, request, check.name)

        assert request.log.info.calls == [
            pretend.call(
                "Removing 0 malware verdicts associated with %s version 1." % check.name
            )
        ]
        assert removed == 0

    @pytest.mark.parametrize("check_with_verdicts", [True, False])
    def test_many_verdicts(self, db_session, check_with_verdicts):
        """The logged verdict count matches the verdicts owned by the chosen check."""
        check0 = MalwareCheckFactory.create()
        check1 = MalwareCheckFactory.create()
        project = ProjectFactory.create(name="foo")
        release = ReleaseFactory.create(project=project)
        file0 = FileFactory.create(release=release, filename="foo.bar")
        num_verdicts = 10
        for _ in range(num_verdicts):
            MalwareVerdictFactory.create(check=check1, release_file=file0)
        assert db_session.query(MalwareVerdict).count() == num_verdicts
        request = pretend.stub(db=db_session, log=_recording_log("info"))
        task = pretend.stub()
        if check_with_verdicts:
            wiped_out_check = check1
        else:
            # check0 owns no verdicts, so nothing should be removed for it.
            wiped_out_check = check0
            num_verdicts = 0

        removed = tasks.remove_verdicts(task, request, wiped_out_check.name)

        assert request.log.info.calls == [
            pretend.call(
                "Removing %d malware verdicts associated with %s version 1."
                % (num_verdicts, wiped_out_check.name)
            )
        ]
        # NOTE: the source snippet is truncated here; any remaining assertions
        # of this test are not visible.
test_doctor.py
Source:test_doctor.py
""" Doctor command unit tests """
# standard python imports
import subprocess
import time
import unittest.mock as mock

# 3rd party imports
import pytest

# two1 imports
# importing the class directly to get around renaming doctor
from two1.commands.doctor import Check
from two1.commands.doctor import Doctor


def _checks(*results):
    """ Builds one placeholder Check per given result """
    return [Check("name", "message", "value", result) for result in results]


def _one_of_each():
    """ A single check type holding one check for each result kind """
    return {
        "type1": _checks(
            Check.Result.PASS,
            Check.Result.FAIL,
            Check.Result.SKIP,
            Check.Result.WARN)}


def _all_failing():
    """ Two check types, each holding two failing checks """
    return {
        "type1": _checks(Check.Result.FAIL, Check.Result.FAIL),
        "type2": _checks(Check.Result.FAIL, Check.Result.FAIL)}


@pytest.mark.parametrize("url, return_value, server_port", [
    ("http://0.0.0.0:8000", True, 8000),
    ("http://0.0.0.0:8001", False, 8000),
    ("http://0.0.0.0", False, 8000),
    ("https://0.0.0.0:8000", True, 8000),
    ("https://0.0.0.0", False, 8000),
    ])
def test_make_http_connection(doctor, url, return_value, server_port):
    """ Fires up an http server to check functionality of make_connection """
    # An argv list without a shell makes `proc` the server itself, so kill()
    # below terminates the server rather than an intermediate shell.
    proc = subprocess.Popen(
        ["python3", "-m", "http.server", str(server_port)],
        stdout=subprocess.PIPE)
    try:
        # give the server a moment to bind its port
        time.sleep(.5)
        assert doctor.make_http_connection(url) == return_value
    finally:
        # always stop the server, even on assertion failure, so the port is
        # free for the next parametrized case
        proc.kill()
        proc.wait()


@pytest.mark.parametrize("checks, expected_length", [
    (_one_of_each(), 4),
    (_all_failing(), 4),
    ({"type1": []}, 0)])
def test_get_checks(doctor, checks, expected_length):
    """ Ensures the function get_checks is returning a flat list of checks """
    # sets the checks list manually instead of running tests
    doctor.checks = checks
    returned_checks = doctor.get_checks()
    assert isinstance(returned_checks, list)
    assert len(returned_checks) == expected_length


@pytest.mark.parametrize("checks, expected_length, result_filter", [
    (_one_of_each(), 1, Check.Result.PASS),
    (_all_failing(), 0, Check.Result.WARN),
    (_all_failing(), 4, Check.Result.FAIL),
    ({"type1": []}, 0, Check.Result.PASS),
    ({"type1": []}, 0, "donkey")])
def test_get_checks_with_result_filter(doctor, checks, expected_length, result_filter):
    """ Checks that get_checks returns a flat list of checks and uses a search filter """
    # sets the checks list manually instead of running tests
    doctor.checks = checks
    if not isinstance(result_filter, Check.Result):
        # anything that is not a Check.Result must be rejected
        with pytest.raises(ValueError):
            doctor.get_checks(result_filter)
    else:
        returned_checks = doctor.get_checks(result_filter)
        assert isinstance(returned_checks, list)
        assert len(returned_checks) == expected_length


@pytest.mark.parametrize("test_checks", [
    (_one_of_each()),
    (_all_failing()),
    ({"type1": []})])
def test_to_dict(doctor, test_checks):
    """ Ensures that the Doctor.to_dict is returning the correct dict values
        for all check data members
    """
    # sets the checks list manually instead of running tests
    doctor.checks = test_checks
    doc_dict = doctor.to_dict()
    assert isinstance(doc_dict, dict)
    for check_type, check_objs in test_checks.items():
        for check_obj, check_dict in zip(check_objs, doc_dict[check_type]):
            assert check_obj.name == check_dict["name"]
            assert check_obj.message == check_dict["message"]
            assert check_obj.value == check_dict["value"]
            assert check_obj.result.name == check_dict["result"]


@pytest.mark.integration
def test_doctor_integration(doctor):
    """ Runs the full doctor suite of tests and ensures there are no failures"""
    specialties = Doctor.SPECIALTIES

    # Gets a dict of the types of checks and the functions of that check type as a list
    expected_doctor_checks = {check_type: [] for check_type in specialties.keys()}
    for attr_name in dir(doctor):
        for check_type, check_names in expected_doctor_checks.items():
            if attr_name.startswith("check_{}".format(check_type)) and callable(getattr(doctor, attr_name)):
                check_names.append(attr_name)

    # runs each of the different checks
    for check_type in expected_doctor_checks:
        doctor.checkup(check_type)

    # gets the results from all checks in dict form
    appointment_results = doctor.to_dict()

    # iterates over all expected checks ensuring they were actually called
    for check_type, expected_check_functions in expected_doctor_checks.items():
        actual_check_functions = appointment_results[check_type]
        for check_result in actual_check_functions:
            assert check_result['name'] in expected_check_functions
        assert len(actual_check_functions) == len(expected_check_functions)

    # makes sure there are no failures
    assert len(doctor.get_checks(Check.Result.FAIL)) == 0


@pytest.mark.parametrize('system, release_os, check_status', [
    ('Linux', '4.0.0', Check.Result.PASS),
    ('Darwin', '14.5.0', Check.Result.PASS),
])
def test_doctor_operating_system_check(doctor, system, release_os, check_status):
    """ Unit test the ability to check the user's operating system."""
    with mock.patch('platform.system', mock.Mock(return_value=system)), mock.patch('platform.release', mock.Mock(return_value=release_os)):  # nopep8
        status, _, actual_os = doctor.check_general_operating_system_release()
    assert status == check_status
    # NOTE: the source snippet is truncated here; anything that followed is
    # not visible.
verification_program.py
Source:verification_program.py
"""Run a sorting executable on random permutations and report its pass rate.

Usage: ``verification_program.py EXE_PATH ... N_TESTS`` -- the first argument
is the executable under test and the last argument is the number of runs.
"""
import subprocess
import sys

import numpy as np

# Values handed to the program under test on every run (shuffled first).
VALUES = range(1, 101)
# Minimum fraction of runs that must be correct for the overall verdict.
PASS_THRESHOLD = 0.9


def parse_output(text):
    """Return the whitespace-separated integers found in *text*.

    Raises:
        ValueError: if any token is not an integer.
    """
    # split() with no argument tolerates trailing spaces/newlines, so the last
    # number is never dropped regardless of how the program ends its output.
    return [int(token) for token in text.split()]


def is_correctly_sorted(output, expected):
    """Return True when *output* is exactly the values of *expected*, ascending.

    Comparing against the full sorted sequence (rather than only checking that
    neighbours increase) also rejects empty or partial output.
    """
    return list(output) == sorted(expected)


def run_once(exe_path):
    """Run *exe_path* on one random permutation; return True if it sorted it."""
    permutation = np.random.permutation(VALUES)
    command = [exe_path, *(f'{n}' for n in permutation)]
    completed = subprocess.run(command, stdout=subprocess.PIPE)
    try:
        printed = parse_output(completed.stdout.decode('utf-8'))
    except ValueError:
        # Non-numeric output counts as a failed run instead of aborting.
        return False
    return is_correctly_sorted(printed, VALUES)


def main(argv):
    """Run the requested number of trials and print the summary."""
    exe_path = argv[1]
    n_tests = int(argv[-1])
    print(n_tests)
    test_checks = [run_once(exe_path) for _ in range(n_tests)]
    print(f'{sum(test_checks)} / {len(test_checks)}')
    # With zero runs nothing was verified; treat that as a 0% pass rate rather
    # than dividing by zero.
    pass_rate = sum(test_checks) / len(test_checks) if test_checks else 0.0
    if pass_rate < PASS_THRESHOLD:
        print('Not OK')
    else:
        ...  # NOTE: the success branch is truncated in the source snippet


if __name__ == "__main__":
    main(sys.argv)
Learn to execute automation testing from scratch with the LambdaTest Learning Hub. It covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!