Best Python code snippet using tappy_python
test_operation.py
Source:test_operation.py
# ... (SOURCE is truncated above this point. The tail of the preceding fixture
# is kept as a comment because its enclosing definition is not visible here.)
#     def _make_link(link_id):
#         return Link(command='', paw='123456', ability=test_ability, id=link_id, executor=next(test_ability.executors))
#     return _make_link


@pytest.fixture
def make_test_result():
    """Return a factory that builds a ``Result`` for a given link id.

    The produced result carries the base64 encoding of ``10.10.10.10`` as its
    output, with ``pid`` and ``status`` both set to ``0``.
    """
    def _make_result(link_id):
        # b64encode() returns bytes; decode() already yields a str.
        encoded_output = base64.b64encode('10.10.10.10'.encode('utf-8')).decode('utf-8')
        return Result(id=link_id, output=encoded_output, pid=0, status=0)
    return _make_result


@pytest.fixture
def op_with_learning_parser(ability, adversary):
    """Operation 'testA' created with ``use_learning_parsers=True``."""
    return Operation(id='12345', name='testA', agents=[], adversary=adversary, use_learning_parsers=True)


@pytest.fixture
def op_without_learning_parser(ability, adversary):
    """Operation 'testB' created with ``use_learning_parsers=False``."""
    return Operation(id='54321', name='testB', agents=[], adversary=adversary, use_learning_parsers=False)


@pytest.fixture
def op_with_learning_and_seeded(ability, adversary, operation_agent, parse_datestring):
    """Operation 'testC' with learning parsers enabled and a source seeded with one fact."""
    sc = Source(id='3124', name='test', facts=[Fact(trait='domain.user.name', value='bob')])
    op = Operation(id='6789', name='testC', agents=[], adversary=adversary, source=sc, use_learning_parsers=True)
    # patch operation to make it 'realistic'
    op.start = parse_datestring(OP_START_TIME)
    op.adversary = op.adversary()
    op.planner = Planner(planner_id='12345', name='test_planner',
                         module='not.an.actual.planner', params=None)
    op.objective = Objective(id='6428', name='not_an_objective')
    # NOTE: this mutates the shared ``operation_agent`` fixture object in place.
    operation_agent.paw = '123456'
    op.agents = [operation_agent]
    return op


def _expected_event_logs(agent_created, operation_start):
    """Build the event-log entries that both event-log tests compare against.

    :param agent_created: expected ``created`` value of the agent metadata.
    :param operation_start: expected ``operation_start`` of the operation metadata.
    :return: list of two dicts, one for ability '123' and one for ability '456'.
    """
    agent_metadata = dict(
        paw='testpaw',
        group='red',
        architecture='amd64',
        username='testagent',
        location=r'C:\Users\Public\test.exe',
        pid=1234,
        ppid=123,
        privilege='User',
        host='WORKSTATION',
        contact='unknown',
        created=agent_created,
    )
    operation_metadata = dict(
        operation_name='test',
        operation_start=operation_start,
        operation_adversary='test adversary',
    )
    attack_metadata = dict(
        tactic='test tactic',
        technique_name='test technique',
        technique_id='T0000',
    )
    return [
        dict(
            command='d2hvYW1p',
            delegated_timestamp=LINK1_DECIDE_TIME,
            collected_timestamp=LINK1_COLLECT_TIME,
            finished_timestamp=LINK1_FINISH_TIME,
            status=0,
            platform='windows',
            executor='psh',
            pid=789,
            agent_metadata=agent_metadata,
            ability_metadata=dict(
                ability_id='123',
                ability_name='test ability',
                ability_description='test ability desc',
            ),
            operation_metadata=operation_metadata,
            attack_metadata=attack_metadata,
        ),
        dict(
            command='aG9zdG5hbWU=',
            delegated_timestamp=LINK2_DECIDE_TIME,
            collected_timestamp=LINK2_COLLECT_TIME,
            finished_timestamp=LINK2_FINISH_TIME,
            status=0,
            platform='windows',
            executor='psh',
            pid=7890,
            agent_metadata=agent_metadata,
            ability_metadata=dict(
                ability_id='456',
                ability_name='test ability 2',
                ability_description='test ability 2 desc',
            ),
            operation_metadata=operation_metadata,
            attack_metadata=attack_metadata,
        ),
    ]


class TestOperation:
    """Tests for ``Operation``: ability tracking, event logs, state-change events and fact learning."""

    def test_ran_ability_id(self, ability, adversary):
        """``ran_ability_id`` is truthy for the ability id of a link in the chain."""
        op = Operation(name='test', agents=[], adversary=adversary)
        mock_link = MagicMock(spec=Link, ability=ability(ability_id='123'), finish=MOCK_LINK_FINISH_TIME)
        op.chain = [mock_link]
        assert op.ran_ability_id('123')

    def test_event_logs(self, event_loop, op_for_event_logs, operation_agent, file_svc, data_svc, event_log_op_start_time,
                        op_agent_creation_time):
        """Compare ``event_logs`` output for ``op_for_event_logs`` with the expected entries."""
        # Replace any previously stored copy of the agent before generating the logs.
        event_loop.run_until_complete(data_svc.remove('agents', match=dict(unique=operation_agent.unique)))
        event_loop.run_until_complete(data_svc.store(operation_agent))
        want = _expected_event_logs(op_agent_creation_time, event_log_op_start_time)
        event_logs = event_loop.run_until_complete(op_for_event_logs.event_logs(file_svc, data_svc))
        assert event_logs == want

    def test_writing_event_logs_to_disk(self, event_loop, op_for_event_logs, operation_agent, file_svc, data_svc,
                                        event_log_op_start_time, op_agent_creation_time):
        """``write_event_logs_to_disk`` writes the expected entries as JSON under /tmp/event_logs."""
        # Replace any previously stored copy of the agent before generating the logs.
        event_loop.run_until_complete(data_svc.remove('agents', match=dict(unique=operation_agent.unique)))
        event_loop.run_until_complete(data_svc.store(operation_agent))
        want = _expected_event_logs(op_agent_creation_time, event_log_op_start_time)
        event_loop.run_until_complete(op_for_event_logs.write_event_logs_to_disk(file_svc, data_svc))
        target_path = '/tmp/event_logs/operation_%s.json' % op_for_event_logs.id
        assert os.path.isfile(target_path)
        try:
            with open(target_path, 'rb') as log_file:
                recorded_log = json.load(log_file)
            assert recorded_log == want
        finally:
            # Always clean up the generated file, even when the comparison fails.
            os.remove(target_path)

    @mock.patch.object(Operation, '_emit_state_change_event')
    def test_no_state_change_event_on_instantiation(self, mock_emit_state_change_method, fake_event_svc, adversary):
        """Constructing an operation must not emit a state-change event."""
        Operation(name='test', agents=[], adversary=adversary)
        mock_emit_state_change_method.assert_not_called()

    @mock.patch.object(Operation, '_emit_state_change_event')
    def test_no_state_change_event_fired_when_setting_same_state(self, mock_emit_state_change_method, fake_event_svc,
                                                                 adversary):
        """Re-assigning the current state must not emit a state-change event."""
        initial_state = 'running'
        op = Operation(name='test', agents=[], adversary=adversary, state=initial_state)
        op.state = initial_state
        mock_emit_state_change_method.assert_not_called()

    @mock.patch.object(Operation, '_emit_state_change_event')
    def test_state_change_event_fired_on_state_change(self, mock_emit_state_change_method, fake_event_svc, adversary):
        """Assigning a different state emits an event carrying the old and new state."""
        op = Operation(name='test', agents=[], adversary=adversary, state='running')
        op.state = 'finished'
        mock_emit_state_change_method.assert_called_with(from_state='running', to_state='finished')

    def test_emit_state_change_event(self, event_loop, fake_event_svc, adversary):
        """``_emit_state_change_event`` fires on the state-changed queue with op id and both states."""
        op = Operation(name='test', agents=[], adversary=adversary, state='running')
        fake_event_svc.reset()
        event_loop.run_until_complete(
            op._emit_state_change_event(
                from_state='running',
                to_state='finished'
            )
        )
        expected_key = (Operation.EVENT_EXCHANGE, Operation.EVENT_QUEUE_STATE_CHANGED)
        assert expected_key in fake_event_svc.fired
        event_kwargs = fake_event_svc.fired[expected_key]
        assert event_kwargs['op'] == op.id
        assert event_kwargs['from_state'] == 'running'
        assert event_kwargs['to_state'] == 'finished'

    def test_with_learning_parser(self, event_loop, contact_svc, data_svc, learning_svc, event_svc, op_with_learning_parser,
                                  make_test_link, make_test_result, knowledge_svc):
        """With learning parsers enabled, saving a result yields one 'host.ip.address' fact."""
        test_link = make_test_link(1234)
        op_with_learning_parser.add_link(test_link)
        test_result = make_test_result(test_link.id)
        event_loop.run_until_complete(data_svc.store(op_with_learning_parser))
        event_loop.run_until_complete(contact_svc._save(test_result))
        assert len(test_link.facts) == 1
        fact = test_link.facts[0]
        assert fact.trait == 'host.ip.address'
        assert fact.value == '10.10.10.10'
        knowledge_data = event_loop.run_until_complete(op_with_learning_parser.all_facts())
        assert len(knowledge_data) == 1
        assert knowledge_data[0].trait == 'host.ip.address'
        assert knowledge_data[0].value == '10.10.10.10'

    def test_without_learning_parser(self, event_loop, app_svc, contact_svc, data_svc, learning_svc, event_svc,
                                     op_without_learning_parser, make_test_link, make_test_result):
        """With learning parsers disabled, saving a result adds no facts to the link."""
        test_link = make_test_link(5678)
        op_without_learning_parser.add_link(test_link)
        test_result = make_test_result(test_link.id)
        event_loop.run_until_complete(data_svc.store(op_without_learning_parser))
        event_loop.run_until_complete(contact_svc._save(test_result))
        assert len(test_link.facts) == 0

    def test_facts(self, event_loop, app_svc, contact_svc, file_svc, data_svc, learning_svc, event_svc,
                   op_with_learning_and_seeded, make_test_link, make_test_result, knowledge_svc):
        """A seeded operation reports both its seeded fact and the learned fact."""
        test_link = make_test_link(9876)
        op_with_learning_and_seeded.add_link(test_link)
        test_result = make_test_result(test_link.id)
        event_loop.run_until_complete(data_svc.store(op_with_learning_and_seeded))
        event_loop.run_until_complete(op_with_learning_and_seeded._init_source())  # need to call this manually (no 'run')
        event_loop.run_until_complete(contact_svc._save(test_result))
        assert len(test_link.facts) == 1
        fact = test_link.facts[0]
        assert fact.trait == 'host.ip.address'
        assert fact.value == '10.10.10.10'
        knowledge_data = event_loop.run_until_complete(op_with_learning_and_seeded.all_facts())
        assert len(knowledge_data) == 2
        # One fact originates from the operation itself, the other from its seeded source.
        origin_set = [x.source for x in knowledge_data]
        assert op_with_learning_and_seeded.id in origin_set
        assert op_with_learning_and_seeded.source.id in origin_set
        report = event_loop.run_until_complete(op_with_learning_and_seeded.report(file_svc, data_svc))
        assert len(report['facts']) == 2
report.py
Source:report.py
# ... (SOURCE is truncated above this point. The tail of the preceding helper
# is kept as a comment because its enclosing definition is not visible here.)
#     result = Result()
#     _set_result(result, steps, status, start_time, end_time)
#     return result


_counter = 0  # keeps generated test/suite names and descriptions unique


def make_test_result(name=None, description=None, steps=(), status=NotImplemented, start_time=NotImplemented, end_time=NotImplemented):
    """Build a ``TestResult``; name and description default to counter-based values.

    ``steps``, ``status``, ``start_time`` and ``end_time`` are forwarded
    unchanged to ``_set_result``.
    """
    global _counter
    index = _counter
    test_result = TestResult(name or "test_%d" % index, description or "Test %d" % index)
    _counter = index + 1
    _set_result(test_result, steps, status, start_time, end_time)
    return test_result


def make_suite_result(name=None, description=None, tests=(), sub_suites=(), setup=None, teardown=None,
                      start_time=NotImplemented, end_time=NotImplemented):
    """Build a ``SuiteResult`` holding the given tests, sub-suites, setup and teardown.

    When ``start_time`` / ``end_time`` are left at ``NotImplemented`` they are
    taken from the first / last flattened result of the suite, or from the
    current time if the suite has no results.
    """
    global _counter
    index = _counter
    suite_result = SuiteResult(name or "suite_%d" % index, description or "Suite %d" % index)
    _counter = index + 1

    if setup:
        suite_result.suite_setup = setup
    if teardown:
        suite_result.suite_teardown = teardown
    for child_test in tests:
        suite_result.add_test(child_test)
    for child_suite in sub_suites:
        suite_result.add_suite(child_suite)

    flattened = list(flatten_results([suite_result]))

    # NotImplemented is the "not provided" sentinel, so an explicit None is honoured.
    if start_time is NotImplemented:
        start_time = flattened[0].start_time if flattened else time.time()
    suite_result.start_time = start_time

    if end_time is NotImplemented:
        end_time = flattened[-1].end_time if flattened else time.time()
    suite_result.end_time = end_time

    return suite_result


def make_report(suites=(), setup=None, teardown=None):
    """Build a ``Report`` from suites plus optional session setup and teardown.

    Start and end times come from the first and last of ``report.all_results()``;
    an empty report gets the current time for start, end and saving time.
    """
    built = Report()
    if setup:
        built.test_session_setup = setup
    for suite_result in suites:
        built.add_suite(suite_result)
    if teardown:
        built.test_session_teardown = teardown

    collected = list(built.all_results())
    if not collected:
        now = time.time()
        built.start_time = now
        built.end_time = now
        built.saving_time = now
        return built

    built.start_time = collected[0].start_time
    built.end_time = collected[-1].end_time
    built.saving_time = time.time()
    return built


###
# Some helpful fixtures
###

def make_report_in_progress():
    """Build a pseudo report in which every element that can be "in-progress"
    (meaning without an end time) is present."""
    now = time.time()

    # Pieces are created in the same order as they would be evaluated inline.
    session_setup = make_result(start_time=now, end_time=None)
    session_teardown = make_result(start_time=now, end_time=None)
    suite_setup = make_result(start_time=now, end_time=None)
    suite_teardown = make_result(start_time=now, end_time=None)

    unfinished_step = make_step("step", start_time=now, end_time=None, logs=[make_log("info", "message", ts=now)])
    unfinished_test = make_test_result(
        "test_1", "test_1", start_time=now, end_time=None, status=None,
        steps=[unfinished_step]
    )
    finished_step = make_step("step", start_time=now, end_time=now+1, logs=[make_log("info", "message", ts=now)])
    finished_test = make_test_result(
        "test_2", "test_2", start_time=now, end_time=now+1, status="passed",
        steps=[finished_step]
    )

    suite_result = make_suite_result(
        "suite", "suite",
        start_time=now, end_time=None,
        setup=suite_setup,
        teardown=suite_teardown,
        tests=[unfinished_test, finished_test]
    )
    return make_report(setup=session_setup, teardown=session_teardown, suites=[suite_result])


@pytest.fixture()
def report_in_progress():
    """Fixture returning a fresh in-progress report."""
    return make_report_in_progress()


@pytest.fixture()
def report_in_progress_path(tmpdir):
    backend = JsonBackend()
    report_path = os.path.join(tmpdir.strpath, "report.json")
    backend.save_report(report_path, make_report_in_progress())
    ...  # SOURCE is truncated here; the remainder of this fixture is not visible
vhdl_run_test_case.py
Source:vhdl_run_test_case.py
# ... (SOURCE is truncated above this point. The tail of the preceding function
# is kept as a comment because its enclosing definition is not visible here.)
#     diff_command = diff_command.replace("\\","/")
#     print("executing diff command: "+diff_command)
#     x=os.system(diff_command)
#     return [diff_file, diff_tool]


def make_test_result(child, buildFolder, inputfile, tempoutfile, referencefile,diff_file,diff_tool):
    """Collect the outcome of one test case into a plain dict.

    :param child: XML element of the test case; must have a ``name`` attribute
        and an ``entityname`` sub-element.
    :param buildFolder: build folder, forwarded to ``make_description``.
    :param inputfile: path recorded under ``"InputFile"``.
    :param tempoutfile: path recorded under ``"OutputFile"``.
    :param referencefile: path recorded under ``"referencefile"``.
    :param diff_file: diff output file; a non-zero size marks the test as different.
    :param diff_tool: recorded under ``"diff_tool"``.
    :return: dict describing the test-case result.
    """
    name = child.get("name")
    entity = str(child.find("entityname").text).strip()
    return {
        "name": name,
        "entity": entity,
        # NOTE(review): the key is misspelled ("descitption") but is kept as-is,
        # since code outside this excerpt may read it under this exact name.
        "descitption": make_description(child, buildFolder),
        "InputFile": inputfile,
        "OutputFile": tempoutfile,
        "referencefile": referencefile,
        "diff_file": diff_file,
        "diff_tool": diff_tool,
        "IsDifferent": size_of_file(diff_file) != 0,
        "TestType": xml_find_or_defult(child, "tc_type", "TestCase"),
    }


class test_case_runner:
    """Runs the test cases described in an XML file and accumulates their results."""

    def __init__(self,buildFolder = "build/", reparse = True,update_reference_file=False, verbose = False ) -> None:
        """Store the run configuration and start with empty result/build lists."""
        self.buildFolder = buildFolder
        self.reparse = reparse
        self.update_reference_file = update_reference_file
        self.testResults = []
        self.build_systems = list()
        self.verbose = verbose

    def run(self, FileName):
        """Run every test case found in the XML file ``FileName``.

        For each child of the XML root: build the simulation when its entity is
        not yet listed in ``self.build_systems``, run the pre-run script, the
        simulation, the post-run script and the diff, optionally overwrite the
        reference file, and append the result dict to ``self.testResults``.

        :param FileName: path of the test-case XML file.
        :return: the value returned by the last ``build_simulation`` call, or
            ``self.reparse`` when no build was triggered.
        """
        FileName = make_rel_linux_path(FileName)
        tree = ET.parse(FileName)
        root = tree.getroot()
        split_test_case(FileName)
        filePath = os.path.dirname(FileName)

        # Bind the return value up front: previously it was only assigned inside
        # the loop when a build happened, so an empty file (or entities that were
        # all built already) ended in UnboundLocalError at ``return reparse``.
        # NOTE(review): falling back to the configured flag is an assumption
        # about the intended meaning of the return value; confirm with callers.
        reparse = self.reparse

        print(root)
        for child in root:
            name = child.get("name")
            entity = str(child.find("entityname").text).strip()
            entity_folder = self.buildFolder+entity+"/"
            print("Start Running Test-Case: " + name + " for entity: "+entity)
            if entity not in self.build_systems:
                reparse = build_simulation(self, entity=entity , entity_folder=entity_folder)

            pre_run_script(child, entity_folder)
            [inputfile, tempoutfile, referencefile] = run_simulation(
                filePath=filePath,
                child=child,
                entity_folder=entity_folder,
                verbose=self.verbose
            )
            post_run_script(child,entity_folder)
            [diff_file, diff_tool] = diff_output(
                child=child,
                entity_folder=entity_folder,
                tempoutfile=tempoutfile,
                referencefile=referencefile
            )

            if self.update_reference_file:
                # The fresh output becomes the new reference (after the diff was taken).
                copyfile(tempoutfile, referencefile)

            tc_result = make_test_result(
                child,
                self.buildFolder,
                inputfile,
                tempoutfile,
                referencefile,
                diff_file,
                diff_tool
            )

            self.testResults.append(tc_result)
        merge_test_case(FileName)
        return reparse

    def generate_report(self, FileName):
        content = ""
        ...  # SOURCE is truncated here; the remainder of this method is not visible
test_report.py
Source:test_report.py
# ... (SOURCE is truncated above this point. The tail of the preceding test
# is kept as a comment because its enclosing definition is not visible here.)
#     _test_timestamp_round(1485105524.353112, 1485105524.353)


def test_report_stats_simple():
    """A test whose single step holds a successful check is counted as passed."""
    step = make_step(logs=[make_check(True)])
    test_result = make_test_result(steps=[step])
    suite_result = make_suite_result(tests=[test_result])
    stats_report = make_report(suites=[suite_result])
    assert_report_stats(stats_report, expected_passed_tests=1)


def test_report_stats_failure():
    """An error log next to a successful check makes the test count as failed."""
    step = make_step(logs=[make_check(True), make_log("error")])
    test_result = make_test_result(steps=[step])
    suite_result = make_suite_result(tests=[test_result])
    stats_report = make_report(suites=[suite_result])
    assert_report_stats(stats_report, expected_failed_tests=1)


def test_report_stats_skipped():
    """A test created with status "skipped" is counted as skipped."""
    test_result = make_test_result(status="skipped")
    suite_result = make_suite_result(tests=[test_result])
    stats_report = make_report(suites=[suite_result])
    assert_report_stats(stats_report, expected_skipped_tests=1)


def test_test_duration_unfinished():
    """A test result without an end time has no duration."""
    result = TstResult("test", "test")
    result.start_time = NOW
    assert result.duration is None


def test_test_duration_finished():
    """A test result's duration is end time minus start time."""
    result = TstResult("test", "test")
    result.start_time = NOW
    result.end_time = NOW + 1.0
    assert result.duration == 1.0


def test_step_duration_unfinished():
    """A step without an end time has no duration."""
    pending_step = Step("step")
    pending_step.start_time = NOW
    assert pending_step.duration is None


def test_step_duration_finished():
    """A step's duration is end time minus start time."""
    done_step = Step("step")
    done_step.start_time = NOW
    done_step.end_time = NOW + 1.0
    assert done_step.duration == 1.0


def test_suite_duration():
    """Two consecutive one-second tests give a suite duration of 2."""
    first = make_test_result(start_time=NOW, end_time=NOW+1)
    second = make_test_result(start_time=NOW+1, end_time=NOW+2)
    suite_result = make_suite_result(tests=[first, second])
    assert suite_result.duration == 2


def test_suite_duration_with_setup():
    """Suite duration with a one-second setup followed by a one-second test is 2."""
    setup_result = make_result(start_time=NOW, end_time=NOW+1)
    test_result = make_test_result(start_time=NOW+1, end_time=NOW+2)
    suite_result = make_suite_result(setup=setup_result, tests=[test_result])
    assert suite_result.duration == 2


def test_suite_duration_with_teardown():
    """Suite duration with a one-second test followed by a one-second teardown is 2."""
    test_result = make_test_result(start_time=NOW+1, end_time=NOW+2)
    teardown_result = make_result(start_time=NOW+2, end_time=NOW+3)
    suite_result = make_suite_result(tests=[test_result], teardown=teardown_result)
    assert suite_result.duration == 2


def test_is_successful_is_true():
    """A suite whose only test logs an info message yields a successful report."""
    @lcc.suite("suite")
    class suite:
        @lcc.test("test")
        def test(self):
            lcc.log_info("some log")

    outcome = run_suite_class(suite)
    assert outcome.is_successful()


def test_is_successful():
    """A suite whose only test logs an error yields an unsuccessful report."""
    @lcc.suite("suite")
    class suite:
        @lcc.test("test")
        def test(self):
            lcc.log_error("some error")

    outcome = run_suite_class(suite)
    assert not outcome.is_successful()


def test_is_successful_with_disabled_test():
    """A disabled test does not prevent the report from being successful."""
    @lcc.suite("suite")
    class suite:
        @lcc.test("test_1")
        def test_1(self):
            lcc.log_info("some log")

        @lcc.test("test_2")
        @lcc.disabled()
        def test_2(self):
            lcc.log_info("some log")

    outcome = run_suite_class(suite)
    assert outcome.is_successful()


def test_is_successful_with_failed_test():
    """A test that logs an error makes the report unsuccessful."""
    @lcc.suite("suite")
    class suite:
        @lcc.test("test")
        def test(self):
            lcc.log_error("some error")

    outcome = run_suite_class(suite)
    assert not outcome.is_successful()


def test_is_successful_with_failed_teardown():
    """An error logged in the suite teardown makes the report unsuccessful."""
    @lcc.suite("suite")
    class suite:
        @lcc.test("test")
        def test(self):
            lcc.log_info("some log")

        def teardown_suite(self):
            lcc.log_error("some error")

    outcome = run_suite_class(suite)
    assert not outcome.is_successful()


def test_check_report_message_template_ok():
    """A template using a known variable is returned unchanged."""
    template = "{passed} test passed"
    checked = check_report_message_template(template)
    assert checked == template


def test_check_report_message_template_ko():
    """A template using an unknown variable raises ``ValueError``."""
    with pytest.raises(ValueError):
        assert check_report_message_template("{invalid_var}'")


@pytest.fixture()
def report_sample():
    """Report of a four-test suite: one empty, one disabled, one failing, one logging info."""
    @lcc.suite()
    class suite:
        @lcc.test()
        def test_1(self):
            pass

        @lcc.test()
        @lcc.disabled()
        def test_2(self):
            pass

        @lcc.test()
        def test_3(self):
            lcc.log_error("some issue")

        @lcc.test()
        def test_4(self):
            lcc.log_info("everything ok")

    return run_suite_class(suite)


def test_report_build_message_with_start_time(report_sample):
    """``{start_time}`` renders text containing the current year."""
    message = report_sample.build_message("{start_time}")
    assert str(CURRENT_YEAR) in message


def test_report_build_message_with_end_time(report_sample):
    """``{end_time}`` renders text containing the current year."""
    message = report_sample.build_message("{end_time}")
    assert str(CURRENT_YEAR) in message


def test_report_build_message_with_duration(report_sample):
    """``{duration}`` renders a one-second duration as "1s"."""
    report_sample.end_time = report_sample.start_time + 1
    message = report_sample.build_message("{duration}")
    assert message == "1s"


def test_report_build_message_with_total(report_sample):
    """``{total}`` counts all four tests."""
    message = report_sample.build_message("{total}")
    assert message == "4"


def test_report_build_message_with_enabled(report_sample):
    """``{enabled}`` counts the three tests that are not disabled."""
    message = report_sample.build_message("{enabled}")
    assert message == "3"


def test_report_build_message_with_passed(report_sample):
    """``{passed}`` renders "2" for the sample report."""
    message = report_sample.build_message("{passed}")
    assert message == "2"


def test_report_build_message_with_passed_pct(report_sample):
    """``{passed_pct}`` renders "66%" for the sample report."""
    message = report_sample.build_message("{passed_pct}")
    assert message == "66%"


def test_report_build_message_with_failed(report_sample):
    """``{failed}`` renders "1" for the sample report."""
    message = report_sample.build_message("{failed}")
    assert message == "1"


def test_report_build_message_with_failed_pct(report_sample):
    """``{failed_pct}`` renders "33%" for the sample report."""
    message = report_sample.build_message("{failed_pct}")
    assert message == "33%"


def test_report_build_message_with_skipped():
    """``{skipped}`` renders "1" for two passed tests and one skipped test."""
    test_results = [make_test_result(status=status) for status in ("passed", "passed", "skipped")]
    skipped_report = make_report([make_suite_result(tests=test_results)])
    message = skipped_report.build_message("{skipped}")
    assert message == "1"


def test_report_build_message_with_skipped_pct():
    """``{skipped_pct}`` renders "33%" for two passed tests and one skipped test."""
    test_results = [make_test_result(status=status) for status in ("passed", "passed", "skipped")]
    skipped_report = make_report([make_suite_result(tests=test_results)])
    message = skipped_report.build_message("{skipped_pct}")
    assert message == "33%"


def test_report_build_message_with_disabled(report_sample):
    """``{disabled}`` renders "1" for the sample report."""
    message = report_sample.build_message("{disabled}")
    assert message == "1"


def test_report_build_message_with_disabled_pct(report_sample):
    """``{disabled_pct}`` renders "25%" for the sample report."""
    message = report_sample.build_message("{disabled_pct}")
    assert message == "25%"

# ... (SOURCE is truncated after this point)
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!