Best Python code snippet using playwright-python
test_emitter.py
Source:test_emitter.py
...79 time.sleep(5)80 if issubclass(exc[0], Empty) and platform.is_windows():81 return True82 return False83def expect_event(expected_event, timeout=2):84 """ Utility function to wait up to `timeout` seconds for an `event_type` for `path` to show up in the queue.85 Provides some robustness for the otherwise flaky nature of asynchronous notifications.86 """87 try:88 event = event_queue.get(timeout=timeout)[0]89 assert event == expected_event90 except Empty:91 raise92@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)93def test_create():94 start_watching()95 open(p('a'), 'a').close()96 expect_event(FileCreatedEvent(p('a')))97 if not platform.is_windows():98 expect_event(DirModifiedEvent(p()))99 if platform.is_linux():100 event = event_queue.get(timeout=5)[0]101 assert event.src_path == p('a')102 assert isinstance(event, FileClosedEvent)103@pytest.mark.skipif(not platform.is_linux(), reason="FileCloseEvent only supported in GNU/Linux")104@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)105def test_close():106 f_d = open(p('a'), 'a')107 start_watching()108 f_d.close()109 # After file creation/open in append mode110 event = event_queue.get(timeout=5)[0]111 assert event.src_path == p('a')112 assert isinstance(event, FileClosedEvent)113 event = event_queue.get(timeout=5)[0]114 assert os.path.normpath(event.src_path) == os.path.normpath(p(''))115 assert isinstance(event, DirModifiedEvent)116 # After read-only, only IN_CLOSE_NOWRITE is emitted but not catched for now #747117 open(p('a'), 'r').close()118 assert event_queue.empty()119@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)120@pytest.mark.skipif(121 platform.is_darwin() or platform.is_windows(),122 reason="Windows and macOS enforce proper encoding"123)124def test_create_wrong_encoding():125 start_watching()126 open(p('a_\udce4'), 'a').close()127 event = event_queue.get(timeout=5)[0]128 assert event.src_path == p('a_\udce4')129 assert isinstance(event, 
FileCreatedEvent)130 if not platform.is_windows():131 event = event_queue.get(timeout=5)[0]132 assert os.path.normpath(event.src_path) == os.path.normpath(p(''))133 assert isinstance(event, DirModifiedEvent)134@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)135def test_delete():136 mkfile(p('a'))137 start_watching()138 rm(p('a'))139 expect_event(FileDeletedEvent(p('a')))140 if not platform.is_windows():141 expect_event(DirModifiedEvent(p()))142@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)143def test_modify():144 mkfile(p('a'))145 start_watching()146 touch(p('a'))147 expect_event(FileModifiedEvent(p('a')))148 if platform.is_linux():149 event = event_queue.get(timeout=5)[0]150 assert event.src_path == p('a')151 assert isinstance(event, FileClosedEvent)152@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)153def test_chmod():154 mkfile(p('a'))155 start_watching()156 # Note: We use S_IREAD here because chmod on Windows only157 # allows setting the read-only flag.158 os.chmod(p('a'), stat.S_IREAD)159 expect_event(FileModifiedEvent(p('a')))160 # Reset permissions to allow cleanup.161 os.chmod(p('a'), stat.S_IWRITE)162@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)163def test_move():164 mkdir(p('dir1'))165 mkdir(p('dir2'))166 mkfile(p('dir1', 'a'))167 start_watching()168 mv(p('dir1', 'a'), p('dir2', 'b'))169 if not platform.is_windows():170 expect_event(FileMovedEvent(p('dir1', 'a'), p('dir2', 'b')))171 else:172 event = event_queue.get(timeout=5)[0]173 assert event.src_path == p('dir1', 'a')174 assert isinstance(event, FileDeletedEvent)175 event = event_queue.get(timeout=5)[0]176 assert event.src_path == p('dir2', 'b')177 assert isinstance(event, FileCreatedEvent)178 event = event_queue.get(timeout=5)[0]179 assert event.src_path in [p('dir1'), p('dir2')]180 assert isinstance(event, DirModifiedEvent)181 if not platform.is_windows():182 event = event_queue.get(timeout=5)[0]183 
assert event.src_path in [p('dir1'), p('dir2')]184 assert isinstance(event, DirModifiedEvent)185@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)186def test_case_change():187 mkdir(p('dir1'))188 mkdir(p('dir2'))189 mkfile(p('dir1', 'file'))190 start_watching()191 mv(p('dir1', 'file'), p('dir2', 'FILE'))192 if not platform.is_windows():193 expect_event(FileMovedEvent(p('dir1', 'file'), p('dir2', 'FILE')))194 else:195 event = event_queue.get(timeout=5)[0]196 assert event.src_path == p('dir1', 'file')197 assert isinstance(event, FileDeletedEvent)198 event = event_queue.get(timeout=5)[0]199 assert event.src_path == p('dir2', 'FILE')200 assert isinstance(event, FileCreatedEvent)201 event = event_queue.get(timeout=5)[0]202 assert event.src_path in [p('dir1'), p('dir2')]203 assert isinstance(event, DirModifiedEvent)204 if not platform.is_windows():205 event = event_queue.get(timeout=5)[0]206 assert event.src_path in [p('dir1'), p('dir2')]207 assert isinstance(event, DirModifiedEvent)208@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)209def test_move_to():210 mkdir(p('dir1'))211 mkdir(p('dir2'))212 mkfile(p('dir1', 'a'))213 start_watching(p('dir2'))214 mv(p('dir1', 'a'), p('dir2', 'b'))215 expect_event(FileCreatedEvent(p('dir2', 'b')))216 if not platform.is_windows():217 expect_event(DirModifiedEvent(p('dir2')))218@pytest.mark.skipif(not platform.is_linux(), reason="InotifyFullEmitter only supported in Linux")219def test_move_to_full():220 mkdir(p('dir1'))221 mkdir(p('dir2'))222 mkfile(p('dir1', 'a'))223 start_watching(p('dir2'), use_full_emitter=True)224 mv(p('dir1', 'a'), p('dir2', 'b'))225 event = event_queue.get(timeout=5)[0]226 assert isinstance(event, FileMovedEvent)227 assert event.dest_path == p('dir2', 'b')228 assert event.src_path is None # Should equal None since the path was not watched229@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)230def test_move_from():231 mkdir(p('dir1'))232 
mkdir(p('dir2'))233 mkfile(p('dir1', 'a'))234 start_watching(p('dir1'))235 mv(p('dir1', 'a'), p('dir2', 'b'))236 expect_event(FileDeletedEvent(p('dir1', 'a')))237 if not platform.is_windows():238 expect_event(DirModifiedEvent(p('dir1')))239@pytest.mark.skipif(not platform.is_linux(), reason="InotifyFullEmitter only supported in Linux")240def test_move_from_full():241 mkdir(p('dir1'))242 mkdir(p('dir2'))243 mkfile(p('dir1', 'a'))244 start_watching(p('dir1'), use_full_emitter=True)245 mv(p('dir1', 'a'), p('dir2', 'b'))246 event = event_queue.get(timeout=5)[0]247 assert isinstance(event, FileMovedEvent)248 assert event.src_path == p('dir1', 'a')249 assert event.dest_path is None # Should equal None since path not watched250@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)251def test_separate_consecutive_moves():252 mkdir(p('dir1'))253 mkfile(p('dir1', 'a'))254 mkfile(p('b'))255 start_watching(p('dir1'))256 mv(p('dir1', 'a'), p('c'))257 mv(p('b'), p('dir1', 'd'))258 dir_modif = DirModifiedEvent(p('dir1'))259 a_deleted = FileDeletedEvent(p('dir1', 'a'))260 d_created = FileCreatedEvent(p('dir1', 'd'))261 expected_events = [a_deleted, dir_modif, d_created, dir_modif]262 if platform.is_windows():263 expected_events = [a_deleted, d_created]264 if platform.is_bsd():265 # Due to the way kqueue works, we can't really order266 # 'Created' and 'Deleted' events in time, so creation queues first267 expected_events = [d_created, a_deleted, dir_modif, dir_modif]268 for expected_event in expected_events:269 expect_event(expected_event)270@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)271@pytest.mark.skipif(platform.is_bsd(), reason="BSD create another set of events for this test")272def test_delete_self():273 mkdir(p('dir1'))274 start_watching(p('dir1'))275 rm(p('dir1'), True)276 expect_event(DirDeletedEvent(p('dir1')))277 emitter.join(5)278 assert not emitter.is_alive()279@pytest.mark.skipif(platform.is_windows() or platform.is_bsd(),280 
reason="Windows|BSD create another set of events for this test")281def test_fast_subdirectory_creation_deletion():282 root_dir = p('dir1')283 sub_dir = p('dir1', 'subdir1')284 times = 30285 mkdir(root_dir)286 start_watching(root_dir)287 for _ in range(times):288 mkdir(sub_dir)289 rm(sub_dir, True)290 time.sleep(0.1) # required for macOS emitter to catch up with us291 count = {DirCreatedEvent: 0,292 DirModifiedEvent: 0,293 DirDeletedEvent: 0}294 etype_for_dir = {DirCreatedEvent: sub_dir,295 DirModifiedEvent: root_dir,296 DirDeletedEvent: sub_dir}297 for _ in range(times * 4):298 event = event_queue.get(timeout=5)[0]299 logger.debug(event)300 etype = type(event)301 count[etype] += 1302 assert event.src_path == etype_for_dir[etype]303 assert count[DirCreatedEvent] >= count[DirDeletedEvent]304 assert count[DirCreatedEvent] + count[DirDeletedEvent] >= count[DirModifiedEvent]305 assert count == {DirCreatedEvent: times,306 DirModifiedEvent: times * 2,307 DirDeletedEvent: times}308@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)309def test_passing_unicode_should_give_unicode():310 start_watching(str(p()))311 mkfile(p('a'))312 event = event_queue.get(timeout=5)[0]313 assert isinstance(event.src_path, str)314@pytest.mark.skipif(platform.is_windows(),315 reason="Windows ReadDirectoryChangesW supports only"316 " unicode for paths.")317def test_passing_bytes_should_give_bytes():318 start_watching(p().encode())319 mkfile(p('a'))320 event = event_queue.get(timeout=5)[0]321 assert isinstance(event.src_path, bytes)322@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)323def test_recursive_on():324 mkdir(p('dir1', 'dir2', 'dir3'), True)325 start_watching()326 touch(p('dir1', 'dir2', 'dir3', 'a'))327 event = event_queue.get(timeout=5)[0]328 assert event.src_path == p('dir1', 'dir2', 'dir3', 'a')329 assert isinstance(event, FileCreatedEvent)330 if not platform.is_windows():331 event = event_queue.get(timeout=5)[0]332 assert event.src_path == 
p('dir1', 'dir2', 'dir3')333 assert isinstance(event, DirModifiedEvent)334 if not platform.is_bsd():335 event = event_queue.get(timeout=5)[0]336 assert event.src_path == p('dir1', 'dir2', 'dir3', 'a')337 assert isinstance(event, FileModifiedEvent)338@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)339def test_recursive_off():340 mkdir(p('dir1'))341 start_watching(recursive=False)342 touch(p('dir1', 'a'))343 with pytest.raises(Empty):344 event_queue.get(timeout=5)345 mkfile(p('b'))346 expect_event(FileCreatedEvent(p('b')))347 if not platform.is_windows():348 expect_event(DirModifiedEvent(p()))349 if platform.is_linux():350 expect_event(FileClosedEvent(p('b')))351 # currently limiting these additional events to macOS only, see https://github.com/gorakhargosh/watchdog/pull/779352 if platform.is_darwin():353 mkdir(p('dir1', 'dir2'))354 with pytest.raises(Empty):355 event_queue.get(timeout=5)356 mkfile(p('dir1', 'dir2', 'somefile'))357 with pytest.raises(Empty):358 event_queue.get(timeout=5)359 mkdir(p('dir3'))360 expect_event(DirModifiedEvent(p())) # the contents of the parent directory changed361 mv(p('dir1', 'dir2', 'somefile'), p('somefile'))362 expect_event(FileMovedEvent(p('dir1', 'dir2', 'somefile'), p('somefile')))363 expect_event(DirModifiedEvent(p()))364 mv(p('dir1', 'dir2'), p('dir2'))365 expect_event(DirMovedEvent(p('dir1', 'dir2'), p('dir2')))366 expect_event(DirModifiedEvent(p()))367@pytest.mark.skipif(platform.is_windows(),368 reason="Windows create another set of events for this test")369def test_renaming_top_level_directory():370 start_watching()371 mkdir(p('a'))372 expect_event(DirCreatedEvent(p('a')))373 expect_event(DirModifiedEvent(p()))374 mkdir(p('a', 'b'))375 expect_event(DirCreatedEvent(p('a', 'b')))376 expect_event(DirModifiedEvent(p('a')))377 mv(p('a'), p('a2'))378 expect_event(DirMovedEvent(p('a'), p('a2')))379 expect_event(DirModifiedEvent(p()))380 expect_event(DirModifiedEvent(p()))381 expect_event(DirMovedEvent(p('a', 
'b'), p('a2', 'b')))382 if platform.is_bsd():383 expect_event(DirModifiedEvent(p()))384 open(p('a2', 'b', 'c'), 'a').close()385 # DirModifiedEvent may emitted, but sometimes after waiting time is out.386 events = []387 while True:388 events.append(event_queue.get(timeout=5)[0])389 if event_queue.empty():390 break391 assert all([isinstance(e, (FileCreatedEvent, FileMovedEvent, DirModifiedEvent, FileClosedEvent)) for e in events])392 for event in events:393 if isinstance(event, FileCreatedEvent):394 assert event.src_path == p('a2', 'b', 'c')395 elif isinstance(event, FileMovedEvent):396 assert event.dest_path == p('a2', 'b', 'c')397 assert event.src_path == p('a', 'b', 'c')398 elif isinstance(event, DirModifiedEvent):399 assert event.src_path == p('a2', 'b')400@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)401@pytest.mark.skipif(not platform.is_windows(),402 reason="Non-Windows create another set of events for this test")403def test_renaming_top_level_directory_on_windows():404 start_watching()405 mkdir(p('a'))406 event = event_queue.get(timeout=5)[0]407 assert isinstance(event, DirCreatedEvent)408 assert event.src_path == p('a')409 mkdir(p('a', 'b'))410 event = event_queue.get(timeout=5)[0]411 assert isinstance(event, DirCreatedEvent)412 assert event.src_path == p('a', 'b')413 event = event_queue.get(timeout=5)[0]414 assert isinstance(event, DirCreatedEvent)415 assert event.src_path == p('a', 'b')416 event = event_queue.get(timeout=5)[0]417 assert isinstance(event, DirModifiedEvent)418 assert event.src_path == p('a')419 mv(p('a'), p('a2'))420 event = event_queue.get(timeout=5)[0]421 assert isinstance(event, DirMovedEvent)422 assert event.src_path == p('a', 'b')423 open(p('a2', 'b', 'c'), 'a').close()424 events = []425 while True:426 events.append(event_queue.get(timeout=5)[0])427 if event_queue.empty():428 break429 assert all([isinstance(e, (FileCreatedEvent, FileMovedEvent, DirMovedEvent, DirModifiedEvent)) for e in events])430 for event in 
events:431 if isinstance(event, FileCreatedEvent):432 assert event.src_path == p('a2', 'b', 'c')433 elif isinstance(event, FileMovedEvent):434 assert event.dest_path == p('a2', 'b', 'c')435 assert event.src_path == p('a', 'b', 'c')436 elif isinstance(event, DirMovedEvent):437 assert event.dest_path == p('a2')438 assert event.src_path == p('a')439 elif isinstance(event, DirModifiedEvent):440 assert event.src_path == p('a2', 'b')441@pytest.mark.skipif(platform.is_windows(),442 reason="Windows create another set of events for this test")443def test_move_nested_subdirectories():444 mkdir(p('dir1/dir2/dir3'), parents=True)445 mkfile(p('dir1/dir2/dir3', 'a'))446 start_watching()447 mv(p('dir1/dir2'), p('dir2'))448 expect_event(DirMovedEvent(p('dir1', 'dir2'), p('dir2')))449 expect_event(DirModifiedEvent(p('dir1')))450 expect_event(DirModifiedEvent(p()))451 expect_event(DirMovedEvent(p('dir1', 'dir2', 'dir3'), p('dir2', 'dir3')))452 expect_event(FileMovedEvent(p('dir1', 'dir2', 'dir3', 'a'), p('dir2', 'dir3', 'a')))453 if platform.is_bsd():454 event = event_queue.get(timeout=5)[0]455 assert p(event.src_path) == p()456 assert isinstance(event, DirModifiedEvent)457 event = event_queue.get(timeout=5)[0]458 assert p(event.src_path) == p('dir1')459 assert isinstance(event, DirModifiedEvent)460 touch(p('dir2/dir3', 'a'))461 event = event_queue.get(timeout=5)[0]462 assert event.src_path == p('dir2/dir3', 'a')463 assert isinstance(event, FileModifiedEvent)464@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)465@pytest.mark.skipif(not platform.is_windows(),466 reason="Non-Windows create another set of events for this test")467def test_move_nested_subdirectories_on_windows():468 mkdir(p('dir1/dir2/dir3'), parents=True)469 mkfile(p('dir1/dir2/dir3', 'a'))470 start_watching(p(''))471 mv(p('dir1/dir2'), p('dir2'))472 event = event_queue.get(timeout=5)[0]473 assert event.src_path == p('dir1', 'dir2')474 assert isinstance(event, FileDeletedEvent)475 event = 
event_queue.get(timeout=5)[0]476 assert event.src_path == p('dir2')477 assert isinstance(event, DirCreatedEvent)478 event = event_queue.get(timeout=5)[0]479 assert event.src_path == p('dir2', 'dir3')480 assert isinstance(event, DirCreatedEvent)481 event = event_queue.get(timeout=5)[0]482 assert event.src_path == p('dir2', 'dir3', 'a')483 assert isinstance(event, FileCreatedEvent)484 touch(p('dir2/dir3', 'a'))485 events = []486 while True:487 events.append(event_queue.get(timeout=5)[0])488 if event_queue.empty():489 break490 assert all([isinstance(e, (FileModifiedEvent, DirModifiedEvent)) for e in events])491 for event in events:492 if isinstance(event, FileModifiedEvent):493 assert event.src_path == p('dir2', 'dir3', 'a')494 elif isinstance(event, DirModifiedEvent):495 assert event.src_path in [p('dir2'), p('dir2', 'dir3')]496@pytest.mark.flaky(max_runs=5, min_passes=1, rerun_filter=rerun_filter)497@pytest.mark.skipif(platform.is_bsd(), reason="BSD create another set of events for this test")498def test_file_lifecyle():499 start_watching()500 mkfile(p('a'))501 touch(p('a'))502 mv(p('a'), p('b'))503 rm(p('b'))504 expect_event(FileCreatedEvent(p('a')))505 if not platform.is_windows():506 expect_event(DirModifiedEvent(p()))507 if platform.is_linux():508 expect_event(FileClosedEvent(p('a')))509 expect_event(DirModifiedEvent(p()))510 expect_event(FileModifiedEvent(p('a')))511 if platform.is_linux():512 expect_event(FileClosedEvent(p('a')))513 expect_event(DirModifiedEvent(p()))514 expect_event(FileMovedEvent(p('a'), p('b')))515 if not platform.is_windows():516 expect_event(DirModifiedEvent(p()))517 expect_event(DirModifiedEvent(p()))518 expect_event(FileDeletedEvent(p('b')))519 if not platform.is_windows():...
test_app_BaseServerManager.py
Source:test_app_BaseServerManager.py
'''
Created on Feb 26, 2014
@author: igor
'''
import unittest
from mock import MagicMock, ANY
import sys
import zmq
from app.BaseServerManager import BaseServerManager
from app.PollerManager import PollerManager
from transport.Event import EventBuilder
from transport.ConnectionBuilderLight import ConnectionBuilderLight
from transport.ConnectionLight import ConnectionLight
from transport.Connection import ConnectionTimeout
from transport.Connection import TransportInternalErr
from transport.Response import Response
from transport.Connection import Connection
import transport.Consts as transport_const
from dtm.Constants import EVENT_TYPES
from dtm.EventObjects import AdminState, AdminStatData
import logging

FORMAT = '%(asctime)s - %(thread)ld - %(threadName)s - %(name)s - %(funcName)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.DEBUG, format=FORMAT)  # basic logging configuration


class Matcher(object):
    """Argument matcher for mock assertions.

    Wraps a reference object together with a two-argument comparison
    function; ``matcher == other`` returns ``compare(some_obj, other)``.
    """

    def __init__(self, compare, some_obj):
        self.compare = compare
        self.some_obj = some_obj

    def __eq__(self, other):
        return self.compare(self.some_obj, other)

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from ``__eq__``; keep both
        # operators consistent with the comparison function.
        return not self.__eq__(other)


def matchEvents(first, second):
    """Return True when both events share uid, eventType and eventObj."""
    return first.uid == second.uid and first.eventType == second.eventType and\
        first.eventObj == second.eventObj


def matchStatEvents(first, second):
    """Return True when both events share uid, eventType and eventObj.fields.

    The ``fields`` of both events are printed before comparing, as a
    debugging aid when a statistics assertion fails.
    """
    # print(...) with a single argument produces the same output on
    # Python 2 and Python 3; the bare ``print x`` statement form is a
    # SyntaxError on Python 3.
    print(first.eventObj.fields)
    print(second.eventObj.fields)
    return first.uid == second.uid and first.eventType == second.eventType and\
        first.eventObj.fields == second.eventObj.fields


def match_tcp_raw_events(first, second):
    """Return True when eventType and the eventObj attribute dicts are equal."""
    return first.eventType == second.eventType and first.eventObj.__dict__ == second.eventObj.__dict__


class TestBaseServerManager(unittest.TestCase):
    # NOTE: the test methods are documented with comments rather than
    # docstrings so that unittest's verbose test descriptions stay unchanged.

    def setUp(self):
        # A light connection built on the admin endpoint; closed in tearDown.
        self.connectionBuilderLight = ConnectionBuilderLight()
        self.adminServerFake = self.connectionBuilderLight.build(
            transport_const.SERVER_CONNECT, BaseServerManager.ADMIN_CONNECT_ENDPOINT)

        self.sock_mock = MagicMock(spec=zmq.Socket)
        self.poller_mock = MagicMock(spec=zmq.Poller)
        self.poller_manager_mock = MagicMock(spec=PollerManager)
        self.connect_mock = MagicMock(spec=ConnectionLight)
        self.base_server_manager = BaseServerManager(
            self.poller_manager_mock, conectionLightBuilder=self.connectionBuilderLight)
        self.event_builder = EventBuilder()
        self.event = self.event_builder.build(EVENT_TYPES.NEW_TASK, "eventObj")
        self.connect_name = "Admin"
        self.event.connect_name = self.connect_name
        self.adminState = AdminState("BaseServerManager", AdminState.STATE_SHUTDOWN)

    def tearDown(self):
        self.adminServerFake.close()

    def test_correct_add_new_connection(self):
        # A connection added under a name is retrievable under that name.
        connect = ConnectionLight(self.sock_mock, transport_const.CLIENT_CONNECT)
        self.base_server_manager.addConnection(self.connect_name, connect)

        self.assertEqual(self.base_server_manager.connections[self.connect_name], connect)

    def test_set__event_handler(self):
        # A handler registered for an event type receives processed events.
        handler_mock = MagicMock()
        self.base_server_manager.setEventHandler(EVENT_TYPES.NEW_TASK, handler_mock)

        self.base_server_manager.process(self.event)

        handler_mock.assert_called_once_with(self.event)

    def test_call_unhandled_event_handler(self):
        # With no handler registered, on_unhandled_event receives the event.
        unhandled_mock = MagicMock()
        self.base_server_manager.on_unhandled_event = unhandled_mock

        self.base_server_manager.process(self.event)

        unhandled_mock.assert_called_once_with(self.event)

    def test_send_event(self):
        # send() forwards the event to the named connection.
        self.base_server_manager.addConnection(self.connect_name, self.connect_mock)
        self.base_server_manager.send(self.connect_name, self.event)

        self.connect_mock.send.assert_called_once_with(self.event)

    def test_reply(self):
        self.base_server_manager.addConnection(self.connect_name, self.connect_mock)

        reply_event = self.event_builder.build(EVENT_TYPES.NEW_TASK, "new obj")
        # expect_event is the same object as reply_event, so assigning the
        # uid below also sets it on the event handed to reply().
        expect_event = reply_event
        expect_event.uid = self.event.uid

        match_event = Matcher(matchEvents, expect_event)
        self.base_server_manager.reply(self.event, reply_event)

        self.connect_mock.send.assert_called_once_with(match_event)

    def test_run_one_loop(self):
        # One poll() pass reads from the ready connection and dispatches.
        self.poller_manager_mock.poll.return_value = [self.connect_name]
        self.connect_mock.recv.return_value = self.event
        handler_mock = MagicMock()
        self.base_server_manager.setEventHandler(EVENT_TYPES.NEW_TASK, handler_mock)
        self.base_server_manager.addConnection(self.connect_name, self.connect_mock)

        self.base_server_manager.poll()

        handler_mock.assert_called_once_with(self.event)

    def test_call_on_timeout_callback(self):
        # A ConnectionTimeout raised while polling triggers on_poll_timeout.
        self.poller_manager_mock.poll.side_effect = ConnectionTimeout("boom")
        on_timeout_mock = MagicMock()
        self.base_server_manager.on_poll_timeout = on_timeout_mock

        self.base_server_manager.poll()

        on_timeout_mock.assert_called_once_with()

    def test_propagate_out_TransportInternalErr(self):
        # TransportInternalErr raised by recv() must escape poll().
        self.poller_manager_mock.poll.return_value = [self.connect_name]
        self.connect_mock.recv.side_effect = TransportInternalErr("boom")

        self.base_server_manager.addConnection(self.connect_name, self.connect_mock)

        with self.assertRaises(TransportInternalErr):
            list(self.base_server_manager.poll())

    def test_process_response_tcp_raw(self):
        # A Response received from a Connection is dispatched to the
        # SERVER_TCP_RAW handler.
        raw_server = "raw_server"
        response = Response(["sock_identity", "id", "body"])
        connect_mock = MagicMock(Connection)
        connect_mock.recv.return_value = response
        self.poller_manager_mock.poll.return_value = [raw_server]

        self.base_server_manager.addConnection(raw_server, connect_mock)
        event_processor_mock = MagicMock()

        expect_event = self.base_server_manager.eventBuilder.build(EVENT_TYPES.SERVER_TCP_RAW, response)
        match_event = Matcher(match_tcp_raw_events, expect_event)

        self.base_server_manager.setEventHandler(EVENT_TYPES.SERVER_TCP_RAW, event_processor_mock)

        self.base_server_manager.poll()

        event_processor_mock.assert_called_once_with(match_event)

    def test_create_client_admin_connect(self):
        # Without an explicit admin connection the builder is used once.
        connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
        self.base_server_manager = BaseServerManager(
            self.poller_manager_mock, conectionLightBuilder=connectLightBuilder_mock)

        connectLightBuilder_mock.build.assert_called_once_with(ANY, ANY)

    def test_create_server_connect(self):
        # With an explicit admin connection the builder is never used.
        connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
        adminConnect_mock = MagicMock(ConnectionLight)
        self.base_server_manager = BaseServerManager(
            self.poller_manager_mock, conectionLightBuilder=connectLightBuilder_mock,
            admin_connection=adminConnect_mock)
        self.assertEqual(connectLightBuilder_mock.build.call_count, 0, "was called")

    def test_get_client_admin_event(self):
        # An ADMIN_STATE event carrying STATE_SHUTDOWN sets exit_flag and
        # produces one message on the admin connection.
        connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
        self.base_server_manager = BaseServerManager(
            self.poller_manager_mock, conectionLightBuilder=connectLightBuilder_mock)

        adminEvent = self.event_builder.build(EVENT_TYPES.ADMIN_STATE, self.adminState)
        connect_mock = MagicMock(ConnectionLight)
        connect_mock.recv.return_value = adminEvent

        self.poller_manager_mock.poll.return_value = [self.base_server_manager.ADMIN_CONNECT_CLIENT]
        self.base_server_manager.addConnection(self.base_server_manager.ADMIN_CONNECT_CLIENT, connect_mock)

        self.base_server_manager.poll()

        self.assertEqual(self.base_server_manager.exit_flag, True, "still running")
        connect_mock.send.assert_called_once_with(ANY)

    def test_admin_event_broken_command_type(self):
        # A STATE_NOP command must not stop the manager; one message is
        # still sent on the admin connection.
        connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
        self.base_server_manager = BaseServerManager(
            self.poller_manager_mock, conectionLightBuilder=connectLightBuilder_mock)

        self.adminState.command = AdminState.STATE_NOP
        adminEvent = self.event_builder.build(EVENT_TYPES.ADMIN_STATE, self.adminState)
        connect_mock = MagicMock(ConnectionLight)
        connect_mock.recv.return_value = adminEvent

        self.poller_manager_mock.poll.return_value = [self.base_server_manager.ADMIN_CONNECT_CLIENT]
        self.base_server_manager.addConnection(self.base_server_manager.ADMIN_CONNECT_CLIENT, connect_mock)

        self.base_server_manager.poll()

        self.assertEqual(self.base_server_manager.exit_flag, False, "stopped")
        connect_mock.send.assert_called_once_with(ANY)

    def test_admin_event_broken_class_name(self):
        # A non-matching className must not stop the manager; one message
        # is still sent on the admin connection.
        connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
        self.base_server_manager = BaseServerManager(
            self.poller_manager_mock, conectionLightBuilder=connectLightBuilder_mock)

        self.adminState.className = "AdminState.STATE_NOP"
        adminEvent = self.event_builder.build(EVENT_TYPES.ADMIN_STATE, self.adminState)
        connect_mock = MagicMock(ConnectionLight)
        connect_mock.recv.return_value = adminEvent

        self.poller_manager_mock.poll.return_value = [self.base_server_manager.ADMIN_CONNECT_CLIENT]
        self.base_server_manager.addConnection(self.base_server_manager.ADMIN_CONNECT_CLIENT, connect_mock)

        self.base_server_manager.poll()

        self.assertEqual(self.base_server_manager.exit_flag, False, "stopped")
        connect_mock.send.assert_called_once_with(ANY)

    def test_init_stat_fields_admin_connect(self):
        # After construction the admin connection statistics show exactly
        # one sent message, sized like a STATE_READY response event.
        className = self.base_server_manager.__class__.__name__
        ready = AdminState(className, AdminState.STATE_READY)
        readyEvent = self.base_server_manager.eventBuilder.build(EVENT_TYPES.ADMIN_STATE_RESPONSE, ready)
        admin_connect_name = BaseServerManager.ADMIN_CONNECT_CLIENT
        stat_fields = self.base_server_manager.statFields

        # assertEqual reports both values on failure, unlike assertTrue(a == b).
        self.assertEqual(stat_fields[admin_connect_name + "_send_cnt"], 1)
        self.assertEqual(stat_fields[admin_connect_name + "_recv_cnt"], 0)
        self.assertEqual(stat_fields[admin_connect_name + "_send_bytes"], sys.getsizeof(readyEvent))
        self.assertEqual(stat_fields[admin_connect_name + "_recv_bytes"], 0)

    def test_init_user_connection(self):
        # Adding a connection creates zeroed statistics counters for it.
        fake_user_connect = None
        connect_name = "user_connect"
        self.base_server_manager.addConnection(connect_name, fake_user_connect)

        self.assertEqual(self.base_server_manager.statFields[connect_name + "_send_cnt"], 0)
        self.assertEqual(self.base_server_manager.statFields[connect_name + "_recv_cnt"], 0)
        self.assertEqual(self.base_server_manager.statFields[connect_name + "_send_bytes"], 0)
        self.assertEqual(self.base_server_manager.statFields[connect_name + "_recv_bytes"], 0)

    def test_stats_recv_several_msg(self):
        # Each processed event bumps the receive counters of its connection.
        msg_size = sys.getsizeof(self.event)
        fake_user_connect = None
        connect_name = "user_connect"
        self.event.connect_name = connect_name
        self.base_server_manager.addConnection(connect_name, fake_user_connect)

        self.base_server_manager.process(self.event)
        self.base_server_manager.process(self.event)

        self.assertEqual(self.base_server_manager.statFields[connect_name + "_send_cnt"], 0)
        self.assertEqual(self.base_server_manager.statFields[connect_name + "_recv_cnt"], 2)
        self.assertEqual(self.base_server_manager.statFields[connect_name + "_send_bytes"], 0)
        self.assertEqual(self.base_server_manager.statFields[connect_name + "_recv_bytes"], 2 * msg_size)

    def test_stats_update_invalid_stat_key(self):
        # Processing an event from an unregistered connection name must
        # not raise (the test has no assertions beyond that).
        self.event.connect_name = "bad_stat_name"

        self.base_server_manager.process(self.event)

    def test__admin_fetch_stat_data_all(self):
        # A fetch request without explicit fields is answered with the
        # manager's statFields.
        connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
        self.base_server_manager = BaseServerManager(
            self.poller_manager_mock, conectionLightBuilder=connectLightBuilder_mock)

        adminStatData = AdminStatData("BaseServerManager")
        adminStatEvent = self.event_builder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA, adminStatData)
        connect_mock = MagicMock(ConnectionLight)
        connect_mock.recv.return_value = adminStatEvent

        self.poller_manager_mock.poll.return_value = [self.base_server_manager.ADMIN_CONNECT_CLIENT]
        self.base_server_manager.addConnection(self.base_server_manager.ADMIN_CONNECT_CLIENT, connect_mock)
        expectAdminStatData = AdminStatData("BaseServerManager", self.base_server_manager.statFields)
        expect_event = self.event_builder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA_RESPONSE, expectAdminStatData)
        expect_event.uid = adminStatEvent.uid
        match_event = Matcher(matchStatEvents, expect_event)

        self.base_server_manager.poll()

        connect_mock.send.assert_called_once_with(match_event)

    def test__admin_fetch_stat_data_some_fields(self):
        # A fetch request naming specific fields is answered with those
        # fields only; the expected response leaves "WrongName" at 0.
        connectLightBuilder_mock = MagicMock(ConnectionBuilderLight)
        self.base_server_manager = BaseServerManager(
            self.poller_manager_mock, conectionLightBuilder=connectLightBuilder_mock)

        adminStatData = AdminStatData("BaseServerManager", {"Admin_recv_cnt": 0, "WrongName": 0})
        adminStatEvent = self.event_builder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA, adminStatData)
        connect_mock = MagicMock(ConnectionLight)
        connect_mock.recv.return_value = adminStatEvent

        self.poller_manager_mock.poll.return_value = [self.base_server_manager.ADMIN_CONNECT_CLIENT]
        ## broken initial statistics
        self.base_server_manager.addConnection(self.base_server_manager.ADMIN_CONNECT_CLIENT, connect_mock)
        expectAdminStatData = AdminStatData("BaseServerManager", {"Admin_recv_cnt": 1, "WrongName": 0})
        expect_event = self.event_builder.build(EVENT_TYPES.ADMIN_FETCH_STAT_DATA_RESPONSE, expectAdminStatData)
        expect_event.uid = adminStatEvent.uid
        match_event = Matcher(matchStatEvents, expect_event)

        self.base_server_manager.poll()

        connect_mock.send.assert_called_once_with(match_event)

# ... (the source listing is truncated at this point)
log.py
Source:log.py
...70 return "log_id" in other and other["log_id"] == self.log_id71 return NotImplemented72 def __ne__(self, other):73 return not self == other74def expect_event(func):75 @functools.wraps(func)76 def wrapper(self, event, *args, **kwds):77 if isinstance(event, str):78 event = Event(event)79 elif isinstance(event, bytes):80 event = Event(event)81 elif isinstance(event, six.text_type):82 event = Event(event) 83 elif not callable(event):84 raise ValueError("func object was not callable nor str or byte")85 return func(self, event, *args, **kwds)86 return wrapper87class Logger(_Logger):88 @expect_event...
conditional_selection_rate.py
Source:conditional_selection_rate.py
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import pandas as pd

from .moment import ClassificationMoment
from .moment import _GROUP_ID, _LABEL, _PREDICTION, _ALL, _EVENT, _SIGN, _SW
from .error_rate import ErrorRate

_DIFF = "diff"
_WPREDICTION = "wprediction"


class ConditionalSelectionRate(ClassificationMoment):
    """Generic fairness metric including DemographicParity and EqualizedOdds"""

    def default_objective(self):
        """Return a fresh ``ErrorRate`` moment as the default objective."""
        return ErrorRate()

    def load_data(self, X, y, sw, event=None, **kwargs):
        """Load the data and precompute event / group statistics.

        After delegating to the parent ``load_data``, this stores ``event``
        in the ``_EVENT`` column of ``self.tags`` and derives:

        * ``prob_event``: per-event sum of sample weights divided by the
          total sample weight;
        * ``prob_group_event``: the same ratio per (event, group) pair;
        * ``index``: the (sign, event, group) MultiIndex obtained by stacking
          ``prob_group_event`` under the keys ``"+"`` and ``"-"``;
        * ``pos_basis`` / ``neg_basis`` / ``neg_basis_present``: one column
          per (event, group) pair, for every group but the last one.
        """
        super().load_data(X, y, sw, **kwargs)
        self.tags[_EVENT] = event
        total_weight = self.sw.sum()
        self.prob_event = self.tags.groupby(_EVENT)[_SW].sum() / total_weight
        self.prob_group_event = (
            self.tags.groupby([_EVENT, _GROUP_ID])[_SW].sum() / total_weight
        )
        signed = pd.concat(
            [self.prob_group_event, self.prob_group_event],
            keys=["+", "-"],
            names=[_SIGN, _EVENT, _GROUP_ID],
        )
        self.index = signed.index
        self.default_objective_lambda_vec = None

        # fill in the information about the basis
        event_vals = self.tags[_EVENT].dropna().unique()
        group_vals = self.tags[_GROUP_ID].unique()
        self.pos_basis = pd.DataFrame()
        self.neg_basis = pd.DataFrame()
        # An explicit dtype avoids pandas' warning about the changing default
        # dtype of an empty Series.
        self.neg_basis_present = pd.Series(dtype="float64")
        zero_vec = pd.Series(0.0, self.index)
        basis_id = 0
        for event_val in event_vals:
            for group in group_vals[:-1]:
                # Build each column completely before attaching it to the
                # frame: writing through ``frame[col][key] = ...`` is chained
                # assignment and does not update the frame under pandas
                # copy-on-write.
                pos_col = zero_vec.copy()
                neg_col = zero_vec.copy()
                pos_col["+", event_val, group] = 1
                neg_col["-", event_val, group] = 1
                self.pos_basis[basis_id] = pos_col
                self.neg_basis[basis_id] = neg_col
                self.neg_basis_present.at[basis_id] = True
                basis_id += 1

    def gamma(self, predictor):
        """Calculate the degree to which constraints are currently violated
        by the predictor.

        The predictor is evaluated on ``self.X``; its predictions and their
        product with the sample weights are stored in ``self.tags``. The
        returned Series stacks, under the keys ``"+"`` and ``"-"``, the
        difference between the weighted mean prediction of each
        (event, group) pair and that of its event, and the negation of that
        difference. A textual summary is kept in ``self._gamma_descr``.
        """
        pred = predictor(self.X)
        self.tags[_PREDICTION] = pred
        self.tags[_WPREDICTION] = self.tags[_PREDICTION].multiply(self.sw)
        # numeric_only=True keeps the historical behaviour of silently
        # skipping non-numeric tag columns; pandas >= 2.0 raises otherwise.
        expect_event = self.tags.groupby(_EVENT).mean(numeric_only=True)
        expect_event[_WPREDICTION] = (
            expect_event[_WPREDICTION] / expect_event[_SW]
        )
        expect_group_event = self.tags.groupby(
            [_EVENT, _GROUP_ID]).mean(numeric_only=True)
        expect_group_event[_WPREDICTION] = (
            expect_group_event[_WPREDICTION] / expect_group_event[_SW]
        )
        expect_group_event[_DIFF] = (
            expect_group_event[_WPREDICTION] - expect_event[_WPREDICTION]
        )
        g_unsigned = expect_group_event[_DIFF]
        g_signed = pd.concat(
            [g_unsigned, -g_unsigned],
            keys=["+", "-"],
            names=[_SIGN, _EVENT, _GROUP_ID],
        )
        self._gamma_descr = str(expect_group_event[[_WPREDICTION, _DIFF]])
        return g_signed

    # TODO: this can be further improved using the overcompleteness in group membership
    def project_lambda(self, lambda_vec):
        """Return ``lambda_vec`` with each +/- pair reduced to its net value.

        For every (event, group) entry the net value ``"+" - "-"`` is
        computed; its positive part is returned under ``"+"`` and the
        positive part of its negation under ``"-"``.
        """
        net = lambda_vec["+"] - lambda_vec["-"]
        lambda_pos = net.clip(lower=0.0)
        lambda_neg = (-net).clip(lower=0.0)
        lambda_projected = pd.concat(
            [lambda_pos, lambda_neg],
            keys=["+", "-"],
            names=[_SIGN, _EVENT, _GROUP_ID],
        )
        return lambda_projected

    def signed_weights(self, lambda_vec):
        """Return one signed weight per row of ``self.tags``.

        The weight of a row is the net lambda of its event (summed over
        groups) divided by ``prob_event``, minus the net lambda of its
        (event, group) pair divided by ``prob_group_event``. Rows whose
        event is missing get weight 0.
        """
        net = lambda_vec["+"] - lambda_vec["-"]
        # ``Series.sum(level=...)`` was removed in pandas 2.0; grouping by
        # the index level is the supported spelling.
        lambda_event = net.groupby(level=_EVENT).sum() / self.prob_event
        lambda_group_event = net / self.prob_group_event
        adjust = lambda_event - lambda_group_event
        signed_weights = self.tags.apply(
            lambda row: 0 if pd.isna(row[_EVENT])
            else adjust[row[_EVENT], row[_GROUP_ID]],
            axis=1,
        )
        return signed_weights


# Ensure that ConditionalSelectionRate shows up in correct place in documentation
# when it is used as a base class
ConditionalSelectionRate.__module__ = "fairlearn.reductions"


class DemographicParity(ConditionalSelectionRate):
    """ Demographic parity
    A classifier h satisfies DemographicParity if
    Prob[h(X) = y' | A = a] = Prob[h(X) = y'] for all a, y'
    """
    short_name = "DemographicParity"

    def load_data(self, X, y, sw, **kwargs):
        """Load the data with every sample assigned to the single event ``_ALL``."""
        super().load_data(X, y, sw, event=_ALL, **kwargs)


class EqualizedOdds(ConditionalSelectionRate):
    """ Equalized odds
    Adds conditioning on label compared to Demographic parity, i.e.
    Prob[h(X) = y' | A = a, Y = y] = Prob[h(X) = y' | Y = y] for all a, y, y'
    """
    short_name = "EqualizedOdds"

    def load_data(self, X, y, sw, **kwargs):
        """Load the data with one event per label value (``_LABEL + "=" + str(y)``)."""
        super().load_data(X, y, sw,
                          event=pd.Series(y).apply(lambda y: _LABEL + "=" + str(y)),
                          **kwargs)


class EqualOpportunity(ConditionalSelectionRate):
    """ Equal Opportunity
    Adds conditioning on label compared to Demographic parity, i.e.
    Prob[h(X) = y' | A = a, Y = y] = Prob[h(X) = y' | Y = y] for all a, y=1, y'
    """
    short_name = "EqualOpportunity"

    def load_data(self, X, y, sw, **kwargs):
        """Load the data with an event only for samples whose label equals 1.

        All other samples get a missing event (via ``Series.where``).
        """
        # NOTE(review): the excerpt this was taken from is cut off right
        # after the ``event=`` argument; the trailing ``**kwargs``
        # pass-through is reconstructed from the sibling overrides above --
        # confirm against the full file.
        super().load_data(X, y, sw,
                          event=pd.Series(y).apply(lambda y: _LABEL + "=" + str(y)).where(y == 1),
                          **kwargs)
LambdaTest’s Playwright tutorial gives you a broad overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial offers A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!